flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [11/53] [abbrv] git commit: Rework the Taskmanager to a slot based model and remove legacy cloud code
Date Thu, 26 Jun 2014 09:46:36 GMT
Rework the Taskmanager to a slot based model and remove legacy cloud code

Squashed commit of the following:

  - Post merge cleanup
  - Renamed fractionMemory to memoryFraction.
  - Removed LocalScheduler and QueueScheduler and replaced them with a unified DefaultScheduler.
  - Removed Local and ClusterManager and replaced them with a unified DefaultInstanceManager.
  - Removed connection IDs from execution edges
  - Removed InstanceType, InstanceRequestMap, InstanceTypeDescription, InstanceTypeDescriptionFactory, InstanceTypeFactory, PendingRequestsMap
  - Fixed problems with test cases.
  - Introduced a simple slot system for scheduling.
  - Removed subtasks per instance
  - Added registerTaskManager to the JobManager RPC calls. RegisterTaskManager is called only once, at which point the hardware description information is sent.

Add: Merge the cloud model removal with the new network stack


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/86d206c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/86d206c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/86d206c4

Branch: refs/heads/travis_test
Commit: 86d206c41922a1b7b8c2839b65d3568f9be55e0c
Parents: 7b6b5a2
Author: Till Rohrmann <till.rohrmann@gmail.com>
Authored: Sun Jun 1 16:03:27 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Jun 22 21:07:10 2014 +0200

