flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [21/53] [abbrv] git commit: Merge fix to omit input/output registering on JobManager; Rework Invokable Task Hierarchy
Date Thu, 26 Jun 2014 09:46:46 GMT
Merge fix to omit input/output registering on JobManager
Rework Invokable Task Hierarchy


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8c1d82a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8c1d82a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8c1d82a8

Branch: refs/heads/travis_test
Commit: 8c1d82a8ec674de6525319501c6be2674e3143f1
Parents: 2692643
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Jun 20 21:13:23 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Jun 22 21:07:21 2014 +0200

----------------------------------------------------------------------
 .../stratosphere/client/program/ClientTest.java |  29 +-
 .../plantranslate/NepheleJobGraphGenerator.java |  66 ++---
 .../stratosphere/api/common/PlanExecutor.java   |   2 -
 .../api/common/io/FileOutputFormat.java         | 291 +++++--------------
 .../stratosphere/api/common/io/FormatUtil.java  |   1 -
 .../api/common/io/InitializeOnMaster.java       |  35 +++
 .../api/common/io/OutputFormat.java             |  15 +-
 .../configuration/Configuration.java            |  16 +-
 .../eu/stratosphere/core/fs/FileSystem.java     |   3 +-
 .../eu/stratosphere/core/io/StringRecord.java   |   6 +-
 .../eu/stratosphere/util/IterableIterator.java  |   4 +-
 .../api/java/io/PrintingOutputFormat.java       |   3 -
 .../nephele/execution/RuntimeEnvironment.java   |   1 -
 .../nephele/executiongraph/ExecutionGraph.java  | 104 ++++---
 .../executiongraph/ExecutionGroupVertex.java    |   1 -
 .../jobgraph/AbstractJobInputVertex.java        |  19 +-
 .../jobgraph/AbstractJobOutputVertex.java       |   9 +-
 .../nephele/jobgraph/AbstractJobVertex.java     |  31 +-
 .../stratosphere/nephele/jobgraph/JobGraph.java |  31 +-
 .../nephele/jobgraph/JobInputVertex.java        | 155 ++--------
 .../nephele/jobgraph/JobOutputVertex.java       | 132 ++-------
 .../nephele/jobgraph/JobTaskVertex.java         |  51 +---
 .../nephele/jobmanager/JobManager.java          |   4 +-
 .../splitassigner/InputSplitManager.java        |   2 -
 .../LocatableInputSplitAssigner.java            |   2 -
 .../file/FileInputSplitAssigner.java            |   5 -
 .../nephele/taskmanager/TaskManager.java        |   2 +-
 .../nephele/template/AbstractInputTask.java     |  79 -----
 .../nephele/template/AbstractInvokable.java     |   1 -
 .../nephele/template/AbstractOutputTask.java    |  22 --
 .../nephele/template/AbstractTask.java          |  21 --
 .../runtime/iterative/io/FakeOutputTask.java    |   4 +-
 .../task/IterationSynchronizationSinkTask.java  |   4 +-
 .../iterative/task/IterationTailPactTask.java   |   8 +-
 .../pact/runtime/task/DataSinkTask.java         |  10 +-
 .../pact/runtime/task/DataSourceTask.java       | 109 +++----
 .../pact/runtime/task/RegularPactTask.java      |  16 +-
 .../pact/runtime/task/util/TaskConfig.java      |   6 +-
 .../runtime/io/api/MutableRecordReader.java     |  38 +--
 .../runtime/io/api/RecordReader.java            |  18 +-
 .../runtime/io/api/RecordWriter.java            |  22 +-
 .../executiongraph/ExecutionGraphTest.java      | 163 ++++++-----
 .../ForwardTask1Input1Output.java               |   4 +-
 .../ForwardTask1Input2Outputs.java              |   4 +-
 .../ForwardTask2Inputs1Output.java              |   4 +-
 .../executiongraph/SelfCrossForwardTask.java    |  13 +-
 .../nephele/jobmanager/DoubleSourceTask.java    | 132 +++++++++
 .../nephele/jobmanager/DoubleTargetTask.java    |  24 +-
 .../jobmanager/ExceptionOutputFormat.java       |  26 +-
 .../nephele/jobmanager/ExceptionTask.java       |  11 +-
 .../nephele/jobmanager/ForwardTask.java         |  16 +-
 .../nephele/jobmanager/JobManagerITCase.java    | 158 +++++-----
 .../jobmanager/RuntimeExceptionTask.java        |  13 +-
 .../nephele/jobmanager/UnionTask.java           |  22 +-
 .../scheduler/queue/DefaultSchedulerTest.java   |  68 ++---
 .../nephele/util/tasks/DoubleSourceTask.java    | 134 +++++++++
 .../nephele/util/tasks/FileLineReader.java      | 133 +++++++++
 .../nephele/util/tasks/FileLineWriter.java      |  72 +++++
 .../nephele/util/tasks/JobFileInputVertex.java  | 255 ++++++++++++++++
 .../nephele/util/tasks/JobFileOutputVertex.java | 109 +++++++
 .../runtime/hash/HashMatchIteratorITCase.java   |   4 +-
 .../runtime/hash/ReOpenableHashTableITCase.java |   3 +-
 .../pact/runtime/io/ChannelViewsTest.java       |   4 +-
 .../pact/runtime/io/SpillingBufferTest.java     |   4 +-
 .../sort/AsynchonousPartialSorterITCase.java    |  10 +-
 .../CombiningUnilateralSortMergerITCase.java    |   4 +-
 .../pact/runtime/sort/ExternalSortITCase.java   |   8 +-
 .../sort/MassiveStringSortingITCase.java        |   8 +-
 .../sort/SortMergeMatchIteratorITCase.java      |  11 +-
 .../task/util/HashVsSortMiniBenchmark.java      |   4 +-
 .../pact/runtime/test/util/DummyInvokable.java  |   6 +-
 .../pact/runtime/test/util/TaskTestBase.java    |  15 +-
 .../bufferprovider/LocalBufferPoolTest.java     |   6 +
 .../TransitiveClosureITCase.java                |   2 +-
 .../test/iterative/nephele/JobGraphUtils.java   |  13 +-
 .../recordJobs/util/DiscardingOutputFormat.java |  20 +-
 .../test/runtime/NetworkStackThroughput.java    |  47 ++-
 77 files changed, 1567 insertions(+), 1341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
index b3f8159..a948706 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
@@ -28,10 +28,7 @@ import org.mockito.Mock;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import eu.stratosphere.api.common.InvalidProgramException;
 import eu.stratosphere.api.common.Plan;
-import eu.stratosphere.api.java.LocalEnvironment;
-import eu.stratosphere.client.LocalExecutor;
 import eu.stratosphere.compiler.DataStatistics;
 import eu.stratosphere.compiler.PactCompiler;
 import eu.stratosphere.compiler.costs.CostEstimator;
@@ -134,16 +131,16 @@ public class ClientTest {
 		verify(this.jobClientMock).submitJob();
 	}
 	
-
-	@Test(expected=InvalidProgramException.class)
-	public void tryLocalExecution() throws Exception {
-		new Client(configMock);
-		LocalExecutor.execute(planMock);
-	}
-	
-	@Test(expected=InvalidProgramException.class)
-	public void tryLocalEnvironmentExecution() throws Exception {
-		new Client(configMock);
-		new LocalEnvironment();
-	}
-}
+//
+//	@Test(expected=InvalidProgramException.class)
+//	public void tryLocalExecution() throws Exception {
+//		new Client(configMock);
+//		LocalExecutor.execute(planMock);
+//	}
+//	
+//	@Test(expected=InvalidProgramException.class)
+//	public void tryLocalEnvironmentExecution() throws Exception {
+//		new Client(configMock);
+//		new LocalEnvironment();
+//	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 3089cdb..3c1e9e3 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -20,14 +20,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-
-import eu.stratosphere.api.common.io.InputFormat;
-import eu.stratosphere.api.common.io.OutputFormat;
-import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
-import eu.stratosphere.core.io.InputSplit;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.Map.Entry;
 
 import eu.stratosphere.api.common.aggregators.AggregatorRegistry;
 import eu.stratosphere.api.common.aggregators.AggregatorWithName;
@@ -66,7 +59,6 @@ import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.template.AbstractInputTask;
 import eu.stratosphere.pact.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
 import eu.stratosphere.pact.runtime.iterative.io.FakeOutputTask;
 import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
@@ -760,7 +752,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		} else {
 			// create task vertex
 			vertex = new JobTaskVertex(taskName, this.jobGraph);
-			vertex.setTaskClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+			vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
 			
 			config = new TaskConfig(vertex.getConfiguration());
 			config.setDriver(ds.getDriverClass());
@@ -786,7 +778,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		final DriverStrategy ds = node.getDriverStrategy();
 		final JobTaskVertex vertex = new JobTaskVertex(taskName, this.jobGraph);
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
-		vertex.setTaskClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+		vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
 		
 		// set user code
 		config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
@@ -812,31 +804,29 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 
 	private JobInputVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
 		final JobInputVertex vertex = new JobInputVertex(node.getNodeName(), this.jobGraph);
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
-		// set task class
-		@SuppressWarnings("unchecked")
-		final Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask
-				.class;
-		vertex.setInputClass(clazz);
+		vertex.setInvokableClass(DataSourceTask.class);
 
 		// set user code
-		vertex.setInputFormat((UserCodeWrapper<? extends InputFormat<?, InputSplit>>)node.getPactContract()
-				.getUserCodeWrapper());
-		vertex.setInputFormatParameters(node.getPactContract().getParameters());
-		vertex.setOutputSerializer(node.getSerializer());
+		config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
+		config.setStubParameters(node.getPactContract().getParameters());
+
+		config.setOutputSerializer(node.getSerializer());
 		return vertex;
 	}
 
 	private AbstractJobOutputVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
 		final JobOutputVertex vertex = new JobOutputVertex(node.getNodeName(), this.jobGraph);
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
-		vertex.setOutputClass(DataSinkTask.class);
+		vertex.setInvokableClass(DataSinkTask.class);
 		vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
 
 		// set user code
-		vertex.setOutputFormat((UserCodeWrapper<? extends OutputFormat<?>>)node.getPactContract().getUserCodeWrapper());
-		vertex.setOutputFormatParameters(node.getPactContract().getParameters());
-		
+		config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
+		config.setStubParameters(node.getPactContract().getParameters());
+
 		return vertex;
 	}
 	
@@ -884,7 +874,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			}
 			
 			// reset the vertex type to iteration head
-			headVertex.setTaskClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			toReturn = null;
 		} else {
@@ -892,7 +882,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			// everything else happens in the post visit, after the input (the initial partial solution)
 			// is connected.
 			headVertex = new JobTaskVertex("PartialSolution ("+iteration.getNodeName()+")", this.jobGraph);
-			headVertex.setTaskClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			headConfig.setDriver(NoOpDriver.class);
 			toReturn = headVertex;
@@ -952,7 +942,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			}
 			
 			// reset the vertex type to iteration head
-			headVertex.setTaskClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			toReturn = null;
 		} else {
@@ -960,7 +950,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			// everything else happens in the post visit, after the input (the initial partial solution)
 			// is connected.
 			headVertex = new JobTaskVertex("IterationHead("+iteration.getNodeName()+")", this.jobGraph);
-			headVertex.setTaskClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			headConfig.setDriver(NoOpDriver.class);
 			toReturn = headVertex;
@@ -1144,7 +1134,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		// --------------------------- create the sync task ---------------------------
 		final JobOutputVertex sync = new JobOutputVertex("Sync(" +
 					bulkNode.getNodeName() + ")", this.jobGraph);
-		sync.setOutputClass(IterationSynchronizationSinkTask.class);
+		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 		sync.setNumberOfSubtasks(1);
 		this.auxVertices.add(sync);
 		
@@ -1192,14 +1182,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		// No following termination criterion
 		if(rootOfStepFunction.getOutgoingChannels().isEmpty()) {
 			
-			rootOfStepFunctionVertex.setTaskClass(IterationTailPactTask.class);
+			rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);
 			
 			tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
 			tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			
 			// create the fake output task
 			JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
-			fakeTail.setOutputClass(FakeOutputTask.class);
+			fakeTail.setInvokableClass(FakeOutputTask.class);
 			fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
 			this.auxVertices.add(fakeTail);
 			
@@ -1234,14 +1224,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
 			}
 			
-			rootOfTerminationCriterionVertex.setTaskClass(IterationTailPactTask.class);
+			rootOfTerminationCriterionVertex.setInvokableClass(IterationTailPactTask.class);
 			// Hack
 			tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
 			tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
 			tailConfigOfTerminationCriterion.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			
 			JobOutputVertex fakeTailTerminationCriterion = new JobOutputVertex("Fake Tail for Termination Criterion", this.jobGraph);
-			fakeTailTerminationCriterion.setOutputClass(FakeOutputTask.class);
+			fakeTailTerminationCriterion.setInvokableClass(FakeOutputTask.class);
 			fakeTailTerminationCriterion.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
 			this.auxVertices.add(fakeTailTerminationCriterion);
 		
@@ -1309,7 +1299,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		{
 			final JobOutputVertex sync = new JobOutputVertex("Sync (" +
 						iterNode.getNodeName() + ")", this.jobGraph);
-			sync.setOutputClass(IterationSynchronizationSinkTask.class);
+			sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 			sync.setNumberOfSubtasks(1);
 			this.auxVertices.add(sync);
 			
@@ -1367,14 +1357,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				worksetTailConfig.setIsWorksetUpdate();
 				
 				if (hasWorksetTail) {
-					nextWorksetVertex.setTaskClass(IterationTailPactTask.class);
+					nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);
 					
 					worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
 					worksetTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 					
 					// create the fake output task
 					JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
-					fakeTail.setOutputClass(FakeOutputTask.class);
+					fakeTail.setInvokableClass(FakeOutputTask.class);
 					fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
 					this.auxVertices.add(fakeTail);
 					
@@ -1405,14 +1395,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				solutionDeltaConfig.setIsSolutionSetUpdate();
 				
 				if (hasSolutionSetTail) {
-					solutionDeltaVertex.setTaskClass(IterationTailPactTask.class);
+					solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);
 					
 					solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
 					solutionDeltaConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 	
 					// create the fake output task
 					JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
-					fakeTail.setOutputClass(FakeOutputTask.class);
+					fakeTail.setInvokableClass(FakeOutputTask.class);
 					fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
 					this.auxVertices.add(fakeTail);
 					

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/PlanExecutor.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/PlanExecutor.java
index 7caaab2..d91abf8 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/PlanExecutor.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/PlanExecutor.java
@@ -53,7 +53,6 @@ public abstract class PlanExecutor {
 	 * Creates an executor that runs the plan locally in a multi-threaded environment.
 	 * 
 	 * @return A local executor.
-	 * @see eu.stratosphere.client.LocalExecutor
 	 */
 	public static PlanExecutor createLocalExecutor() {
 		Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
@@ -75,7 +74,6 @@ public abstract class PlanExecutor {
 	 * @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
 	 *                 from within the UDFs.
 	 * @return A remote executor.
-	 * @see eu.stratosphere.client.RemoteExecutor
 	 */
 	public static PlanExecutor createRemoteExecutor(String hostname, int port, String... jarFiles) {
 		if (hostname == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
index c4e1d5a..d43c987 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
@@ -31,14 +31,14 @@ import eu.stratosphere.core.fs.Path;
  * The abstract base class for all output formats that are file based. Contains the logic to open/close the target
  * file streams.
  */
-public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
+public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster {
+	
 	private static final long serialVersionUID = 1L;
 
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Defines the behavior for creating output directories. 
-	 *
+	 * Behavior for creating output directories. 
 	 */
 	public static enum OutputDirectoryMode {
 		
@@ -54,7 +54,7 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
 
 	private static WriteMode DEFAULT_WRITE_MODE;
 	
-	private static  OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
+	private static OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
 	
 	
 	private static final void initDefaultsFromConfiguration() {
@@ -100,11 +100,6 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
 	 */
 	private OutputDirectoryMode outputDirectoryMode;
 	
-	/**
-	 * Stream opening timeout.
-	 */
-	private long openTimeout = -1;
-	
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -158,19 +153,6 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
 		return this.outputDirectoryMode;
 	}
 	
-	
-	public void setOpenTimeout(long timeout) {
-		if (timeout < 0) {
-			throw new IllegalArgumentException("The timeout must be a nonnegative numer of milliseconds (zero for infinite).");
-		}
-		
-		this.openTimeout = (timeout == 0) ? Long.MAX_VALUE : timeout;
-	}
-	
-	public long getOpenTimeout() {
-		return this.openTimeout;
-	}
-	
 	// ----------------------------------------------------------------
 
 	@Override
@@ -200,34 +182,58 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
 		if (this.outputDirectoryMode == null) {
 			this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
 		}
-		
-		if (this.openTimeout == -1) {
-			this.openTimeout = FileInputFormat.getDefaultOpeningTimeout();
-		}
 	}
 
 	
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
+		if (taskNumber < 0 || numTasks < 1) {
+			throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
+		}
 		
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Openint stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
-					", OutputDirectoryMode=" + outputDirectoryMode + ", timeout=" + openTimeout);
+			LOG.debug("Opening stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
+					", OutputDirectoryMode=" + outputDirectoryMode);
 		}
 		
-		// obtain FSDataOutputStream asynchronously, since HDFS client is vulnerable to InterruptedExceptions
-		OutputPathOpenThread opot = new OutputPathOpenThread(this, (taskNumber + 1), numTasks);
-		opot.start();
+		Path p = this.outputFilePath;
+		if (p == null) {
+			throw new IOException("The file path is null.");
+		}
 		
-		try {
-			// get FSDataOutputStream
-			this.stream = opot.waitForCompletion();
+		final FileSystem fs = p.getFileSystem();
+
+		// if this is a local file system, we need to initialize the local output directory here
+		if (!fs.isDistributedFS()) {
+			
+			if (numTasks == 1 && outputDirectoryMode == OutputDirectoryMode.PARONLY) {
+				// output should go to a single file
+				
+				// prepare local output path. checks for write mode and removes existing files in case of OVERWRITE mode
+				if(!fs.initOutPathLocalFS(p, writeMode, false)) {
+					// output preparation failed! Cancel task.
+					throw new IOException("Output path could not be initialized. Canceling task...");
+				}
+			}
+			else {
+				// numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS
+				
+				if(!fs.initOutPathLocalFS(p, writeMode, true)) {
+					// output preparation failed! Cancel task.
+					throw new IOException("Output directory could not be created. Canceling task...");
+				}
+			}
 		}
-		catch (Exception e) {
-			throw new RuntimeException("Stream to output file could not be opened: " + e.getMessage(), e);
+			
+			
+		// Suffix the path with the parallel instance index, if needed
+		if (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) {
+			p = p.suffix("/" + (taskNumber+1));
 		}
-	}
 
+		// create output file
+		this.stream = fs.create(p, writeMode == WriteMode.OVERWRITE);
+	}
 
 	@Override
 	public void close() throws IOException {
@@ -238,153 +244,37 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
 		}
 	}
 	
-	// ============================================================================================
-	
-	private static final class OutputPathOpenThread extends Thread {
-		
-		private final Path path;
-		
-		private final int taskIndex;
-		
-		private final int numTasks;
-		
-		private final WriteMode writeMode;
-		
-		private final OutputDirectoryMode outDirMode;
-		
-		private final long timeoutMillies;
-		
-		private volatile FSDataOutputStream fdos;
-
-		private volatile Throwable error;
-		
-		private volatile boolean aborted;
-
-		
-		public OutputPathOpenThread(FileOutputFormat<?> fof, int taskIndex, int numTasks) {
-			this.path = fof.getOutputFilePath();
-			this.writeMode = fof.getWriteMode();
-			this.outDirMode = fof.getOutputDirectoryMode();
-			this.timeoutMillies = fof.getOpenTimeout();
-			this.taskIndex = taskIndex;
-			this.numTasks = numTasks;
-		}
-
-		@Override
-		public void run() {
-
-			try {
-				Path p = this.path;
-				final FileSystem fs = p.getFileSystem();
-
-				// initialize output path. 
-				if(this.numTasks == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
-					// output is not written in parallel and should go to a single file
-					
-					if(!fs.isDistributedFS()) {
-						// prepare local output path
-						// checks for write mode and removes existing files in case of OVERWRITE mode
-						if(!fs.initOutPathLocalFS(p, writeMode, false)) {
-							// output preparation failed! Cancel task.
-							throw new IOException("Output path could not be initialized. Canceling task.");
-						}
-					}
-					
-				} else if(this.numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS) {
-					// output is written in parallel into a directory or should always be written to a directory
-					
-					if(!fs.isDistributedFS()) {
-						// File system is not distributed.
-						// We need to prepare the output path on each executing node.
-						if(!fs.initOutPathLocalFS(p, writeMode, true)) {
-							// output preparation failed! Cancel task.
-							throw new IOException("Output directory could not be created. Canceling task.");
-						}
-					}
-					
-					// Suffix the path with the parallel instance index
-					p = p.suffix("/" + this.taskIndex);
-					
-				} else {
-					// invalid number of subtasks (<= 0)
-					throw new IllegalArgumentException("Invalid number of subtasks. Canceling task.");
-				}
-					
-				// create output file
-				switch(writeMode) {
-				case NO_OVERWRITE: 
-					this.fdos = fs.create(p, false);
-					break;
-				case OVERWRITE:
-					this.fdos = fs.create(p, true);
-					break;
-				default:
-					throw new IllegalArgumentException("Invalid write mode: "+writeMode);
-				}
-				
-				// check for canceling and close the stream in that case, because no one will obtain it
-				if (this.aborted) {
-					final FSDataOutputStream f = this.fdos;
-					this.fdos = null;
-					f.close();
-				}
-			}
-			catch (Throwable t) {
-				this.error = t;
-			}
-		}
+	/**
+	 * Initialization of the distributed file system if it is used.
+	 *
+	 * @param parallelism The task parallelism.
+	 */
+	@Override
+	public void initializeGlobal(int parallelism) throws IOException {
+		final Path path = getOutputFilePath();
+		final FileSystem fs = path.getFileSystem();
 		
-		public FSDataOutputStream waitForCompletion() throws Exception {
-			final long start = System.currentTimeMillis();
-			long remaining = this.timeoutMillies;
+		// only distributed file systems can be initialized at start-up time.
+		if (fs.isDistributedFS()) {
 			
-			do {
-				try {
-					this.join(remaining);
-				} catch (InterruptedException iex) {
-					// we were canceled, so abort the procedure
-					abortWait();
-					throw iex;
+			final WriteMode writeMode = getWriteMode();
+			final OutputDirectoryMode outDirMode = getOutputDirectoryMode();
+
+			if (parallelism == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
+				// output is not written in parallel and should be written to a single file.
+				// prepare distributed output path
+				if(!fs.initOutPathDistFS(path, writeMode, false)) {
+					// output preparation failed! Cancel task.
+					throw new IOException("Output path could not be initialized.");
 				}
-			}
-			while (this.error == null && this.fdos == null &&
-					(remaining = this.timeoutMillies + start - System.currentTimeMillis()) > 0);
-			
-			if (this.error != null) {
-				throw new IOException("Opening the file output stream failed" +
-					(this.error.getMessage() == null ? "." : ": " + this.error.getMessage()), this.error);
-			}
-			
-			if (this.fdos != null) {
-				return this.fdos;
+
 			} else {
-				// double-check that the stream has not been set by now. we don't know here whether
-				// a) the opener thread recognized the canceling and closed the stream
-				// b) the flag was set such that the stream did not see it and we have a valid stream
-				// In any case, close the stream and throw an exception.
-				abortWait();
-				
-				final boolean stillAlive = this.isAlive();
-				final StringBuilder bld = new StringBuilder(256);
-				for (StackTraceElement e : this.getStackTrace()) {
-					bld.append("\tat ").append(e.toString()).append('\n');
+				// output should be written to a directory
+
+				// only distributed file systems can be initialized at start-up time.
+				if(!fs.initOutPathDistFS(path, writeMode, true)) {
+					throw new IOException("Output directory could not be created.");
 				}
-				throw new IOException("Output opening request timed out. Opener was " + (stillAlive ? "" : "NOT ") + 
-					" alive. Stack:\n" + bld.toString());
-			}
-		}
-		
-		/**
-		 * Double checked procedure setting the abort flag and closing the stream.
-		 */
-		private final void abortWait() {
-			this.aborted = true;
-			final FSDataOutputStream outStream = this.fdos;
-			this.fdos = null;
-			if (outStream != null) {
-				try {
-					outStream.close();
-				} catch (Throwable t) {}
 			}
 		}
 	}
@@ -437,47 +327,4 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
 			super(targetConfig);
 		}
 	}
-
-	/**
-	 * Initialization of the distributed file system if it is used.
-	 *
-	 * @param configuration The task configuration
-	 */
-	@Override
-	public void initialize(Configuration configuration){
-		final Path path = this.getOutputFilePath();
-		final WriteMode writeMode = this.getWriteMode();
-		final OutputDirectoryMode outDirMode = this.getOutputDirectoryMode();
-
-		// Prepare output path and determine max DOP
-		try {
-			final FileSystem fs = path.getFileSystem();
-
-			int dop = configuration.getInteger(DEGREE_OF_PARALLELISM_KEY, -1);
-			if(dop == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
-				// output is not written in parallel and should be written to a single file.
-
-				if(fs.isDistributedFS()) {
-					// prepare distributed output path
-					if(!fs.initOutPathDistFS(path, writeMode, false)) {
-						// output preparation failed! Cancel task.
-						throw new IOException("Output path could not be initialized.");
-					}
-				}
-			} else {
-				// output should be written to a directory
-
-				if(fs.isDistributedFS()) {
-					// only distributed file systems can be initialized at start-up time.
-					if(!fs.initOutPathDistFS(path, writeMode, true)) {
-						throw new IOException("Output directory could not be created.");
-					}
-				}
-			}
-		}
-		catch (IOException e) {
-			LOG.error("Could not access the file system to detemine the status of the output.", e);
-			throw new RuntimeException("I/O Error while accessing file", e);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java
index f191c61..ec1033e 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java
@@ -153,7 +153,6 @@ public class FormatUtil {
 	{
 		final F outputFormat = ReflectionUtil.newInstance(outputFormatClass);
 		outputFormat.setOutputFilePath(new Path(path));
-		outputFormat.setOpenTimeout(0);
 		outputFormat.setWriteMode(WriteMode.OVERWRITE);
 	
 		configuration = configuration == null ? new Configuration() : configuration;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InitializeOnMaster.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InitializeOnMaster.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InitializeOnMaster.java
new file mode 100644
index 0000000..86fdee2
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InitializeOnMaster.java
@@ -0,0 +1,35 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.common.io;
+
+import java.io.IOException;
+
+/**
+ * This interface may be implemented by {@link OutputFormat}s to have the master initialize them globally.
+ * 
+ * For example, the {@link FileOutputFormat} implements this behavior for distributed file systems and
+ * creates/deletes target directories if necessary.
+ */
+public interface InitializeOnMaster {
+
+	/**
+	 * The method is invoked on the master (JobManager) before the distributed program execution starts.
+	 * 
+	 * @param parallelism The degree of parallelism with which the format or functions will be run.
+	 * @throws IOException The initialization may throw exceptions, which may cause the job to abort.
+	 */
+	void initializeGlobal(int parallelism) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
index bdc59e4..72dddf4 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
@@ -20,14 +20,13 @@ import eu.stratosphere.configuration.Configuration;
 
 
 /**
- * Describes the base interface that is used describe an output that consumes records. The output format
+ * The base interface for outputs that consume records. The output format
  * describes how to store the final records, for example in a file.
  * <p>
  * The life cycle of an output format is the following:
  * <ol>
- *   <li>After being instantiated (parameterless), it is configured with a {@link Configuration} object. 
- *       Basic fields are read from the configuration, such as for example a file path, if the format describes
- *       files as the sink for the records.</li>
+ *   <li>configure() is invoked a single time. The method can be used to implement initialization from
+ *       the parameters (configuration) that may be attached upon instantiation.</li>
  *   <li>Each parallel output task creates an instance, configures it and opens it.</li>
  *   <li>All records of its parallel instance are handed to the output format.</li>
  *   <li>The output format is closed</li>
@@ -79,13 +78,5 @@ public interface OutputFormat<IT> extends Serializable {
 	 * @throws IOException Thrown, if the input could not be closed properly.
 	 */
 	void close() throws IOException;
-
-	/**
-	 * Method which is called on the JobManager node prior to execution. It can be used to set up output format
-	 * related tasks.
-	 *
-	 * @param configuration The task configuration
-	 */
-	void initialize(Configuration configuration);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
index 451577f..6b9436b 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
@@ -34,17 +34,19 @@ import eu.stratosphere.core.io.StringRecord;
  * This class is thread-safe.
  * 
  */
-public class Configuration implements IOReadableWritable {
+public class Configuration implements IOReadableWritable, java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
 
 	/**
 	 * Stores the concrete key/value pairs of this configuration object.
 	 */
-	private Map<String, String> confData = new HashMap<String, String>();
+	private final Map<String, String> confData = new HashMap<String, String>();
 
 	/**
 	 * The class loader to be used for the <code>getClass</code> method.
 	 */
-	private ClassLoader classLoader;
+	private transient ClassLoader classLoader;
 
 	/**
 	 * Constructs a new configuration object.
@@ -446,7 +448,6 @@ public class Configuration implements IOReadableWritable {
 	
 	// --------------------------------------------------------------------------------------------
 
-
 	@Override
 	public void read(final DataInput in) throws IOException {
 
@@ -479,6 +480,13 @@ public class Configuration implements IOReadableWritable {
 			}
 		}
 	}
+	
+	private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
+		s.defaultReadObject();
+		this.classLoader = getClass().getClassLoader();
+	}
+	
+	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public int hashCode() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
index 11c7007..8e65636 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
@@ -459,7 +459,7 @@ public abstract class FileSystem {
 			// path exists, check write mode
 			switch (writeMode) {
 			case NO_OVERWRITE:
-				if (status.isDir()) {
+				if (status.isDir() && createDirectory) {
 					return true;
 				} else {
 					// file may not be overwritten
@@ -467,6 +467,7 @@ public abstract class FileSystem {
 							WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + 
 							" mode to overwrite existing files and directories.");
 				}
+
 			case OVERWRITE:
 				if (status.isDir()) {
 					if (createDirectory) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/core/io/StringRecord.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/io/StringRecord.java b/stratosphere-core/src/main/java/eu/stratosphere/core/io/StringRecord.java
index 50c2599..de2358b 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/io/StringRecord.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/io/StringRecord.java
@@ -34,6 +34,8 @@ import java.text.CharacterIterator;
 import java.text.StringCharacterIterator;
 import java.util.Arrays;
 
+import eu.stratosphere.types.Value;
+
 /**
  * This class stores text using standard UTF8 encoding. It provides methods to
  * serialize, deserialize, and compare texts at byte level. The type of length
@@ -44,7 +46,9 @@ import java.util.Arrays;
  * Also includes utilities for serializing/deserialing a string, coding/decoding a string, checking if a byte array
  * contains valid UTF8 code, calculating the length of an encoded string.
  */
-public class StringRecord implements IOReadableWritable {
+public class StringRecord implements Value {
+	
+	private static final long serialVersionUID = 1L;
 
 	private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
 		protected CharsetEncoder initialValue() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java b/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
index 16f610a..b59e2e6 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
@@ -18,9 +18,9 @@ package eu.stratosphere.util;
 import java.util.Iterator;
 
 /**
- * An {@link Iterator] that is also {@link Iterable} (often by returning itself).
+ * An {@link Iterator} that is also {@link Iterable} (often by returning itself).
  * 
- * @param <T> The iterated elements' type.
+ * @param <E> The iterated elements' type.
  */
 public interface IterableIterator<E> extends Iterator<E>, Iterable<E> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
index d1736d4..5c09439 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
@@ -95,7 +95,4 @@ public class PrintingOutputFormat<T> implements OutputFormat<T> {
 	public String toString() {
 		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
 	}
-
-	@Override
-	public void initialize(Configuration configuration){}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
index 2416b07..ae5198a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
@@ -43,7 +43,6 @@ import org.apache.commons.logging.LogFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
index 1c4a820..18395fb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
@@ -31,11 +31,13 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import eu.stratosphere.api.common.io.InitializeOnMaster;
+import eu.stratosphere.api.common.io.OutputFormat;
 import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.IllegalConfigurationException;
 import eu.stratosphere.core.io.InputSplit;
 import eu.stratosphere.nephele.execution.ExecutionListener;
 import eu.stratosphere.nephele.execution.ExecutionState;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.nephele.instance.AllocatedResource;
 import eu.stratosphere.nephele.instance.DummyInstance;
 import eu.stratosphere.nephele.jobgraph.DistributionPattern;
@@ -45,11 +47,11 @@ import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.AbstractJobInputVertex;
 import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
 import eu.stratosphere.nephele.jobgraph.JobEdge;
-import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.jobgraph.JobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory;
-import eu.stratosphere.nephele.template.AbstractInputTask;
 import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.util.StringUtils;
 
@@ -462,42 +464,68 @@ public class ExecutionGraph implements ExecutionListener {
 					: false, jobVertex.getNumberOfExecutionRetries(), jobVertex.getConfiguration(), signature,
 				invokableClass);
 		} catch (Throwable t) {
-			throw new GraphConversionException(StringUtils.stringifyException(t));
+			throw new GraphConversionException(t);
 		}
 
 		// Register input and output vertices separately
 		if (jobVertex instanceof AbstractJobInputVertex) {
 
-			final InputSplit[] inputSplits;
-
+			final AbstractJobInputVertex jobInputVertex = (AbstractJobInputVertex) jobVertex;
+			
+			if (jobVertex instanceof JobInputVertex) {
+				try {
+					// get a handle to the user code class loader
+					ClassLoader cl = LibraryCacheManager.getClassLoader(jobVertex.getJobGraph().getJobID());
+					
+					((JobInputVertex) jobVertex).initializeInputFormatFromTaskConfig(cl);
+				}
+				catch (Throwable t) {
+					throw new GraphConversionException("Could not deserialize input format.", t);
+				}
+			}
+			
 			final Class<? extends InputSplit> inputSplitType = jobInputVertex.getInputSplitType();
+			
+			InputSplit[] inputSplits;
 
-			try{
+			try {
 				inputSplits = jobInputVertex.getInputSplits(jobVertex.getNumberOfSubtasks());
-			}catch(Exception e) {
-				throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName() + ": "
-						+ StringUtils.stringifyException(e));
+			}
+			catch (Throwable t) {
+				throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName(), t);
 			}
 
 			if (inputSplits == null) {
-				LOG.info("Job input vertex " + jobVertex.getName() + " generated 0 input splits");
-			} else {
-				LOG.info("Job input vertex " + jobVertex.getName() + " generated " + inputSplits.length
-					+ " input splits");
+				inputSplits = new InputSplit[0];
 			}
+			
+			LOG.info("Job input vertex " + jobVertex.getName() + " generated " + inputSplits.length + " input splits");
 
 			// assign input splits and type
 			groupVertex.setInputSplits(inputSplits);
 			groupVertex.setInputSplitType(inputSplitType);
 		}
 
-		if(jobVertex instanceof JobOutputVertex){
+		if (jobVertex instanceof JobOutputVertex){
 			final JobOutputVertex jobOutputVertex = (JobOutputVertex) jobVertex;
+			
+			try {
+				// get a handle to the user code class loader
+				ClassLoader cl = LibraryCacheManager.getClassLoader(jobVertex.getJobGraph().getJobID());
+				jobOutputVertex.initializeOutputFormatFromTaskConfig(cl);
+			}
+			catch (Throwable t) {
+				throw new GraphConversionException("Could not deserialize output format.", t);
+			}
 
-			final OutputFormat<?> outputFormat = jobOutputVertex.getOutputFormat();
-
-			if(outputFormat != null){
-				outputFormat.initialize(groupVertex.getConfiguration());
+			OutputFormat<?> outputFormat = jobOutputVertex.getOutputFormat();
+			if (outputFormat != null && outputFormat instanceof InitializeOnMaster){
+				try {
+					((InitializeOnMaster) outputFormat).initializeGlobal(jobVertex.getNumberOfSubtasks());
+				}
+				catch (Throwable t) {
+					throw new GraphConversionException(t);
+				}
 			}
 		}
 
@@ -519,7 +547,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 * @return the number of input vertices registered with this execution graph
 	 */
 	public int getNumberOfInputVertices() {
-
 		return this.stages.get(0).getNumberOfInputExecutionVertices();
 	}
 
@@ -531,7 +558,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 * @return the number of input vertices for the given stage
 	 */
 	public int getNumberOfInputVertices(int stage) {
-
 		if (stage >= this.stages.size()) {
 			return 0;
 		}
@@ -545,7 +571,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 * @return the number of output vertices registered with this execution graph
 	 */
 	public int getNumberOfOutputVertices() {
-
 		return this.stages.get(0).getNumberOfOutputExecutionVertices();
 	}
 
@@ -557,7 +582,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 * @return the number of input vertices for the given stage
 	 */
 	public int getNumberOfOutputVertices(final int stage) {
-
 		if (stage >= this.stages.size()) {
 			return 0;
 		}
@@ -574,7 +598,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 *         exists
 	 */
 	public ExecutionVertex getInputVertex(final int index) {
-
 		return this.stages.get(0).getInputExecutionVertex(index);
 	}
 
@@ -587,7 +610,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 *         exists
 	 */
 	public ExecutionVertex getOutputVertex(final int index) {
-
 		return this.stages.get(0).getOutputExecutionVertex(index);
 	}
 
@@ -602,7 +624,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 *         exists in that stage
 	 */
 	public ExecutionVertex getInputVertex(final int stage, final int index) {
-
 		try {
 			final ExecutionStage s = this.stages.get(stage);
 			if (s == null) {
@@ -627,7 +648,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 *         exists in that stage
 	 */
 	public ExecutionVertex getOutputVertex(final int stage, final int index) {
-
 		try {
 			final ExecutionStage s = this.stages.get(stage);
 			if (s == null) {
@@ -649,7 +669,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 * @return the execution stage with number <code>num</code> or <code>null</code> if no such execution stage exists
 	 */
 	public ExecutionStage getStage(final int num) {
-
 		try {
 			return this.stages.get(num);
 		} catch (ArrayIndexOutOfBoundsException e) {
@@ -663,7 +682,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 * @return the number of execution stages in the execution graph
 	 */
 	public int getNumberOfStages() {
-
 		return this.stages.size();
 	}
 
@@ -676,7 +694,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 *         exists in the execution graph
 	 */
 	public ExecutionVertex getVertexByChannelID(final ChannelID id) {
-
 		final ExecutionEdge edge = this.edgeMap.get(id);
 		if (edge == null) {
 			return null;
@@ -697,7 +714,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 * @return the edge whose ID matches <code>id</code> or <code>null</code> if no such edge is known
 	 */
 	public ExecutionEdge getEdgeByID(final ChannelID id) {
-
 		return this.edgeMap.get(id);
 	}
 
@@ -708,7 +724,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 *        the execution vertex to register
 	 */
 	void registerExecutionVertex(final ExecutionVertex vertex) {
-
 		if (this.vertexMap.put(vertex.getID(), vertex) != null) {
 			throw new IllegalStateException("There is already an execution vertex with ID " + vertex.getID()
 				+ " registered");
@@ -724,7 +739,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 *         found
 	 */
 	public ExecutionVertex getVertexByID(final ExecutionVertexID id) {
-
 		return this.vertexMap.get(id);
 	}
 
@@ -735,7 +749,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 * @return <code>true</code> if stage is completed, <code>false</code> otherwise
 	 */
 	private boolean isCurrentStageCompleted() {
-
 		if (this.indexToCurrentExecutionStage >= this.stages.size()) {
 			return true;
 		}
@@ -758,7 +771,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 * @return <code>true</code> if the execution of the graph is finished, <code>false</code> otherwise
 	 */
 	public boolean isExecutionFinished() {
-
 		return (getJobStatus() == InternalJobStatus.FINISHED);
 	}
 
@@ -1307,4 +1319,26 @@ public class ExecutionGraph implements ExecutionListener {
 			}
 		}
 	}
+	
+	/**
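+	 * Retrieves the number of slots required to run this execution graph.
+	 * @return the largest number of required slots reported by any stage of this graph (0 if there are no stages)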
+	 */
+	public int getRequiredSlots(){
+		int maxRequiredSlots = 0;
+
+		final Iterator<ExecutionStage> stageIterator = this.stages.iterator();
+
+		while(stageIterator.hasNext()){
+			final ExecutionStage stage = stageIterator.next();
+
+			int requiredSlots = stage.getRequiredSlots();
+
+			if(requiredSlots > maxRequiredSlots){
+				maxRequiredSlots = requiredSlots;
+			}
+		}
+
+		return maxRequiredSlots;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
index 91e9e53..dceeb90 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
@@ -15,7 +15,6 @@ package eu.stratosphere.nephele.executiongraph;
 
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.io.InputSplit;
-import eu.stratosphere.nephele.execution.RuntimeEnvironment;
 import eu.stratosphere.nephele.instance.AllocatedResource;
 import eu.stratosphere.nephele.instance.DummyInstance;
 import eu.stratosphere.nephele.jobgraph.JobVertexID;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
index e4d3b9d..b901742 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
@@ -15,11 +15,8 @@ package eu.stratosphere.nephele.jobgraph;
 
 import eu.stratosphere.core.io.InputSplit;
 
-import java.io.IOException;
-
 /**
- * An abstract base class for input vertices in Nephele.
- * 
+ * An abstract base class for input vertices.
  */
 public abstract class AbstractJobInputVertex extends AbstractJobVertex {
 
@@ -28,12 +25,24 @@ public abstract class AbstractJobInputVertex extends AbstractJobVertex {
 	 * 
 	 * @param name
 	 *        the name of the new job input vertex
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	protected AbstractJobInputVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
+	}
+	
+	/**
+	 * Constructs a new job input vertex with the given name.
+	 * 
+	 * @param name
+	 *        the name of the new job input vertex
 	 * @param id
 	 *        the ID of this vertex
 	 * @param jobGraph
 	 *        the job graph this vertex belongs to
 	 */
-	protected AbstractJobInputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
+	protected AbstractJobInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
 		super(name, id, jobGraph);
 
 		jobGraph.addVertex(this);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobOutputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobOutputVertex.java
index 849df4b..6020f24 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobOutputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobOutputVertex.java
@@ -24,14 +24,15 @@ public abstract class AbstractJobOutputVertex extends AbstractJobVertex {
 	 * 
 	 * @param name
 	 *        the name of the new job output vertex
-	 * @param id
-	 *        the ID of this vertex
 	 * @param jobGraph
 	 *        the job graph this vertex belongs to
 	 */
-	protected AbstractJobOutputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
+	protected AbstractJobOutputVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
+	}
+	
+	protected AbstractJobOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
 		super(name, id, jobGraph);
-
 		jobGraph.addVertex(this);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
index cdadd3c..cc936d9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
@@ -18,8 +18,9 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.commons.lang.Validate;
+
 import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.IllegalConfigurationException;
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
@@ -29,8 +30,7 @@ import eu.stratosphere.nephele.util.EnumUtils;
 import eu.stratosphere.util.StringUtils;
 
 /**
- * An abstract base class for a job vertex in Nephele.
- * 
+ * An abstract base class for a job vertex.
  */
 public abstract class AbstractJobVertex implements IOReadableWritable {
 
@@ -86,19 +86,30 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	 */
 	protected Class<? extends AbstractInvokable> invokableClass = null;
 
+	
 	/**
 	 * Constructs a new job vertex and assigns it with the given name.
 	 * 
 	 * @param name
 	 *        the name of the new job vertex
-	 * @param id
-	 *        the ID of this vertex
 	 * @param jobGraph
 	 *        the job graph this vertex belongs to
 	 */
-	protected AbstractJobVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
+	protected AbstractJobVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
+	}
+	
+	/**
+	 * Constructs a new job vertex with the given name and ID (a new ID is generated if the given ID is null).
+	 * 
+	 * @param name
+	 *        the name of the new job vertex
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	protected AbstractJobVertex(String name, JobVertexID id, JobGraph jobGraph) {
 		this.name = name == null ? DEFAULT_NAME : name;
-		this.id = (id == null) ? new JobVertexID() : id;
+		this.id = id == null ? new JobVertexID() : id;
 		this.jobGraph = jobGraph;
 	}
 
@@ -572,13 +583,17 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 		return this.configuration;
 	}
 
+	public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
+		Validate.notNull(invokable);
+		this.invokableClass = invokable;
+	}
+	
 	/**
 	 * Returns the invokable class which represents the task of this vertex
 	 * 
 	 * @return the invokable class, <code>null</code> if it is not set
 	 */
 	public Class<? extends AbstractInvokable> getInvokableClass() {
-
 		return this.invokableClass;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
index f048b0d..3d14d0a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
@@ -26,8 +26,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Stack;
-import java.util.Vector;
 
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.FSDataInputStream;
@@ -77,11 +75,6 @@ public class JobGraph implements IOReadableWritable {
 	private Configuration jobConfiguration = new Configuration();
 
 	/**
-	 * The configuration which should be applied to the task managers involved in processing this job.
-	 */
-	private final Configuration taskManagerConfiguration = new Configuration();
-
-	/**
 	 * List of JAR files required to run this job.
 	 */
 	private final ArrayList<Path> userJars = new ArrayList<Path>();
@@ -134,24 +127,12 @@ public class JobGraph implements IOReadableWritable {
 	}
 
 	/**
-	 * Returns the configuration object distributed among the task managers
-	 * before they start processing this job.
-	 * 
-	 * @return the configuration object for the task managers, or <code>null</code> if it is not set
-	 */
-	public Configuration getTaskmanagerConfiguration() {
-
-		return this.taskManagerConfiguration;
-	}
-
-	/**
 	 * Adds a new input vertex to the job graph if it is not already included.
 	 * 
 	 * @param inputVertex
 	 *        the new input vertex to be added
 	 */
-	public void addVertex(final AbstractJobInputVertex inputVertex) {
-
+	public void addVertex(AbstractJobInputVertex inputVertex) {
 		if (!inputVertices.containsKey(inputVertex.getID())) {
 			inputVertices.put(inputVertex.getID(), inputVertex);
 		}
@@ -163,8 +144,7 @@ public class JobGraph implements IOReadableWritable {
 	 * @param taskVertex
 	 *        the new task vertex to be added
 	 */
-	public void addVertex(final JobTaskVertex taskVertex) {
-
+	public void addVertex(JobTaskVertex taskVertex) {
 		if (!taskVertices.containsKey(taskVertex.getID())) {
 			taskVertices.put(taskVertex.getID(), taskVertex);
 		}
@@ -176,8 +156,7 @@ public class JobGraph implements IOReadableWritable {
 	 * @param outputVertex
 	 *        the new output vertex to be added
 	 */
-	public void addVertex(final AbstractJobOutputVertex outputVertex) {
-
+	public void addVertex(AbstractJobOutputVertex outputVertex) {
 		if (!outputVertices.containsKey(outputVertex.getID())) {
 			outputVertices.put(outputVertex.getID(), outputVertex);
 		}
@@ -570,9 +549,6 @@ public class JobGraph implements IOReadableWritable {
 		// Re-instantiate the job configuration object and read the configuration
 		this.jobConfiguration = new Configuration(cl);
 		this.jobConfiguration.read(in);
-
-		// Read the task manager configuration
-		this.taskManagerConfiguration.read(in);
 	}
 
 
@@ -610,7 +586,6 @@ public class JobGraph implements IOReadableWritable {
 
 		// Write out configuration objects
 		this.jobConfiguration.write(out);
-		this.taskManagerConfiguration.write(out);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
index 29f98d9..bf8f544 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
@@ -13,41 +13,21 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
+import java.io.IOException;
+
 import eu.stratosphere.api.common.io.InputFormat;
-import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
 import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
-import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
-import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.io.InputSplit;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.template.AbstractInputTask;
 import eu.stratosphere.pact.runtime.task.util.TaskConfig;
 
-import java.io.DataInput;
-import java.io.IOException;
-
 public class JobInputVertex extends AbstractJobInputVertex {
-	/**
-	 * Input format associated to this JobInputVertex. It is either directly set or reconstructed from the task
-	 * configuration. Every job input vertex requires an input format to compute the input splits and the input split
-	 * type.
-	 */
-	private volatile InputFormat<?, ? extends InputSplit> inputFormat = null;
 
-	/**
-	 * Creates a new job input vertex with the specified name.
-	 * 
-	 * @param name
-	 *        The name of the new job file input vertex.
-	 * @param id
-	 *        The ID of this vertex.
-	 * @param jobGraph
-	 *        The job graph this vertex belongs to.
-	 */
-	public JobInputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
+	private InputFormat<?, ?> inputFormat;
+	
+	public JobInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
 		super(name, id, jobGraph);
 	}
-
+	
 	/**
 	 * Creates a new job file input vertex with the specified name.
 	 * 
@@ -56,8 +36,8 @@ public class JobInputVertex extends AbstractJobInputVertex {
 	 * @param jobGraph
 	 *        The job graph this vertex belongs to.
 	 */
-	public JobInputVertex(final String name, final JobGraph jobGraph) {
-		super(name, null, jobGraph);
+	public JobInputVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
 	}
 
 	/**
@@ -66,112 +46,23 @@ public class JobInputVertex extends AbstractJobInputVertex {
 	 * @param jobGraph
 	 *        The job graph this vertex belongs to.
 	 */
-	public JobInputVertex(final JobGraph jobGraph) {
-		super(null, null, jobGraph);
-	}
-
-	/**
-	 * Sets the class of the vertex's input task.
-	 *
-	 * @param inputClass
-	 *        The class of the vertex's input task.
-	 */
-	public void setInputClass(final Class<? extends AbstractInputTask<?>> inputClass) {
-		this.invokableClass = inputClass;
-	}
-
-	/**
-	 * Returns the class of the vertex's input task.
-	 * 
-	 * @return the class of the vertex's input task or <code>null</code> if no task has yet been set
-	 */
-	@SuppressWarnings("unchecked")
-	public Class<? extends AbstractInputTask<?>> getInputClass() {
-		return (Class<? extends AbstractInputTask<?>>) this.invokableClass;
-	}
-
-	/**
-	 * Sets the input format and writes it to the task configuration. It extracts it from the UserCodeWrapper.
-	 *
-	 * @param inputFormatWrapper Wrapped input format
-	 */
-	public void setInputFormat(UserCodeWrapper<? extends InputFormat<?, ? extends InputSplit>> inputFormatWrapper) {
-		TaskConfig config = new TaskConfig(this.getConfiguration());
-		config.setStubWrapper(inputFormatWrapper);
-
-		inputFormat = inputFormatWrapper.getUserCodeObject();
-	}
-
-	/**
-	 * Sets the input format and writes it to the task configuration.
-	 *
-	 * @param inputFormat Input format
-	 */
-	public void setInputFormat(InputFormat<?, ? extends InputSplit> inputFormat) {
-		this.inputFormat = inputFormat;
-
-		UserCodeWrapper<? extends InputFormat<?, ? extends InputSplit>> wrapper = new
-				UserCodeObjectWrapper<InputFormat<?, ? extends InputSplit>>(inputFormat);
-		TaskConfig config = new TaskConfig(this.getConfiguration());
-		config.setStubWrapper(wrapper);
+	public JobInputVertex(JobGraph jobGraph) {
+		this(null, jobGraph);
 	}
-
-	/**
-	 * Sets the input format parameters.
-	 *
-	 * @param inputFormatParameters Input format parameters
-	 */
-	public void setInputFormatParameters(Configuration inputFormatParameters){
-		TaskConfig config = new TaskConfig(this.getConfiguration());
-		config.setStubParameters(inputFormatParameters);
-
-		if(inputFormat == null){
-			throw new RuntimeException("There is no input format set in job vertex: " + this.getID());
-		}
-
-		inputFormat.configure(inputFormatParameters);
+	
+	public void setInputFormat(InputFormat<?, ?> format) {
+		this.inputFormat = format;
 	}
-
-	/**
-	 * Sets the output serializer for the task associated to this vertex.
-	 *
-	 * @param factory Type serializer factory
-	 */
-	public void setOutputSerializer(TypeSerializerFactory<?> factory){
-		TaskConfig config = new TaskConfig(this.getConfiguration());
-		config.setOutputSerializer(factory);
-	}
-
-	/**
-	 * Deserializes the input format from the deserialized task configuration. It then configures the input format by
-	 * calling the configure method with the current configuration.
-	 *
-	 * @param input
-	 * @throws IOException
-	 */
-	@Override
-	public void read(final DataInput input) throws IOException{
-		super.read(input);
-
-		// load input format wrapper from the config
-		ClassLoader cl = null;
-
-		try{
-			cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
+	
+	public void initializeInputFormatFromTaskConfig(ClassLoader cl) {
+		TaskConfig cfg = new TaskConfig(getConfiguration());
+		
+		UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.<InputFormat<?, ?>>getStubWrapper(cl);
+		
+		if (wrapper != null) {
+			this.inputFormat = wrapper.getUserCodeObject(InputFormat.class, cl);
+			this.inputFormat.configure(cfg.getStubParameters());
 		}
-		catch (IOException ioe) {
-			throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
-					this.getJobGraph().getJobID(), ioe);
-		}
-
-		final Configuration config = this.getConfiguration();
-		config.setClassLoader(cl);
-		final TaskConfig taskConfig = new TaskConfig(config);
-
-		inputFormat = taskConfig.<InputFormat<?, InputSplit>>getStubWrapper(cl).getUserCodeObject(InputFormat.class,
-				cl);
-
-		inputFormat.configure(taskConfig.getStubParameters());
 	}
 
 	/**
@@ -197,7 +88,7 @@ public class JobInputVertex extends AbstractJobInputVertex {
 	 */
 	@Override
 	public InputSplit[] getInputSplits(int minNumSplits) throws IOException {
-		if(inputFormat == null){
+		if (inputFormat == null){
 			throw new RuntimeException("No input format has been set for job vertex: "+ this.getID());
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
index cf937a0..abe6be9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
@@ -14,41 +14,20 @@
 package eu.stratosphere.nephele.jobgraph;
 
 import eu.stratosphere.api.common.io.OutputFormat;
-import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
 import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
 import eu.stratosphere.pact.runtime.task.util.TaskConfig;
 
-import java.io.DataInput;
-import java.io.IOException;
-
 /**
- * A JobOutputVertex is a specific subtype of a {@link AbstractJobOutputVertex} and is designed
+ * A JobOutputVertex is a specific sub-type of an {@link AbstractJobOutputVertex} and is designed
  * for Nephele tasks which sink data in a not further specified way. As every job output vertex,
  * a JobOutputVertex must not have any further output.
- * 
  */
 public class JobOutputVertex extends AbstractJobOutputVertex {
 	/**
 	 * Contains the output format associated to this output vertex. It can be <pre>null</pre>.
 	 */
-	private volatile OutputFormat<?> outputFormat = null;
+	private OutputFormat<?> outputFormat;
 
-	/**
-	 * Creates a new job file output vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file output vertex
-	 * @param id
-	 *        the ID of this vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobOutputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
 
 	/**
 	 * Creates a new job file output vertex with the specified name.
@@ -58,8 +37,12 @@ public class JobOutputVertex extends AbstractJobOutputVertex {
 	 * @param jobGraph
 	 *        the job graph this vertex belongs to
 	 */
-	public JobOutputVertex(final String name, final JobGraph jobGraph) {
-		super(name, null, jobGraph);
+	public JobOutputVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
+	}
+	
+	public JobOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
+		super(name, id, jobGraph);
 	}
 
 	/**
@@ -68,94 +51,21 @@ public class JobOutputVertex extends AbstractJobOutputVertex {
 	 * @param jobGraph
 	 *        the job graph this vertex belongs to
 	 */
-	public JobOutputVertex(final JobGraph jobGraph) {
-		super(null, null, jobGraph);
-	}
-
-	/**
-	 * Sets the class of the vertex's output task.
-	 * 
-	 * @param outputClass
-	 *        The class of the vertex's output task.
-	 */
-	public void setOutputClass(final Class<? extends AbstractOutputTask> outputClass) {
-		this.invokableClass = outputClass;
+	public JobOutputVertex(JobGraph jobGraph) {
+		this(null, jobGraph);
 	}
-
-	/**
-	 * Returns the class of the vertex's output task.
-	 * 
-	 * @return The class of the vertex's output task or <code>null</code> if no task has yet been set.
-	 */
-	@SuppressWarnings("unchecked")
-	public Class<? extends AbstractOutputTask> getOutputClass() {
-		return (Class<? extends AbstractOutputTask>) this.invokableClass;
+	
+	public void setOutputFormat(OutputFormat<?> format) {
+		this.outputFormat = format;
 	}
-
-	/**
-	 * Sets the output format and writes it to the task configuration.
-	 *
-	 * @param outputFormatWrapper Wrapped output format
-	 */
-	public void setOutputFormat(UserCodeWrapper<? extends OutputFormat<?>> outputFormatWrapper){
-		TaskConfig config = new TaskConfig(this.getConfiguration());
-		config.setStubWrapper(outputFormatWrapper);
-		outputFormat = outputFormatWrapper.getUserCodeObject();
-	}
-
-	/**
-	 * Sets the output format and writes it to the task configuration.
-	 *
-	 * @param outputFormat Output format
-	 */
-	public void setOutputFormat(OutputFormat<?> outputFormat){
-		this.outputFormat = outputFormat;
-		UserCodeWrapper<? extends OutputFormat<?>> wrapper = new UserCodeObjectWrapper<OutputFormat<?>>
-				(outputFormat);
-		TaskConfig config = new TaskConfig(this.getConfiguration());
-		config.setStubWrapper(wrapper);
-	}
-
-	/**
-	 * Sets the output format parameters for the output format by writing it to the task configuration.
-	 *
-	 * @param parameters Output format parameters
-	 */
-	public void setOutputFormatParameters(Configuration parameters){
-		TaskConfig config = new TaskConfig(this.getConfiguration());
-		config.setStubParameters(parameters);
-
-		outputFormat.configure(parameters);
-	}
-
-	/**
-	 * Deserializes the output format from the deserialized configuration if it contains an output format. The output
-	 * format is always stored in the stub wrapper. If the task configuration contains an output format,
-	 * then it is configured after deserialization.
-	 *
-	 * @param input
-	 * @throws IOException
-	 */
-	@Override
-	public void read(final DataInput input) throws IOException{
-		super.read(input);
-
-		ClassLoader cl = null;
-		try{
-			cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
-		}
-		catch (IOException ioe) {
-			throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
-					this.getJobGraph().getJobID(), ioe);
-		}
-
-		final Configuration config = this.getConfiguration();
-		config.setClassLoader(cl);
-		final TaskConfig taskConfig = new TaskConfig(config);
-
-		if(taskConfig.hasStubWrapper()){
-			outputFormat = taskConfig.<OutputFormat<?> >getStubWrapper(cl).getUserCodeObject(OutputFormat.class,cl);
-			outputFormat.configure(taskConfig.getStubParameters());
+	
+	public void initializeOutputFormatFromTaskConfig(ClassLoader cl) {
+		TaskConfig cfg = new TaskConfig(getConfiguration());
+		UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(cl);
+		
+		if (wrapper != null) {
+			this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, cl);
+			this.outputFormat.configure(cfg.getStubParameters());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
index 8672aeb..d16286c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
@@ -13,8 +13,6 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
-import eu.stratosphere.nephele.template.AbstractTask;
-
 /**
  * A JobTaskVertex is the vertex type for regular tasks (with both input and output) in Nephele.
  * Tasks running inside a JobTaskVertex must specify at least one record reader and one record writer.
@@ -27,28 +25,15 @@ public class JobTaskVertex extends AbstractJobVertex {
 	 * 
 	 * @param name
 	 *        the name for the new job task vertex
-	 * @param id
-	 *        the ID of this vertex
 	 * @param jobGraph
 	 *        the job graph this vertex belongs to
 	 */
-	public JobTaskVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
-		super(name, id, jobGraph);
-
-		jobGraph.addVertex(this);
+	public JobTaskVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
 	}
-
-	/**
-	 * Creates a new job task vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name for the new job task vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobTaskVertex(final String name, final JobGraph jobGraph) {
-		super(name, null, jobGraph);
-
+	
+	public JobTaskVertex(String name, JobVertexID id, JobGraph jobGraph) {
+		super(name, id, jobGraph);
 		jobGraph.addVertex(this);
 	}
 
@@ -58,29 +43,7 @@ public class JobTaskVertex extends AbstractJobVertex {
 	 * @param jobGraph
 	 *        the job graph this vertex belongs to
 	 */
-	public JobTaskVertex(final JobGraph jobGraph) {
-		super(null, null, jobGraph);
-
-		jobGraph.addVertex(this);
-	}
-
-	/**
-	 * Sets the class of the vertex's task.
-	 * 
-	 * @param taskClass
-	 *        the class of the vertex's task
-	 */
-	public void setTaskClass(final Class<? extends AbstractTask> taskClass) {
-		this.invokableClass = taskClass;
-	}
-
-	/**
-	 * Returns the class of the vertex's task.
-	 * 
-	 * @return the class of the vertex's task or <code>null</code> if the class has not yet been set
-	 */
-	@SuppressWarnings("unchecked")
-	public Class<? extends AbstractTask> getTaskClass() {
-		return (Class<? extends AbstractTask>) this.invokableClass;
+	public JobTaskVertex(JobGraph jobGraph) {
+		this(null, jobGraph);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index 8a3cba4..f3cf3a3 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -393,9 +393,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		try {
 			// First check if job is null
 			if (job == null) {
-				JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR,
-					"Submitted job is null!");
-				return result;
+				return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Submitted job is null!");
 			}
 	
 			if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
index 790aca9..da63bf2 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
@@ -31,8 +31,6 @@ import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.jobmanager.splitassigner.file.FileInputSplitAssigner;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.util.StringUtils;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
index 1e6929d..dc52911 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
@@ -24,8 +24,6 @@ import eu.stratosphere.core.io.InputSplit;
 import eu.stratosphere.core.io.LocatableInputSplit;
 import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
 
 /**
  * The locatable input split assigner is a specific implementation of the {@link InputSplitAssigner} interface for

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
index 048562c..3580fda 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
@@ -25,8 +25,6 @@ import eu.stratosphere.core.io.InputSplit;
 import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
 import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitAssigner;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
 
 /**
  * The file input split assigner is a specific implementation of the {@link InputSplitAssigner} interface for
@@ -89,14 +87,11 @@ public final class FileInputSplitAssigner implements InputSplitAssigner {
 		}
 	}
 
-
 	@Override
 	public void unregisterGroupVertex(final ExecutionGroupVertex groupVertex) {
-		
 		this.vertexMap.remove(groupVertex);
 	}
 
-
 	@Override
 	public InputSplit getNextInputSplit(final ExecutionVertex vertex) {
 


Mime
View raw message