----------------------------------------------------------------------
 .../api/avro/AvroExternalJarProgramITCase.java  |   1 +
 .../eu/stratosphere/client/LocalExecutor.java   |   9 +
 .../client/minicluster/NepheleMiniCluster.java  |  23 +-
 .../eu/stratosphere/client/program/Client.java  |   5 +-
 .../client/CliFrontendListCancelTest.java       |  11 +-
 .../stratosphere/client/testjar/WordCount.java  |   3 +-
 .../eu/stratosphere/compiler/PactCompiler.java  | 473 +---------
 .../compiler/costs/DefaultCostEstimator.java    |  18 +-
 .../dag/AbstractPartialSolutionNode.java        |   5 -
 .../compiler/dag/BinaryUnionNode.java           |  48 +-
 .../compiler/dag/BulkIterationNode.java         |  20 +-
 .../stratosphere/compiler/dag/DataSinkNode.java |  18 +-
 .../compiler/dag/DataSourceNode.java            |  16 -
 .../compiler/dag/GroupReduceNode.java           |   1 -
 .../compiler/dag/OptimizerNode.java             |  51 +-
 .../stratosphere/compiler/dag/ReduceNode.java   |   1 -
 .../compiler/dag/SingleInputNode.java           |  41 +-
 .../stratosphere/compiler/dag/SinkJoiner.java   |   1 -
 .../stratosphere/compiler/dag/TwoInputNode.java |  56 +-
 .../compiler/dag/WorksetIterationNode.java      |  12 +-
 .../RequestedGlobalProperties.java              |   8 +-
 .../AllGroupWithPartialPreGroupProperties.java  |   3 +-
 .../compiler/operators/AllReduceProperties.java |   3 +-
 .../GroupReduceWithCombineProperties.java       |   6 +-
 .../operators/PartialGroupProperties.java       |   6 +-
 .../compiler/operators/ReduceProperties.java    |   3 +-
 .../eu/stratosphere/compiler/plan/Channel.java  |  59 +-
 .../eu/stratosphere/compiler/plan/PlanNode.java |  25 +-
 .../plandump/PlanJSONDumpGenerator.java         |   3 -
 .../plantranslate/NepheleJobGraphGenerator.java |  80 +-
 .../pact/compiler/CompilerTestBase.java         |  24 +-
 .../configuration/ConfigConstants.java          |  12 +-
 .../java/eu/stratosphere/util/ClassUtils.java   |   1 +
 .../event/job/VertexAssignmentEvent.java        |  32 +-
 .../nephele/executiongraph/ExecutionEdge.java   |   9 -
 .../nephele/executiongraph/ExecutionGraph.java  | 181 ++--
 .../executiongraph/ExecutionGroupVertex.java    | 184 +---
 .../nephele/executiongraph/ExecutionStage.java  | 112 +--
 .../nephele/executiongraph/ExecutionVertex.java |   1 -
 .../executiongraph/InternalJobStatus.java       |   1 +
 .../executiongraph/ManagementGraphFactory.java  |   7 +-
 .../nephele/instance/AbstractInstance.java      | 297 ------
 .../nephele/instance/AllocatedResource.java     |  38 +-
 .../nephele/instance/AllocatedSlot.java         |  65 ++
 .../nephele/instance/AllocationID.java          |   4 +-
 .../instance/DefaultInstanceManager.java        | 393 ++++++++
 .../nephele/instance/DummyInstance.java         |  14 +-
 .../stratosphere/nephele/instance/Hardware.java |  24 +
 .../stratosphere/nephele/instance/Instance.java | 362 +++++++
 .../nephele/instance/InstanceManager.java       | 145 +--
 .../nephele/instance/InstanceNotifier.java      |  71 ++
 .../nephele/instance/InstanceRequestMap.java    | 184 ----
 .../nephele/instance/InstanceType.java          | 199 ----
 .../instance/InstanceTypeDescription.java       | 137 ---
 .../InstanceTypeDescriptionFactory.java         |  46 -
 .../nephele/instance/InstanceTypeFactory.java   |  91 --
 .../nephele/instance/LocalInstanceManager.java  |  60 ++
 .../instance/cluster/AllocatedSlice.java        | 120 ---
 .../instance/cluster/ClusterInstance.java       | 181 ----
 .../cluster/ClusterInstanceNotifier.java        |  71 --
 .../instance/cluster/ClusterManager.java        | 945 -------------------
 .../instance/cluster/PendingRequestsMap.java    |  97 --
 .../nephele/instance/local/LocalInstance.java   |  37 -
 .../instance/local/LocalInstanceManager.java    | 418 --------
 .../instance/local/LocalInstanceNotifier.java   |  70 --
 .../nephele/jobgraph/AbstractJobVertex.java     | 100 +-
 .../nephele/jobmanager/DeploymentManager.java   |   8 +-
 .../nephele/jobmanager/EventCollector.java      |  10 +-
 .../nephele/jobmanager/JobManager.java          |  98 +-
 .../nephele/jobmanager/JobManagerUtils.java     |  54 +-
 .../scheduler/AbstractExecutionListener.java    | 166 ----
 .../jobmanager/scheduler/AbstractScheduler.java | 662 -------------
 .../scheduler/DefaultExecutionListener.java     | 127 +++
 .../jobmanager/scheduler/DefaultScheduler.java  | 762 +++++++++++++++
 .../jobmanager/scheduler/RecoveryLogic.java     | 248 -----
 .../scheduler/local/LocalExecutionListener.java |  33 -
 .../scheduler/local/LocalScheduler.java         | 213 -----
 .../scheduler/queue/QueueExecutionListener.java |  40 -
 .../scheduler/queue/QueueScheduler.java         | 216 -----
 .../splitassigner/InputSplitManager.java        |   2 +-
 .../LocatableInputSplitAssigner.java            |   4 +-
 .../splitassigner/LocatableInputSplitList.java  |  20 +-
 .../file/FileInputSplitAssigner.java            |   4 +-
 .../splitassigner/file/FileInputSplitList.java  |  20 +-
 .../managementgraph/ManagementGraph.java        |   4 +-
 .../managementgraph/ManagementVertex.java       |  35 +-
 .../eu/stratosphere/nephele/net/NetUtils.java   |   2 +
 .../profiling/impl/JobProfilingData.java        |   6 +-
 .../protocols/ExtendedManagementProtocol.java   |  23 +-
 .../nephele/protocols/JobManagerProtocol.java   |  19 +-
 .../services/iomanager/ChannelAccess.java       |   1 +
 .../services/memorymanager/MemoryManager.java   |  17 +-
 .../memorymanager/spi/DefaultMemoryManager.java |  39 +-
 .../nephele/taskmanager/TaskManager.java        | 123 ++-
 .../RegisterTaskManagerResult.java              |  50 +
 .../nephele/topology/NetworkNode.java           |  10 -
 .../eu/stratosphere/nephele/util/IOUtils.java   |   1 +
 .../pact/runtime/cache/FileCache.java           |   9 +-
 .../hash/BuildFirstHashMatchIterator.java       |   8 +-
 .../BuildFirstReOpenableHashMatchIterator.java  |   8 +-
 .../hash/BuildSecondHashMatchIterator.java      |   8 +-
 .../pact/runtime/hash/InMemoryPartition.java    |   2 +
 .../iterative/task/IterationHeadPactTask.java   |   5 +-
 .../pact/runtime/shipping/ShipStrategyType.java |  23 +-
 .../runtime/sort/AsynchronousPartialSorter.java |  11 +-
 .../AsynchronousPartialSorterCollector.java     |   7 +-
 .../sort/CombiningUnilateralSortMerger.java     |  18 +-
 .../pact/runtime/sort/UnilateralSortMerger.java |  18 +-
 .../AbstractCachedBuildSideMatchDriver.java     |   2 +-
 .../pact/runtime/task/CrossDriver.java          |   3 +-
 .../pact/runtime/task/DataSinkTask.java         |   2 +-
 .../runtime/task/GroupReduceCombineDriver.java  |   4 +-
 .../pact/runtime/task/MatchDriver.java          |  38 +-
 .../pact/runtime/task/ReduceCombineDriver.java  |   3 +-
 .../pact/runtime/task/RegularPactTask.java      |  12 +-
 .../SynchronousChainedCombineDriver.java        |   2 +-
 .../pact/runtime/task/util/TaskConfig.java      |  68 +-
 .../runtime/io/channels/InputChannel.java       |   9 +-
 .../runtime/io/gates/InputGate.java             |   2 +
 .../runtime/io/network/RemoteReceiver.java      |  20 +-
 .../nephele/event/job/ManagementEventTest.java  |   4 +-
 .../executiongraph/ExecutionGraphTest.java      | 258 +----
 .../instance/cluster/ClusterManagerTest.java    | 273 ------
 .../cluster/ClusterManagerTestUtils.java        |  66 --
 .../cluster/DefaultInstanceManagerTest.java     | 232 +++++
 .../DefaultInstanceManagerTestUtils.java        |  66 ++
 .../instance/cluster/HostInClusterTest.java     | 130 ++-
 .../cluster/PendingRequestsMapTest.java         |  91 --
 .../local/LocalInstanceManagerTest.java         |  17 +-
 .../nephele/jobmanager/JobManagerITCase.java    |  16 +-
 .../scheduler/queue/DefaultSchedulerTest.java   | 185 ++++
 .../scheduler/queue/QueueSchedulerTest.java     | 186 ----
 .../scheduler/queue/TestDeploymentManager.java  |   4 +-
 .../scheduler/queue/TestInstanceManager.java    | 118 +--
 .../managementgraph/ManagementGraphTest.java    |  11 +-
 .../services/iomanager/IOManagerITCase.java     |   2 +-
 .../IOManagerPerformanceBenchmark.java          |   2 +-
 .../services/iomanager/IOManagerTest.java       |   2 +-
 .../memorymanager/MemorySegmentTest.java        |   2 +-
 .../nephele/util/ServerTestUtils.java           |  17 +-
 .../runtime/hash/HashMatchIteratorITCase.java   |  14 +-
 .../pact/runtime/hash/HashTableITCase.java      |   2 +-
 .../runtime/hash/ReOpenableHashTableITCase.java |   4 +-
 .../pact/runtime/io/ChannelViewsTest.java       |   8 +-
 .../pact/runtime/io/SpillingBufferTest.java     |   2 +-
 .../event/EventWithAggregatorsTest.java         |   2 +
 .../resettable/BlockResettableIteratorTest.java |   2 +-
 ...lockResettableMutableObjectIteratorTest.java |   2 +-
 .../sort/AsynchonousPartialSorterITCase.java    |  14 +-
 .../CombiningUnilateralSortMergerITCase.java    |   8 +-
 .../pact/runtime/sort/ExternalSortITCase.java   |  12 +-
 .../sort/MassiveStringSortingITCase.java        |   4 +-
 .../sort/SortMergeMatchIteratorITCase.java      |   2 +-
 .../runtime/task/CombineTaskExternalITCase.java |   8 +-
 .../pact/runtime/task/CombineTaskTest.java      |  10 +-
 .../runtime/task/CrossTaskExternalITCase.java   |   7 +-
 .../pact/runtime/task/CrossTaskTest.java        |  36 +-
 .../pact/runtime/task/DataSinkTaskTest.java     |  47 +-
 .../runtime/task/MatchTaskExternalITCase.java   |  14 +-
 .../pact/runtime/task/MatchTaskTest.java        |  56 +-
 .../runtime/task/ReduceTaskExternalITCase.java  |   8 +-
 .../pact/runtime/task/ReduceTaskTest.java       |   3 +-
 .../runtime/task/chaining/ChainTaskTest.java    |  19 +-
 .../task/drivers/ReduceCombineDriverTest.java   |  10 +-
 .../runtime/task/drivers/TestTaskContext.java   |   2 +-
 .../pact/runtime/test/util/DriverTestBase.java  |   8 +-
 .../pact/runtime/test/util/MockEnvironment.java |   9 +-
 .../netty/InboundEnvelopeDecoderTest.java       |   2 +-
 .../test/compiler/util/CompilerTestBase.java    |  26 +-
 .../test/util/AbstractTestBase.java             |  48 +-
 .../test/util/JavaProgramTestBase.java          |   2 +
 .../test/util/RecordAPITestBase.java            |   3 +
 .../test/accumulators/AccumulatorITCase.java    |   7 +-
 .../BroadcastVarsNepheleITCase.java             |  16 +-
 .../KMeansIterativeNepheleITCase.java           |  30 +-
 .../test/cancelling/CancellingTestBase.java     |  10 +-
 .../test/cancelling/MapCancelingITCase.java     |  13 +-
 .../cancelling/MatchJoinCancelingITCase.java    |  17 +-
 .../clients/examples/LocalExecutorITCase.java   |  10 +-
 .../exampleJavaPrograms/WordCountITCase.java    |   4 +-
 .../ComputeEdgeDegreesITCase.java               |   2 +-
 .../ConnectedComponentsITCase.java              |   2 +-
 .../EnumTrianglesOnEdgesWithDegreesITCase.java  |   2 +-
 .../TransitiveClosureNaiveITCase.java           |   2 +-
 .../WebLogAnalysisITCase.java                   |   2 +-
 .../exampleScalaPrograms/WordCountITCase.java   |   2 +-
 .../WordCountPactValueITCase.java               |   2 +-
 .../WordCountWithCountFunctionITCase.java       |   2 +-
 .../test/failingPrograms/TaskFailureITCase.java |   8 +-
 .../CoGroupConnectedComponentsITCase.java       |   6 +-
 .../iterative/ConnectedComponentsITCase.java    |   6 +-
 ...ectedComponentsWithDeferredUpdateITCase.java |   3 +-
 ...tedComponentsWithSolutionSetFirstITCase.java |   7 +-
 .../test/iterative/DanglingPageRankITCase.java  |   3 +-
 .../test/iterative/DeltaPageRankITCase.java     |   3 +-
 .../DependencyConnectedComponentsITCase.java    |   5 +-
 ...IterationTerminationWithTerminationTail.java |   6 +-
 .../IterationTerminationWithTwoTails.java       |   6 +-
 .../IterationWithAllReducerITCase.java          |   6 +-
 .../iterative/IterationWithChainingITCase.java  |   3 +-
 .../iterative/IterationWithUnionITCase.java     |   3 +-
 .../test/iterative/IterativeKMeansITCase.java   |   6 +-
 .../test/iterative/KMeansITCase.java            |   8 +-
 .../test/iterative/LineRankITCase.java          |   5 +-
 .../test/iterative/PageRankITCase.java          |   3 +-
 .../ConnectedComponentsNepheleITCase.java       |  54 +-
 .../nephele/DanglingPageRankNepheleITCase.java  |   7 +-
 ...nglingPageRankWithCombinerNepheleITCase.java |   7 +-
 .../IterationWithChainingNepheleITCase.java     |  17 +-
 .../test/iterative/nephele/JobGraphUtils.java   |  20 +-
 .../CustomCompensatableDanglingPageRank.java    |  57 +-
 ...mpensatableDanglingPageRankWithCombiner.java |  59 +-
 .../CompensatableDanglingPageRank.java          |  55 +-
 .../PackagedProgramEndToEndITCase.java          |  15 +-
 .../test/operators/UnionSinkITCase.java         |   3 +-
 .../recordJobTests/CollectionSourceTest.java    |   8 +-
 .../ComputeEdgeDegreesITCase.java               |   3 +-
 .../EnumTrianglesOnEdgesWithDegreesITCase.java  |   3 +-
 .../recordJobTests/EnumTrianglesRDFITCase.java  |   4 +-
 .../recordJobTests/GlobalSortingITCase.java     |   5 +-
 .../GlobalSortingMixedOrderITCase.java          |  62 +-
 .../recordJobTests/GroupOrderReduceITCase.java  |   3 +-
 .../recordJobTests/MergeOnlyJoinITCase.java     |   1 +
 .../test/recordJobTests/PairwiseSPITCase.java   |   4 +-
 .../test/recordJobTests/TPCHQuery10ITCase.java  |   2 +-
 .../test/recordJobTests/TPCHQuery3ITCase.java   |   3 +-
 .../TPCHQuery3WithUnionITCase.java              |   6 +-
 .../test/recordJobTests/TPCHQuery4ITCase.java   |   6 +-
 .../test/recordJobTests/TPCHQuery9ITCase.java   |   6 +-
 .../recordJobTests/TPCHQueryAsterixITCase.java  |   6 +-
 .../test/recordJobTests/TeraSortITCase.java     |   7 +-
 .../recordJobTests/WebLogAnalysisITCase.java    |   6 +-
 .../test/recordJobTests/WordCountITCase.java    |   6 +-
 .../WordCountUnionReduceITCase.java             |   6 +-
 .../test/runtime/NetworkStackThroughput.java    |  49 +-
 235 files changed, 3917 insertions(+), 7900 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
index a766fcb..e398acf 100644
--- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
@@ -47,6 +47,7 @@ public class AvroExternalJarProgramITCase {
 		try {
 			testMiniCluster = new NepheleMiniCluster();
 			testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT);
+			testMiniCluster.setTaskManagerNumSlots(4);
 			testMiniCluster.start();
 			
 			String jarFile = JAR_FILE;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
index 89f996a..b017220 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
@@ -42,6 +42,8 @@ public class LocalExecutor extends PlanExecutor {
 	
 	private static boolean DEFAULT_OVERWRITE = false;
 
+	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+
 	private final Object lock = new Object();	// we lock to ensure singleton execution
 	
 	private NepheleMiniCluster nephele;
@@ -54,6 +56,8 @@ public class LocalExecutor extends PlanExecutor {
 	
 	private int taskManagerDataPort = -1;
 
+	private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+
 	private String configDir;
 
 	private String hdfsConfigFile;
@@ -129,6 +133,10 @@ public class LocalExecutor extends PlanExecutor {
 	public void setDefaultAlwaysCreateDirectory(boolean defaultAlwaysCreateDirectory) {
 		this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
 	}
+
+	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
+
+	public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; }
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -157,6 +165,7 @@ public class LocalExecutor extends PlanExecutor {
 				}
 				nephele.setDefaultOverwriteFiles(defaultOverwriteFiles);
 				nephele.setDefaultAlwaysCreateDirectory(defaultAlwaysCreateDirectory);
+				nephele.setTaskManagerNumSlots(taskManagerNumSlots);
 				
 				// start it up
 				this.nephele.start();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
index 79e5c64..4daca26 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
@@ -46,6 +46,8 @@ public class NepheleMiniCluster {
 
 	private static final boolean DEFAULT_LAZY_MEMORY_ALLOCATION = true;
 
+	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
+
 	// --------------------------------------------------------------------------------------------
 	
 	private final Object startStopLock = new Object();
@@ -56,7 +58,9 @@ public class NepheleMiniCluster {
 	
 	private int taskManagerDataPort = DEFAULT_TM_DATA_PORT;
 
-	private int numTaskManager = DEFAULT_NUM_TASK_MANAGER;
+	private int numTaskTracker = DEFAULT_NUM_TASK_MANAGER;
+
+	private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 	
 	private long memorySize = DEFAULT_MEMORY_SIZE;
 	
@@ -149,9 +153,13 @@ public class NepheleMiniCluster {
 		this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
 	}
 
-	public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; }
+	public void setNumTaskTracker(int numTaskTracker) { this.numTaskTracker = numTaskTracker; }
+
+	public int getNumTaskTracker() { return numTaskTracker; }
 
-	public int getNumTaskManager() { return numTaskManager; }
+	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
+
+	public int getTaskManagerNumSlots() { return taskManagerNumSlots; }
 
 	// ------------------------------------------------------------------------
 	// Life cycle and Job Submission
@@ -172,7 +180,7 @@ public class NepheleMiniCluster {
 			} else {
 				Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
 					taskManagerDataPort, memorySize, hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles,
-						defaultAlwaysCreateDirectory, numTaskManager);
+						defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskTracker);
 				GlobalConfiguration.includeConfiguration(conf);
 			}
 
@@ -196,7 +204,7 @@ public class NepheleMiniCluster {
 			// start the job manager
 			jobManager = new JobManager(ExecutionMode.LOCAL);
 	
-			waitForJobManagerToBecomeReady(numTaskManager);
+			waitForJobManagerToBecomeReady(numTaskTracker);
 		}
 	}
 
@@ -236,7 +244,8 @@ public class NepheleMiniCluster {
 	
 	public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort,
 			int taskManagerDataPort, long memorySize, String hdfsConfigFile, boolean lazyMemory,
-			boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory, int numTaskManager)
+			boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory,
+			int taskManagerNumSlots, int numTaskManager)
 	{
 		final Configuration config = new Configuration();
 		
@@ -284,6 +293,8 @@ public class NepheleMiniCluster {
 		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize/numTaskManager);
 
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager);
+
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
 		
 		return config;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
index 00790f4..31138f6 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
@@ -77,7 +77,7 @@ public class Client {
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
 		
-		this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress);
+		this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
 		
 		//  Disable Local Execution when using a Client
 		ContextEnvironment.disableLocalExecution();
@@ -104,8 +104,7 @@ public class Client {
 			throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
 		}
 
-		final InetSocketAddress jobManagerAddress = new InetSocketAddress(address, port);
-		this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress);
+		this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
 		
 		//  Disable Local Execution when using a Client
 		ContextEnvironment.disableLocalExecution();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java
index 7ccd420..ba02fa9 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.cli.CommandLine;
 import org.junit.Assert;
@@ -34,8 +33,6 @@ import eu.stratosphere.nephele.client.JobProgressResult;
 import eu.stratosphere.nephele.client.JobSubmissionResult;
 import eu.stratosphere.nephele.event.job.AbstractEvent;
 import eu.stratosphere.nephele.event.job.RecentJobEvent;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.managementgraph.ManagementGraph;
@@ -202,18 +199,18 @@ public class CliFrontendListCancelTest {
 		}
 
 		@Override
-		public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() throws IOException {
+		public void logBufferUtilization(JobID jobID) throws IOException {
 			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public void logBufferUtilization(JobID jobID) throws IOException {
+		public NetworkTopology getNetworkTopology(JobID jobID) throws IOException {
 			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public NetworkTopology getNetworkTopology(JobID jobID) throws IOException {
-			throw new UnsupportedOperationException();
+		public int getAvailableSlots() {
+			return 1;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
index e827805..5218dc2 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
@@ -70,9 +70,10 @@ public class WordCount {
 	 * FlatMapFunction. The function takes a line (String) and splits it into 
 	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
 	 */
-	@SuppressWarnings("serial")
 	public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
 
+		private static final long serialVersionUID = 1L;
+
 		@Override
 		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
 			// normalize and split the line

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
index 2076902..bf3d6af 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
@@ -13,8 +13,6 @@
 
 package eu.stratosphere.compiler;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -90,11 +88,6 @@ import eu.stratosphere.compiler.postpass.OptimizerPostPass;
 import eu.stratosphere.configuration.ConfigConstants;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.ipc.RPC;
-import eu.stratosphere.nephele.net.NetUtils;
-import eu.stratosphere.nephele.protocols.ExtendedManagementProtocol;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
 import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
 import eu.stratosphere.util.InstantiationUtil;
@@ -340,24 +333,10 @@ public class PactCompiler {
 	private final CostEstimator costEstimator;
 
 	/**
-	 * The connection used to connect to the job-manager.
-	 */
-	private final InetSocketAddress jobManagerAddress;
-
-	/**
-	 * The maximum number of machines (instances) to use, per the configuration.
-	 */
-	private int maxMachines;
-
-	/**
 	 * The default degree of parallelism for jobs compiled by this compiler.
 	 */
 	private int defaultDegreeOfParallelism;
 
-	/**
-	 * The maximum number of subtasks that should share an instance.
-	 */
-	private int maxIntraNodeParallelism;
 
 	// ------------------------------------------------------------------------
 	// Constructor & Setup
@@ -420,106 +399,29 @@ public class PactCompiler {
 	 *        The <tt>CostEstimator</tt> to use to cost the individual operations.
 	 */
 	public PactCompiler(DataStatistics stats, CostEstimator estimator) {
-		this(stats, estimator, null);
-	}
-
-	/**
-	 * Creates a new compiler instance that uses the statistics object to determine properties about the input.
-	 * Given those statistics, the compiler can make better choices for the execution strategies.
-	 * as if no filesystem was given. It uses the given cost estimator to compute the costs of the individual
-	 * operations.
-	 * <p>
-	 * The given socket-address is used to connect to the job manager to obtain system characteristics, like available
-	 * memory. If that parameter is null, then the address is obtained from the global configuration.
-	 * 
-	 * @param stats
-	 *        The statistics to be used to determine the input properties.
-	 * @param estimator
-	 *        The <tt>CostEstimator</tt> to use to cost the individual operations.
-	 * @param jobManagerConnection
-	 *        The address of the job manager that is queried for system characteristics.
-	 */
-	public PactCompiler(DataStatistics stats, CostEstimator estimator, InetSocketAddress jobManagerConnection) {
 		this.statistics = stats;
 		this.costEstimator = estimator;
 
 		Configuration config = GlobalConfiguration.getConfiguration();
 
-		// determine the maximum number of instances to use
-		this.maxMachines = -1;
-
 		// determine the default parallelization degree
 		this.defaultDegreeOfParallelism = config.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
 			ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
-
-		// determine the default intra-node parallelism
-		int maxInNodePar = config.getInteger(ConfigConstants.PARALLELIZATION_MAX_INTRA_NODE_DEGREE_KEY,
-			ConfigConstants.DEFAULT_MAX_INTRA_NODE_PARALLELIZATION_DEGREE);
-		if (maxInNodePar == 0 || maxInNodePar < -1) {
-			LOG.error("Invalid maximum degree of intra-node parallelism: " + maxInNodePar +
-				". Ignoring parameter.");
-			maxInNodePar = ConfigConstants.DEFAULT_MAX_INTRA_NODE_PARALLELIZATION_DEGREE;
-		}
-		this.maxIntraNodeParallelism = maxInNodePar;
-
-		// assign the connection to the job-manager
-		if (jobManagerConnection != null) {
-			this.jobManagerAddress = jobManagerConnection;
-		} else {
-			final String address = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-			if (address == null) {
-				throw new CompilerException(
-					"Cannot find address to job manager's RPC service in the global configuration.");
-			}
-
-			final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-			if (port < 0) {
-				throw new CompilerException(
-					"Cannot find port to job manager's RPC service in the global configuration.");
-			}
-
-			this.jobManagerAddress = new InetSocketAddress(address, port);
-		}
 	}
 	
 	// ------------------------------------------------------------------------
 	//                             Getters / Setters
 	// ------------------------------------------------------------------------
 	
-	public int getMaxMachines() {
-		return maxMachines;
-	}
-	
-	public void setMaxMachines(int maxMachines) {
-		if (maxMachines == -1 || maxMachines > 0) {
-			this.maxMachines = maxMachines;
-		} else {
-			throw new IllegalArgumentException();
-		}
-	}
-	
 	public int getDefaultDegreeOfParallelism() {
 		return defaultDegreeOfParallelism;
 	}
 	
 	public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
-		if (defaultDegreeOfParallelism == -1 || defaultDegreeOfParallelism > 0) {
+		if (defaultDegreeOfParallelism > 0) {
 			this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
 		} else {
-			throw new IllegalArgumentException();
-		}
-	}
-	
-	public int getMaxIntraNodeParallelism() {
-		return maxIntraNodeParallelism;
-	}
-	
-	public void setMaxIntraNodeParallelism(int maxIntraNodeParallelism) {
-		if (maxIntraNodeParallelism == -1 || maxIntraNodeParallelism > 0) {
-			this.maxIntraNodeParallelism = maxIntraNodeParallelism;
-		} else {
-			throw new IllegalArgumentException();
+			throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
 		}
 	}
 	
@@ -550,14 +452,9 @@ public class PactCompiler {
 		// -------------------- try to get the connection to the job manager ----------------------
 		// --------------------------to obtain instance information --------------------------------
 		final OptimizerPostPass postPasser = getPostPassFromPlan(program);
-		return compile(program, getInstanceTypeInfo(), postPasser);
-	}
-	
-	public OptimizedPlan compile(Plan program, InstanceTypeDescription type) throws CompilerException {
-		final OptimizerPostPass postPasser = getPostPassFromPlan(program);
-		return compile(program, type, postPasser);
+		return compile(program, postPasser);
 	}
-	
+
 	/**
 	 * Translates the given pact plan in to an OptimizedPlan, where all nodes have their local strategy assigned
 	 * and all channels have a shipping strategy assigned. The process goes through several phases:
@@ -569,8 +466,6 @@ public class PactCompiler {
 	 * </ol>
 	 * 
 	 * @param program The program to be translated.
-	 * @param type The instance type to schedule the execution on. Used also to determine the amount of memory
-	 *             available to the tasks.
 	 * @param postPasser The function to be used for post passing the optimizer's plan and setting the
 	 *                   data type specific serialization routines.
 	 * @return The optimized plan.
@@ -579,8 +474,8 @@ public class PactCompiler {
 	 *         Thrown, if the plan is invalid or the optimizer encountered an inconsistent
 	 *         situation during the compilation process.
 	 */
-	private OptimizedPlan compile(Plan program, InstanceTypeDescription type, OptimizerPostPass postPasser) throws CompilerException {
-		if (program == null || type == null || postPasser == null) {
+	private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
+		if (program == null || postPasser == null) {
 			throw new NullPointerException();
 		}
 		
@@ -588,73 +483,14 @@ public class PactCompiler {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
 		}
-		
-		final String instanceName = type.getInstanceType().getIdentifier();
-		
-		// we subtract some percentage of the memory to accommodate for rounding errors
-		final long memoryPerInstance = (long) (type.getHardwareDescription().getSizeOfFreeMemory() * 0.96f);
-		final int numInstances = type.getMaximumNumberOfAvailableInstances();
-		
-		// determine the maximum number of machines to use
-		int maxMachinesJob = program.getMaxNumberMachines();
-
-		if (maxMachinesJob < 1) {
-			maxMachinesJob = this.maxMachines;
-		} else if (this.maxMachines >= 1) {
-			// check if the program requested more than the global config allowed
-			if (maxMachinesJob > this.maxMachines && LOG.isWarnEnabled()) {
-				LOG.warn("Maximal number of machines specified in program (" + maxMachinesJob
-					+ ") exceeds the maximum number in the global configuration (" + this.maxMachines
-					+ "). Using the global configuration value.");
-			}
-
-			maxMachinesJob = Math.min(maxMachinesJob, this.maxMachines);
-		}
-
-		// adjust the maximum number of machines the the number of available instances
-		if (maxMachinesJob < 1) {
-			maxMachinesJob = numInstances;
-		} else if (maxMachinesJob > numInstances) {
-			maxMachinesJob = numInstances;
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Maximal number of machines decreased to " + maxMachinesJob +
-					" because no more instances are available.");
-			}
-		}
 
 		// set the default degree of parallelism
 		int defaultParallelism = program.getDefaultParallelism() > 0 ?
 			program.getDefaultParallelism() : this.defaultDegreeOfParallelism;
-		
-		if (this.maxIntraNodeParallelism > 0) {
-			if (defaultParallelism < 1) {
-				defaultParallelism = maxMachinesJob * this.maxIntraNodeParallelism;
-			}
-			else if (defaultParallelism > maxMachinesJob * this.maxIntraNodeParallelism) {
-				int oldParallelism = defaultParallelism;
-				defaultParallelism = maxMachinesJob * this.maxIntraNodeParallelism;
-
-				if (LOG.isInfoEnabled()) {
-					LOG.info("Decreasing default degree of parallelism from " + oldParallelism +
-						" to " + defaultParallelism + " to fit a maximum number of " + maxMachinesJob +
-						" instances with a intra-parallelism of " + this.maxIntraNodeParallelism);
-				}
-			}
-		} else if (defaultParallelism < 1) {
-			defaultParallelism = maxMachinesJob;
-			if (LOG.isInfoEnabled()) {
-				LOG.info("No default parallelism specified. Using default parallelism of " + defaultParallelism + " (One task per instance)");
-			}
-		}
 
 		// log the output
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Using a default degree of parallelism of " + defaultParallelism +
-				", a maximum intra-node parallelism of " + this.maxIntraNodeParallelism + '.');
-			if (this.maxMachines > 0) {
-				LOG.debug("The execution is limited to a maximum number of " + maxMachinesJob + " machines.");
-			}
-
+			LOG.debug("Using a default degree of parallelism of " + defaultParallelism + '.');
 		}
 
 		// the first step in the compilation is to create the optimizer plan representation
@@ -666,7 +502,7 @@ public class PactCompiler {
 		// 4) It makes estimates about the data volume of the data sources and
 		// propagates those estimates through the plan
 
-		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(maxMachinesJob, defaultParallelism);
+		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism);
 		program.accept(graphCreator);
 
 		// if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
@@ -689,8 +525,7 @@ public class PactCompiler {
 		// now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
 		// guaranteed memory, for further cost estimations. we assume an equal distribution of memory among consumer tasks
 		
-		rootNode.accept(new IdAndMemoryAndEstimatesVisitor(this.statistics,
-			graphCreator.getMemoryConsumerCount() == 0 ? 0 : memoryPerInstance / graphCreator.getMemoryConsumerCount()));
+		rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
 		
 		// Now that the previous step is done, the next step is to traverse the graph again for the two
 		// steps that cannot directly be performed during the plan enumeration, because we are dealing with DAGs
@@ -733,9 +568,8 @@ public class PactCompiler {
 		dp.resolveDeadlocks(bestPlanSinks);
 
 		// finalize the plan
-		OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program, memoryPerInstance);
-		plan.setInstanceTypeName(instanceName);
-		
+		OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
+
 		// swap the binary unions for n-ary unions. this changes no strategies or memory consumers whatsoever, so
 		// we can do this after the plan finalization
 		plan.accept(new BinaryUnionReplacer());
@@ -755,7 +589,7 @@ public class PactCompiler {
 	 *         from the plan can be traversed.
 	 */
 	public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
-		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(-1, 1);
+		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1);
 		program.accept(graphCreator);
 		return graphCreator.sinks;
 	}
@@ -783,22 +617,18 @@ public class PactCompiler {
 
 		private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan
 
-		private final int maxMachines; // the maximum number of machines to use
-
 		private final int defaultParallelism; // the default degree of parallelism
 		
-		private int numMemoryConsumers;
-		
 		private final GraphCreatingVisitor parent;	// reference to enclosing creator, in case of a recursive translation
 		
 		private final boolean forceDOP;
 
 		
-		private GraphCreatingVisitor(int maxMachines, int defaultParallelism) {
-			this(null, false, maxMachines, defaultParallelism, null);
+		private GraphCreatingVisitor(int defaultParallelism) {
+			this(null, false, defaultParallelism, null);
 		}
 
-		private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int maxMachines,
+		private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP,
 									int defaultParallelism, HashMap<Operator<?>, OptimizerNode> closure) {
 			if (closure == null){
 				con2node = new HashMap<Operator<?>, OptimizerNode>();
@@ -807,7 +637,6 @@ public class PactCompiler {
 			}
 			this.sources = new ArrayList<DataSourceNode>(4);
 			this.sinks = new ArrayList<DataSinkNode>(2);
-			this.maxMachines = maxMachines;
 			this.defaultParallelism = defaultParallelism;
 			this.parent = parent;
 			this.forceDOP = forceDOP;
@@ -878,7 +707,6 @@ public class PactCompiler {
 				// catch this for the recursive translation of step functions
 				BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
 				p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
-				p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance());
 				n = p;
 			}
 			else if (c instanceof WorksetPlaceHolder) {
@@ -890,7 +718,6 @@ public class PactCompiler {
 				// catch this for the recursive translation of step functions
 				WorksetNode p = new WorksetNode(holder, containingIterationNode);
 				p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
-				p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance());
 				n = p;
 			}
 			else if (c instanceof SolutionSetPlaceHolder) {
@@ -902,18 +729,14 @@ public class PactCompiler {
 				// catch this for the recursive translation of step functions
 				SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode);
 				p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
-				p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance());
 				n = p;
 			}
 			else {
-				throw new IllegalArgumentException("Unknown operator type: " + c.getClass() + " " + c);
+				throw new IllegalArgumentException("Unknown operator type: " + c);
 			}
 
 			this.con2node.put(c, n);
 			
-			// record the potential memory consumption
-			this.numMemoryConsumers += n.isMemoryConsumer() ? 1 : 0;
-
 			// set the parallelism only if it has not been set before. some nodes have a fixed DOP, such as the
 			// key-less reducer (all-reduce)
 			if (n.getDegreeOfParallelism() < 1) {
@@ -931,19 +754,6 @@ public class PactCompiler {
 				n.setDegreeOfParallelism(par);
 			}
 
-			// check if we need to set the instance sharing accordingly such that
-			// the maximum number of machines is not exceeded
-			if (n.getSubtasksPerInstance() < 1) {
-				int tasksPerInstance = 1;
-				if (this.maxMachines > 0) {
-					int p = n.getDegreeOfParallelism();
-					tasksPerInstance = (p / this.maxMachines) + (p % this.maxMachines == 0 ? 0 : 1);
-				}
-	
-				// we group together n tasks per machine, depending on config and the above computed
-				// value required to obey the maximum number of machines
-				n.setSubtasksPerInstance(tasksPerInstance);
-			}
 			return true;
 		}
 
@@ -966,7 +776,7 @@ public class PactCompiler {
 
 				// first, recursively build the data flow for the step function
 				final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
-					this.maxMachines, iterNode.getDegreeOfParallelism(), closure);
+					iterNode.getDegreeOfParallelism(), closure);
 				
 				BulkPartialSolutionNode partialSolution = null;
 				
@@ -994,9 +804,6 @@ public class PactCompiler {
 				iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
 				iterNode.setPartialSolution(partialSolution);
 				
-				// account for the nested memory consumers
-				this.numMemoryConsumers += recursiveCreator.numMemoryConsumers;
-				
 				// go over the contained data flow and mark the dynamic path nodes
 				StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
 				rootOfStepFunction.accept(identifier);
@@ -1013,7 +820,7 @@ public class PactCompiler {
 
 				// first, recursively build the data flow for the step function
 				final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
-					this.maxMachines, iterNode.getDegreeOfParallelism(), closure);
+					iterNode.getDegreeOfParallelism(), closure);
 				// descend from the solution set delta. check that it depends on both the workset
 				// and the solution set. If it does depend on both, this descend should create both nodes
 				iter.getSolutionSetDelta().accept(recursiveCreator);
@@ -1067,19 +874,12 @@ public class PactCompiler {
 				iterNode.setPartialSolution(solutionSetNode, worksetNode);
 				iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode);
 				
-				// account for the nested memory consumers
-				this.numMemoryConsumers += recursiveCreator.numMemoryConsumers;
-				
 				// go over the contained data flow and mark the dynamic path nodes
 				StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
 				nextWorksetNode.accept(pathIdentifier);
 				iterNode.getSolutionSetDelta().accept(pathIdentifier);
 			}
 		}
-		
-		int getMemoryConsumerCount() {
-			return this.numMemoryConsumers;
-		}
 	};
 	
 	private static final class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
@@ -1107,17 +907,14 @@ public class PactCompiler {
 	 * Simple visitor that sets the minimal guaranteed memory per task based on the amount of available memory,
 	 * the number of memory consumers, and on the task's degree of parallelism.
 	 */
-	private static final class IdAndMemoryAndEstimatesVisitor implements Visitor<OptimizerNode> {
+	private static final class IdAndEstimatesVisitor implements Visitor<OptimizerNode> {
 		
 		private final DataStatistics statistics;
-		
-		private final long memoryPerTaskPerInstance;
-		
+
 		private int id = 1;
 		
-		private IdAndMemoryAndEstimatesVisitor(DataStatistics statistics, long memoryPerTaskPerInstance) {
+		private IdAndEstimatesVisitor(DataStatistics statistics) {
 			this.statistics = statistics;
-			this.memoryPerTaskPerInstance = memoryPerTaskPerInstance;
 		}
 
 
@@ -1128,11 +925,6 @@ public class PactCompiler {
 				return false;
 			}
 			
-			// assign minimum memory share, for lower bound estimates
-			final long mem = visitable.isMemoryConsumer() ? 
-					this.memoryPerTaskPerInstance / visitable.getSubtasksPerInstance() : 0;
-			visitable.setMinimalMemoryPerSubTask(mem);
-			
 			return true;
 		}
 
@@ -1234,8 +1026,6 @@ public class PactCompiler {
 		
 		private final Deque<IterationPlanNode> stackOfIterationNodes;
 
-		private long memoryPerInstance; // the amount of memory per instance
-		
 		private int memoryConsumerWeights; // a counter of all memory consumers
 
 		/**
@@ -1248,12 +1038,7 @@ public class PactCompiler {
 			this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
 		}
 
-		private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan, long memPerInstance) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Available memory per instance: " + memPerInstance);
-			}
-			
-			this.memoryPerInstance = memPerInstance;
+		private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
 			this.memoryConsumerWeights = 0;
 			
 			// traverse the graph
@@ -1263,44 +1048,36 @@ public class PactCompiler {
 
 			// assign the memory to each node
 			if (this.memoryConsumerWeights > 0) {
-				final long memoryPerInstanceAndWeight = this.memoryPerInstance / this.memoryConsumerWeights;
-				
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Memory per consumer weight: " + memoryPerInstanceAndWeight);
-				}
-				
 				for (PlanNode node : this.allNodes) {
 					// assign memory to the driver strategy of the node
 					final int consumerWeight = node.getMemoryConsumerWeight();
 					if (consumerWeight > 0) {
-						final long mem = memoryPerInstanceAndWeight * consumerWeight / node.getSubtasksPerInstance();
-						node.setMemoryPerSubTask(mem);
+						final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights;
+						node.setRelativeMemoryPerSubtask(relativeMem);
 						if (LOG.isDebugEnabled()) {
-							final long mib = mem >> 20;
-							LOG.debug("Assigned " + mib + " MiBytes memory to each subtask of " + 
-								node.getPactContract().getName() + " (" + mib * node.getDegreeOfParallelism() +
-								" MiBytes total.)"); 
+							LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " +
+								node.getPactContract().getName() + ".");
 						}
 					}
 					
 					// assign memory to the local and global strategies of the channels
 					for (Channel c : node.getInputs()) {
 						if (c.getLocalStrategy().dams()) {
-							final long mem = memoryPerInstanceAndWeight / node.getSubtasksPerInstance();
-							c.setMemoryLocalStrategy(mem);
+							final double relativeMem = 1.0 / this.memoryConsumerWeights;
+							c.setRelativeMemoryLocalStrategy(relativeMem);
 							if (LOG.isDebugEnabled()) {
-								final long mib = mem >> 20;
-								LOG.debug("Assigned " + mib + " MiBytes memory to each local strategy instance of " + 
-									c + " (" + mib * node.getDegreeOfParallelism() + " MiBytes total.)"); 
+								LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " +
+										"instance of " + c + ".");
 							}
 						}
 						if (c.getTempMode() != TempMode.NONE) {
-							final long mem = memoryPerInstanceAndWeight / node.getSubtasksPerInstance();
-							c.setTempMemory(mem);
+							final double relativeMem = 1.0/ this.memoryConsumerWeights;
+							c.setRelativeTempMemory(relativeMem);
 							if (LOG.isDebugEnabled()) {
-								final long mib = mem >> 20;
-								LOG.debug("Assigned " + mib + " MiBytes memory to each instance of the temp table for " + 
-									c + " (" + mib * node.getDegreeOfParallelism() + " MiBytes total.)"); 
+								LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
+										"table" +
+										" " +
+										"for " + c + ".");
 							}
 						}
 					}
@@ -1525,182 +1302,4 @@ public class PactCompiler {
 			throw new CompilerException("Class '" + className + "' is not an optimizer post passer.", ccex);
 		}
 	}
-
-	private InstanceTypeDescription getInstanceTypeInfo() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Connecting compiler to JobManager to dertermine instance information.");
-		}
-		
-		// create the connection in a separate thread, such that this thread
-		// can abort, if an unsuccessful connection occurs.
-		Map<InstanceType, InstanceTypeDescription> instances = null;
-		
-		JobManagerConnector jmc = new JobManagerConnector(this.jobManagerAddress);
-		Thread connectorThread = new Thread(jmc, "Compiler - JobManager connector.");
-		connectorThread.setDaemon(true);
-		connectorThread.start();
-
-		// connect and get the result
-		try {
-			jmc.waitForCompletion();
-			instances = jmc.instances;
-			if (instances == null) {
-				throw new NullPointerException("Returned instance map is <null>");
-			}
-		}
-		catch (IOException e) {
-			throw new CompilerException(e.getMessage());
-		}
-		catch (Throwable t) {
-			throw new CompilerException("Cannot connect to the JobManager to determine the available TaskManagers. "
-					+ "Check if the JobManager is running (using the web interface or log files). Reason: " + 
-				t.getMessage(), t);
-		}
-
-		// determine which type to run on
-		return getType(instances);
-	}
-	
-	/**
-	 * This utility method picks the instance type to be used for executing programs.
-	 * <p>
-	 * 
-	 * @param types The available types.
-	 * @return The type to be used for scheduling.
-	 * 
-	 * @throws CompilerException
-	 * @throws IllegalArgumentException
-	 */
-	private InstanceTypeDescription getType(Map<InstanceType, InstanceTypeDescription> types)
-	throws CompilerException
-	{
-		if (types == null || types.size() < 1) {
-			throw new IllegalArgumentException("No instance type found.");
-		}
-		
-		InstanceTypeDescription retValue = null;
-		long totalMemory = 0;
-		int numInstances = 0;
-		
-		final Iterator<InstanceTypeDescription> it = types.values().iterator();
-		while(it.hasNext())
-		{
-			final InstanceTypeDescription descr = it.next();
-			
-			// skip instances for which no hardware description is available
-			// this means typically that no 
-			if (descr.getHardwareDescription() == null || descr.getInstanceType() == null) {
-				continue;
-			}
-			
-			final int curInstances = descr.getMaximumNumberOfAvailableInstances();
-			final long curMemory = curInstances * descr.getHardwareDescription().getSizeOfFreeMemory();
-			
-			// get, if first, or if it has more instances and not less memory, or if it has significantly more memory
-			// and the same number of cores still
-			if ( (retValue == null) ||
-				(curInstances > numInstances && (int) (curMemory * 1.2f) > totalMemory) ||
-				(curInstances * retValue.getInstanceType().getNumberOfCores() >= numInstances && 
-							(int) (curMemory * 1.5f) > totalMemory)
-				)
-			{
-				retValue = descr;
-				numInstances = curInstances;
-				totalMemory = curMemory;
-			}
-		}
-		
-		if (retValue == null) {
-			throw new CompilerException("No instance currently registered at the job-manager. Retry later.\n" +
-				"If the system has recently started, it may take a few seconds until the instances register.");
-		}
-		
-		return retValue;
-	}
-	
-	/**
-	 * Utility class for an asynchronous connection to the job manager to determine the available instances.
-	 */
-	private static final class JobManagerConnector implements Runnable {
-		
-		private static final long MAX_MILLIS_TO_WAIT = 10000;
-		
-		private final InetSocketAddress jobManagerAddress;
-		
-		private final Object lock = new Object();
-		
-		private volatile Map<InstanceType, InstanceTypeDescription> instances;
-		
-		private volatile Throwable error;
-		
-		
-		private JobManagerConnector(InetSocketAddress jobManagerAddress) {
-			this.jobManagerAddress = jobManagerAddress;
-		}
-		
-		
-		public Map<InstanceType, InstanceTypeDescription> waitForCompletion() throws Throwable {
-			long start = System.currentTimeMillis();
-			long remaining = MAX_MILLIS_TO_WAIT;
-			
-			if (this.error != null) {
-				throw this.error;
-			}
-			if (this.instances != null) {
-				return this.instances;
-			}
-			
-			do {
-				try {
-					synchronized (this.lock) {
-						this.lock.wait(remaining);
-					}
-				} catch (InterruptedException iex) {}
-			}
-			while (this.error == null && this.instances == null &&
-					(remaining = MAX_MILLIS_TO_WAIT + start - System.currentTimeMillis()) > 0);
-			
-			if (this.error != null) {
-				throw this.error;
-			}
-			if (this.instances != null) {
-				return this.instances;
-			}
-			
-			throw new IOException("Could not connect to the JobManager at " + jobManagerAddress + 
-				". Please make sure that the Job Manager is started properly.");
-		}
-		
-
-		@Override
-		public void run() {
-			ExtendedManagementProtocol jobManagerConnection = null;
-
-			try {
-				jobManagerConnection = RPC.getProxy(ExtendedManagementProtocol.class,
-					this.jobManagerAddress, NetUtils.getSocketFactory());
-
-				this.instances = jobManagerConnection.getMapOfAvailableInstanceTypes();
-				if (this.instances == null) {
-					throw new IOException("Returned instance map was <null>");
-				}
-			} catch (Throwable t) {
-				this.error = t;
-			} finally {
-				// first of all, signal completion
-				synchronized (this.lock) {
-					this.lock.notifyAll();
-				}
-				
-				if (jobManagerConnection != null) {
-					try {
-						RPC.stopProxy(jobManagerConnection);
-					} catch (Throwable t) {
-						LOG.error("Could not cleanly shut down connection from compiler to job manager,", t);
-					}
-				}
-				jobManagerConnection = null;
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
index 058af1a..fde5970 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
@@ -95,14 +95,20 @@ public class DefaultCostEstimator extends CostEstimator {
 
 	@Override
 	public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) {
-		// assumption: we need ship the whole data over the network to each node.
-		final long estOutShipSize = estimates.getEstimatedOutputSize();
-		if (estOutShipSize <= 0) {
-			costs.setNetworkCost(Costs.UNKNOWN);
+		// if our replication factor is not positive (zero or negative), we cannot calculate broadcast costs
+
+		if (replicationFactor > 0) {
+			// assumption: we need to ship the whole data over the network to each node.
+			final long estOutShipSize = estimates.getEstimatedOutputSize();
+			if (estOutShipSize <= 0) {
+				costs.setNetworkCost(Costs.UNKNOWN);
+			} else {
+				costs.addNetworkCost(replicationFactor * estOutShipSize);
+			}
+			costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor);
 		} else {
-			costs.addNetworkCost(replicationFactor * estOutShipSize);
+			costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 200);
 		}
-		costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor * 100);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java
index 8fd6f79..2f7cb2b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java
@@ -42,11 +42,6 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode {
 	public abstract IterationNode getIterationNode();
 	
 	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public boolean isMemoryConsumer() {
-		return false;
-	}
 	
 	public boolean isOnDynamicPath() {
 		return true;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java
index 70752b5..50ec01b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java
@@ -122,20 +122,12 @@ public class BinaryUnionNode extends TwoInputNode {
 		final RequestedLocalProperties noLocalProps = new RequestedLocalProperties();
 		
 		final int dop = getDegreeOfParallelism();
-		final int subPerInstance = getSubtasksPerInstance();
-		final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
 		final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism();
-		final int inSubPerInstance1 = getFirstPredecessorNode().getSubtasksPerInstance();
-		final int inNumInstances1 = inDop1 / inSubPerInstance1 + (inDop1 % inSubPerInstance1 == 0 ? 0 : 1);
 		final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism();
-		final int inSubPerInstance2 = getSecondPredecessorNode().getSubtasksPerInstance();
-		final int inNumInstances2 = inDop2 / inSubPerInstance2 + (inDop2 % inSubPerInstance2 == 0 ? 0 : 1);
-		
-		final boolean globalDopChange1 = numInstances != inNumInstances1;
-		final boolean globalDopChange2 = numInstances != inNumInstances2;
-		final boolean localDopChange1 = numInstances == inNumInstances1 & subPerInstance != inSubPerInstance1;
-		final boolean localDopChange2 = numInstances == inNumInstances2 & subPerInstance != inSubPerInstance2;
-		
+
+		final boolean dopChange1 = dop != inDop1;
+		final boolean dopChange2 = dop != inDop2;
+
 		// enumerate all pairwise combination of the children's plans together with
 		// all possible operator strategy combination
 		
@@ -154,15 +146,11 @@ public class BinaryUnionNode extends TwoInputNode {
 					Channel c1 = new Channel(child1, this.input1.getMaterializationMode());
 					if (this.input1.getShipStrategy() == null) {
 						// free to choose the ship strategy
-						igps.parameterizeChannel(c1, globalDopChange1, localDopChange1);
+						igps.parameterizeChannel(c1, dopChange1);
 						
 						// if the DOP changed, make sure that we cancel out properties, unless the
 						// ship strategy preserves/establishes them even under changing DOPs
-						if (globalDopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
-							c1.getGlobalProperties().reset();
-						}
-						if (localDopChange1 && !(c1.getShipStrategy().isNetworkStrategy() || 
-									c1.getShipStrategy().compensatesForLocalDOPChanges())) {
+						if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
 							c1.getGlobalProperties().reset();
 						}
 					} else {
@@ -173,10 +161,8 @@ public class BinaryUnionNode extends TwoInputNode {
 							c1.setShipStrategy(this.input1.getShipStrategy());
 						}
 						
-						if (globalDopChange1) {
+						if (dopChange1) {
 							c1.adjustGlobalPropertiesForFullParallelismChange();
-						} else if (localDopChange1) {
-							c1.adjustGlobalPropertiesForLocalParallelismChange();
 						}
 					}
 					
@@ -184,15 +170,11 @@ public class BinaryUnionNode extends TwoInputNode {
 					Channel c2 = new Channel(child2, this.input2.getMaterializationMode());
 					if (this.input2.getShipStrategy() == null) {
 						// free to choose the ship strategy
-						igps.parameterizeChannel(c2, globalDopChange2, localDopChange2);
+						igps.parameterizeChannel(c2, dopChange2);
 						
 						// if the DOP changed, make sure that we cancel out properties, unless the
 						// ship strategy preserves/establishes them even under changing DOPs
-						if (globalDopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
-							c2.getGlobalProperties().reset();
-						}
-						if (localDopChange2 && !(c2.getShipStrategy().isNetworkStrategy() || 
-									c2.getShipStrategy().compensatesForLocalDOPChanges())) {
+						if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
 							c2.getGlobalProperties().reset();
 						}
 					} else {
@@ -203,10 +185,8 @@ public class BinaryUnionNode extends TwoInputNode {
 							c2.setShipStrategy(this.input2.getShipStrategy());
 						}
 						
-						if (globalDopChange2) {
+						if (dopChange2) {
 							c2.adjustGlobalPropertiesForFullParallelismChange();
-						} else if (localDopChange2) {
-							c2.adjustGlobalPropertiesForLocalParallelismChange();
 						}
 					}
 					
@@ -224,20 +204,20 @@ public class BinaryUnionNode extends TwoInputNode {
 						if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() != ShipStrategyType.FORWARD) {
 							// adjust c2 to c1
 							c2 = c2.clone();
-							p1.parameterizeChannel(c2,globalDopChange2);
+							p1.parameterizeChannel(c2,dopChange2);
 						} else if (c2.getShipStrategy() == ShipStrategyType.FORWARD && c1.getShipStrategy() != ShipStrategyType.FORWARD) {
 							// adjust c1 to c2
 							c1 = c1.clone();
-							p2.parameterizeChannel(c1,globalDopChange1);
+							p2.parameterizeChannel(c1,dopChange1);
 						} else if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() == ShipStrategyType.FORWARD) {
 							boolean adjustC1 = c1.getEstimatedOutputSize() <= 0 || c2.getEstimatedOutputSize() <= 0 ||
 									c1.getEstimatedOutputSize() <= c2.getEstimatedOutputSize();
 							if (adjustC1) {
 								c2 = c2.clone();
-								p1.parameterizeChannel(c2, globalDopChange2);
+								p1.parameterizeChannel(c2, dopChange2);
 							} else {
 								c1 = c1.clone();
-								p2.parameterizeChannel(c1, globalDopChange1);
+								p2.parameterizeChannel(c1, dopChange1);
 							}
 						} else {
 							// this should never happen, as it implies both realize a different strategy, which is

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
index f6720ea..bfbca15 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
@@ -65,9 +65,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Creates a new node with a single input for the optimizer plan.
+	 * Creates a new node for the bulk iteration.
 	 * 
-	 * @param iteration The PACT that the node represents.
+	 * @param iteration The bulk iteration the node represents.
 	 */
 	public BulkIterationNode(BulkIterationBase<?> iteration) {
 		super(iteration);
@@ -124,14 +124,12 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 	public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) {
 		
 		// check if the root of the step function has the same DOP as the iteration
-		if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() ||
-			nextPartialSolution.getSubtasksPerInstance() != getSubtasksPerInstance() )
+		if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism())
 		{
 			// add a no-op to the root to express the re-partitioning
 			NoOpNode noop = new NoOpNode();
 			noop.setDegreeOfParallelism(getDegreeOfParallelism());
-			noop.setSubtasksPerInstance(getSubtasksPerInstance());
-			
+
 			PactConnection noOpConn = new PactConnection(nextPartialSolution, noop);
 			noop.setIncomingConnection(noOpConn);
 			nextPartialSolution.addOutgoingConnection(noOpConn);
@@ -198,12 +196,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
 		return Collections.<OperatorDescriptorSingle>singletonList(new NoOpDescriptor());
 	}
-	
-	@Override
-	public boolean isMemoryConsumer() {
-		return true;
-	}
-	
+
 	@Override
 	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
 		final InterestingProperties intProps = getInterestingProperties().clone();
@@ -306,12 +299,11 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
 					// attach a no-op node through which we create the properties of the original input
 					Channel toNoOp = new Channel(candidate);
-					globPropsReq.parameterizeChannel(toNoOp, false, false);
+					globPropsReq.parameterizeChannel(toNoOp, false);
 					locPropsReq.parameterizeChannel(toNoOp);
 					
 					UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
 					rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
-					rebuildPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
 					
 					SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
 					rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
index fe823d2..d4f9d67 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
@@ -87,11 +87,6 @@ public class DataSinkNode extends OptimizerNode {
 	}
 
 	@Override
-	public boolean isMemoryConsumer() {
-		return getPactContract().getPartitionOrdering() != null || getPactContract().getLocalOrder() != null;
-	}
-
-	@Override
 	public List<PactConnection> getIncomingConnections() {
 		return Collections.singletonList(this.input);
 	}
@@ -194,21 +189,16 @@ public class DataSinkNode extends OptimizerNode {
 		List<PlanNode> outputPlans = new ArrayList<PlanNode>();
 		
 		final int dop = getDegreeOfParallelism();
-		final int subPerInstance = getSubtasksPerInstance();
 		final int inDop = getPredecessorNode().getDegreeOfParallelism();
-		final int inSubPerInstance = getPredecessorNode().getSubtasksPerInstance();
-		final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
-		final int inNumInstances = inDop / inSubPerInstance + (inDop % inSubPerInstance == 0 ? 0 : 1);
-		
-		final boolean globalDopChange = numInstances != inNumInstances;
-		final boolean localDopChange = numInstances == inNumInstances & subPerInstance != inSubPerInstance;
-		
+
+		final boolean dopChange = dop != inDop;
+
 		InterestingProperties ips = this.input.getInterestingProperties();
 		for (PlanNode p : subPlans) {
 			for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
 				for (RequestedLocalProperties lp : ips.getLocalProperties()) {
 					Channel c = new Channel(p);
-					gp.parameterizeChannel(c, globalDopChange, localDopChange);
+					gp.parameterizeChannel(c, dopChange);
 					lp.parameterizeChannel(c);
 					c.setRequiredLocalProps(lp);
 					c.setRequiredGlobalProps(gp);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
index 17c11c9..7234420 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
@@ -55,7 +55,6 @@ public class DataSourceNode extends OptimizerNode {
 		
 		if (NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass())) {
 			setDegreeOfParallelism(1);
-			setSubtasksPerInstance(1);
 			this.sequentialInput = true;
 		} else {
 			this.sequentialInput = false;
@@ -78,27 +77,12 @@ public class DataSourceNode extends OptimizerNode {
 	}
 
 	@Override
-	public boolean isMemoryConsumer() {
-		return false;
-	}
-	
-
-	@Override
 	public void setDegreeOfParallelism(int degreeOfParallelism) {
 		// if unsplittable, DOP remains at 1
 		if (!this.sequentialInput) {
 			super.setDegreeOfParallelism(degreeOfParallelism);
 		}
 	}
-	
-
-	@Override
-	public void setSubtasksPerInstance(int instancesPerMachine) {
-		// if unsplittable, DOP remains at 1
-		if (!this.sequentialInput) {
-			super.setSubtasksPerInstance(instancesPerMachine);
-		}
-	}
 
 	@Override
 	public List<PactConnection> getIncomingConnections() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java
index 6eb2903..4d7230e 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java
@@ -46,7 +46,6 @@ public class GroupReduceNode extends SingleInputNode {
 		if (this.keys == null) {
 			// case of a key-less reducer. force a parallelism of 1
 			setDegreeOfParallelism(1);
-			setSubtasksPerInstance(1);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
index b2c9330..85a6568 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
@@ -262,13 +262,6 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 */
 	@Override
 	public abstract void accept(Visitor<OptimizerNode> visitor);
-
-	/**
-	 * Checks, whether this node requires memory for its tasks or not.
-	 * 
-	 * @return True, if this node contains logic that requires memory usage, false otherwise.
-	 */
-	public abstract boolean isMemoryConsumer();
 	
 	/**
 	 * Checks whether a field is modified by the user code or whether it is kept unchanged.
@@ -408,7 +401,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * @param degreeOfParallelism
 	 *        The degree of parallelism to set.
 	 * @throws IllegalArgumentException
-	 *         If the degree of parallelism is smaller than one.
+	 *         If the degree of parallelism is smaller than one and not -1.
 	 */
 	public void setDegreeOfParallelism(int degreeOfParallelism) {
 		if (degreeOfParallelism < 1) {
@@ -416,48 +409,6 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		}
 		this.degreeOfParallelism = degreeOfParallelism;
 	}
-
-	/**
-	 * Gets the number of parallel instances of the contract that are
-	 * to be executed on the same compute instance (logical machine).
-	 * 
-	 * @return The number of subtask instances per machine.
-	 */
-	public int getSubtasksPerInstance() {
-		return this.subtasksPerInstance;
-	}
-
-	/**
-	 * Sets the number of parallel task instances of the contract that are
-	 * to be executed on the same computing instance (logical machine).
-	 * 
-	 * @param instancesPerMachine The instances per machine.
-	 * @throws IllegalArgumentException If the number of instances per machine is smaller than one.
-	 */
-	public void setSubtasksPerInstance(int instancesPerMachine) {
-		if (instancesPerMachine < 1) {
-			throw new IllegalArgumentException();
-		}
-		this.subtasksPerInstance = instancesPerMachine;
-	}
-	
-	/**
-	 * Gets the minimal guaranteed memory per subtask for tasks represented by this OptimizerNode.
-	 *
-	 * @return The minimal guaranteed memory per subtask, in bytes.
-	 */
-	public long getMinimalMemoryPerSubTask() {
-		return this.minimalMemoryPerSubTask;
-	}
-	
-	/**
-	 * Sets the minimal guaranteed memory per subtask for tasks represented by this OptimizerNode.
-	 *
-	 * @param minimalGuaranteedMemory The minimal guaranteed memory per subtask, in bytes.
-	 */
-	public void setMinimalMemoryPerSubTask(long minimalGuaranteedMemory) {
-		this.minimalMemoryPerSubTask = minimalGuaranteedMemory;
-	}
 	
 	/**
 	 * Gets the amount of memory that all subtasks of this task have jointly available.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java
index 2190060..409d027 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java
@@ -36,7 +36,6 @@ public class ReduceNode extends SingleInputNode {
 		if (this.keys == null) {
 			// case of a key-less reducer. force a parallelism of 1
 			setDegreeOfParallelism(1);
-			setSubtasksPerInstance(1);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index 8bf3f16..0b872a7 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -206,22 +206,6 @@ public abstract class SingleInputNode extends OptimizerNode {
 	
 	protected abstract List<OperatorDescriptorSingle> getPossibleProperties();
 	
-
-	@Override
-	public boolean isMemoryConsumer() {
-		for (OperatorDescriptorSingle dps : getPossibleProperties()) {
-			if (dps.getStrategy().firstDam().isMaterializing()) {
-				return true;
-			}
-			for (RequestedLocalProperties rlp : dps.getPossibleLocalProperties()) {
-				if (!rlp.isTrivial()) {
-					return true;
-				}
-			}
-		}
-		return false;
-	}
-
 	@Override
 	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
 		// get what we inherit and what is preserved by our user code 
@@ -284,30 +268,21 @@ public abstract class SingleInputNode extends OptimizerNode {
 		final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
 		
 		final int dop = getDegreeOfParallelism();
-		final int subPerInstance = getSubtasksPerInstance();
 		final int inDop = getPredecessorNode().getDegreeOfParallelism();
-		final int inSubPerInstance = getPredecessorNode().getSubtasksPerInstance();
-		final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
-		final int inNumInstances = inDop / inSubPerInstance + (inDop % inSubPerInstance == 0 ? 0 : 1);
-		
-		final boolean globalDopChange = numInstances != inNumInstances;
-		final boolean localDopChange = numInstances == inNumInstances & subPerInstance != inSubPerInstance;
-		
+
+		final boolean dopChange = inDop != dop;
+
 		// create all candidates
 		for (PlanNode child : subPlans) {
 			if (this.inConn.getShipStrategy() == null) {
 				// pick the strategy ourselves
 				for (RequestedGlobalProperties igps: intGlobal) {
 					final Channel c = new Channel(child, this.inConn.getMaterializationMode());
-					igps.parameterizeChannel(c, globalDopChange, localDopChange);
+					igps.parameterizeChannel(c, dopChange);
 					
 					// if the DOP changed, make sure that we cancel out properties, unless the
 					// ship strategy preserves/establishes them even under changing DOPs
-					if (globalDopChange && !c.getShipStrategy().isNetworkStrategy()) {
-						c.getGlobalProperties().reset();
-					}
-					if (localDopChange && !(c.getShipStrategy().isNetworkStrategy() || 
-								c.getShipStrategy().compensatesForLocalDOPChanges())) {
+					if (dopChange && !c.getShipStrategy().isNetworkStrategy()) {
 						c.getGlobalProperties().reset();
 					}
 					
@@ -332,12 +307,10 @@ public abstract class SingleInputNode extends OptimizerNode {
 					c.setShipStrategy(this.inConn.getShipStrategy());
 				}
 				
-				if (globalDopChange) {
+				if (dopChange) {
 					c.adjustGlobalPropertiesForFullParallelismChange();
-				} else if (localDopChange) {
-					c.adjustGlobalPropertiesForLocalParallelismChange();
 				}
-				
+
 				// check whether we meet any of the accepted properties
 				for (RequestedGlobalProperties rgps: allValidGlobals) {
 					if (rgps.isMetBy(c.getGlobalProperties())) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java
index 2c765a5..a711ac5 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java
@@ -42,7 +42,6 @@ public class SinkJoiner extends TwoInputNode {
 		this.input2 = conn2;
 		
 		setDegreeOfParallelism(1);
-		setSubtasksPerInstance(1);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index 9898c81..97a92d0 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -251,22 +251,6 @@ public abstract class TwoInputNode extends OptimizerNode {
 	}
 	
 	protected abstract List<OperatorDescriptorDual> getPossibleProperties();
-	
-	@Override
-	public boolean isMemoryConsumer() {
-		for (OperatorDescriptorDual dpd : this.possibleProperties) {
-			if (dpd.getStrategy().firstDam().isMaterializing() ||
-				dpd.getStrategy().secondDam().isMaterializing()) {
-				return true;
-			}
-			for (LocalPropertiesPair prp : dpd.getPossibleLocalProperties()) {
-				if (!(prp.getProperties1().isTrivial() && prp.getProperties2().isTrivial())) {
-					return true;
-				}
-			}
-		}
-		return false;
-	}
 
 	@Override
 	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
@@ -348,20 +332,12 @@ public abstract class TwoInputNode extends OptimizerNode {
 		final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
 		
 		final int dop = getDegreeOfParallelism();
-		final int subPerInstance = getSubtasksPerInstance();
-		final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
 		final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism();
-		final int inSubPerInstance1 = getFirstPredecessorNode().getSubtasksPerInstance();
-		final int inNumInstances1 = inDop1 / inSubPerInstance1 + (inDop1 % inSubPerInstance1 == 0 ? 0 : 1);
 		final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism();
-		final int inSubPerInstance2 = getSecondPredecessorNode().getSubtasksPerInstance();
-		final int inNumInstances2 = inDop2 / inSubPerInstance2 + (inDop2 % inSubPerInstance2 == 0 ? 0 : 1);
-		
-		final boolean globalDopChange1 = numInstances != inNumInstances1;
-		final boolean globalDopChange2 = numInstances != inNumInstances2;
-		final boolean localDopChange1 = numInstances == inNumInstances1 & subPerInstance != inSubPerInstance1;
-		final boolean localDopChange2 = numInstances == inNumInstances2 & subPerInstance != inSubPerInstance2;
-		
+
+		final boolean dopChange1 = dop != inDop1;
+		final boolean dopChange2 = dop != inDop2;
+
 		// enumerate all pairwise combination of the children's plans together with
 		// all possible operator strategy combination
 		
@@ -380,15 +356,11 @@ public abstract class TwoInputNode extends OptimizerNode {
 					final Channel c1 = new Channel(child1, this.input1.getMaterializationMode());
 					if (this.input1.getShipStrategy() == null) {
 						// free to choose the ship strategy
-						igps1.parameterizeChannel(c1, globalDopChange1, localDopChange1);
+						igps1.parameterizeChannel(c1, dopChange1);
 						
 						// if the DOP changed, make sure that we cancel out properties, unless the
 						// ship strategy preserves/establishes them even under changing DOPs
-						if (globalDopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
-							c1.getGlobalProperties().reset();
-						}
-						if (localDopChange1 && !(c1.getShipStrategy().isNetworkStrategy() || 
-									c1.getShipStrategy().compensatesForLocalDOPChanges())) {
+						if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
 							c1.getGlobalProperties().reset();
 						}
 					} else {
@@ -399,10 +371,8 @@ public abstract class TwoInputNode extends OptimizerNode {
 							c1.setShipStrategy(this.input1.getShipStrategy());
 						}
 						
-						if (globalDopChange1) {
+						if (dopChange1) {
 							c1.adjustGlobalPropertiesForFullParallelismChange();
-						} else if (localDopChange1) {
-							c1.adjustGlobalPropertiesForLocalParallelismChange();
 						}
 					}
 					
@@ -411,15 +381,11 @@ public abstract class TwoInputNode extends OptimizerNode {
 						final Channel c2 = new Channel(child2, this.input2.getMaterializationMode());
 						if (this.input2.getShipStrategy() == null) {
 							// free to choose the ship strategy
-							igps2.parameterizeChannel(c2, globalDopChange2, localDopChange2);
+							igps2.parameterizeChannel(c2, dopChange2);
 							
 							// if the DOP changed, make sure that we cancel out properties, unless the
 							// ship strategy preserves/establishes them even under changing DOPs
-							if (globalDopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
-								c2.getGlobalProperties().reset();
-							}
-							if (localDopChange2 && !(c2.getShipStrategy().isNetworkStrategy() || 
-										c2.getShipStrategy().compensatesForLocalDOPChanges())) {
+							if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
 								c2.getGlobalProperties().reset();
 							}
 						} else {
@@ -430,10 +396,8 @@ public abstract class TwoInputNode extends OptimizerNode {
 								c2.setShipStrategy(this.input2.getShipStrategy());
 							}
 							
-							if (globalDopChange2) {
+							if (dopChange2) {
 								c2.adjustGlobalPropertiesForFullParallelismChange();
-							} else if (localDopChange2) {
-								c2.adjustGlobalPropertiesForLocalParallelismChange();
 							}
 						}
 						


Mime
View raw message