flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [20/22] Merge fix to omit input/output registering on JobManager; Rework Invokable Task Hierarchy
Date Sun, 22 Jun 2014 21:47:41 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index f191df3..575454f 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -173,7 +173,7 @@ public class TaskManager implements TaskOperationProtocol {
 		}
 		
 		
-		LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
+//		LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
 		LOG.info("User system property: " + System.getProperty("user.name"));
 		LOG.info("Execution mode: " + executionMode);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
deleted file mode 100644
index 88e4fcb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import eu.stratosphere.core.io.InputSplit;
-
-/**
- * Abstract base class for tasks submitted as a part of a job input vertex.
- * 
- * @param <T>
- *        the type of input splits generated by this input task
- */
-public abstract class AbstractInputTask<T extends InputSplit> extends AbstractInvokable {
-
-	/**
-	 * Returns an iterator to a (possible empty) list of input splits which is expected to be consumed by this
-	 * instance of the {@link AbstractInputTask}.
-	 * 
-	 * @return an iterator to a (possible empty) list of input splits.
-	 */
-	public Iterator<T> getInputSplits() {
-
-		final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
-
-		return new Iterator<T>() {
-
-			private T nextSplit;
-
-			@Override
-			public boolean hasNext() {
-
-				if (this.nextSplit == null) {
-
-					final InputSplit split = provider.getNextInputSplit();
-					if (split != null) {
-						@SuppressWarnings("unchecked")
-						final T tSplit = (T) split;
-						this.nextSplit = tSplit;
-						return true;
-					} else {
-						return false;
-					}
-				} else {
-					return true;
-				}
-			}
-
-			@Override
-			public T next() {
-				if (this.nextSplit == null && !hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				final T tmp = this.nextSplit;
-				this.nextSplit = null;
-				return tmp;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
index 79390f8..792c1bf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
@@ -14,7 +14,6 @@
 package eu.stratosphere.nephele.template;
 
 import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.IllegalConfigurationException;
 import eu.stratosphere.nephele.execution.Environment;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractOutputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractOutputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractOutputTask.java
deleted file mode 100644
index 13042d4..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractOutputTask.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-/**
- * Abstract base class for tasks submitted as a part of a job output vertex.
- * 
- */
-public abstract class AbstractOutputTask extends AbstractInvokable {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractTask.java
deleted file mode 100644
index 6d568ab..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractTask.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-/**
- * Abstract base class for tasks submitted as a part of a job task vertex.
- * 
- */
-public abstract class AbstractTask extends AbstractInvokable {
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/io/FakeOutputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/io/FakeOutputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/io/FakeOutputTask.java
index ced186b..f2944f4 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/io/FakeOutputTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/io/FakeOutputTask.java
@@ -13,14 +13,14 @@
 
 package eu.stratosphere.pact.runtime.iterative.io;
 
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.api.MutableRecordReader;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
 import eu.stratosphere.types.Record;
 
 /**
  * Output task for the iteration tail
  */
-public class FakeOutputTask extends AbstractOutputTask {
+public class FakeOutputTask extends AbstractInvokable {
 
 	private MutableRecordReader<Record> reader;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationSynchronizationSinkTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 4e7286b..947872f 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -29,7 +29,7 @@ import eu.stratosphere.api.common.aggregators.ConvergenceCriterion;
 import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.runtime.io.api.MutableRecordReader;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.nephele.types.IntegerRecord;
 import eu.stratosphere.pact.runtime.iterative.event.AllWorkersDoneEvent;
 import eu.stratosphere.pact.runtime.iterative.event.TerminationEvent;
@@ -44,7 +44,7 @@ import eu.stratosphere.types.Value;
  * In each superstep, it simply waits until it has receiced a {@link WorkerDoneEvent} from each head and will send back
  * an {@link AllWorkersDoneEvent} to signal that the next superstep can begin.
  */
-public class IterationSynchronizationSinkTask extends AbstractOutputTask implements Terminable {
+public class IterationSynchronizationSinkTask extends AbstractInvokable implements Terminable {
 
 	private static final Log log = LogFactory.getLog(IterationSynchronizationSinkTask.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationTailPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationTailPactTask.java
index 859a62d..05b58e8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationTailPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationTailPactTask.java
@@ -97,7 +97,13 @@ public class IterationTailPactTask<S extends Function, OT> extends AbstractItera
 				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
 			}
 
-			super.run();
+			try {
+				super.run();
+			}
+			catch (NullPointerException e) {
+				boolean terminationRequested = terminationRequested();
+				System.out.println("Nullpoint exception when termination requested was " + terminationRequested);
+			}
 
 			// check if termination was requested
 			checkForTerminationAndResetEndOfSuperstepState();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
index cbe1766..7041679 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
@@ -16,26 +16,22 @@ package eu.stratosphere.pact.runtime.task;
 import java.io.IOException;
 
 import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import eu.stratosphere.api.common.io.FileOutputFormat;
-import eu.stratosphere.api.common.io.FileOutputFormat.OutputDirectoryMode;
 import eu.stratosphere.api.common.io.OutputFormat;
 import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
 import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.FileSystem.WriteMode;
-import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.nephele.execution.CancelTaskException;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.api.MutableReader;
 import eu.stratosphere.runtime.io.api.MutableRecordReader;
 import eu.stratosphere.runtime.io.api.MutableUnionRecordReader;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
 import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
 import eu.stratosphere.pact.runtime.sort.UnilateralSortMerger;
 import eu.stratosphere.pact.runtime.task.util.CloseableInputProvider;
@@ -51,7 +47,7 @@ import eu.stratosphere.util.MutableObjectIterator;
  * 
  * @see OutputFormat
  */
-public class DataSinkTask<IT> extends AbstractOutputTask {
+public class DataSinkTask<IT> extends AbstractInvokable {
 	
 	public static final String DEGREE_OF_PARALLELISM_KEY = "sink.dop";
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
index f835ace..62226d9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
@@ -18,9 +18,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
 import eu.stratosphere.runtime.io.api.BufferWriter;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -32,7 +34,8 @@ import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.io.InputSplit;
 import eu.stratosphere.nephele.execution.CancelTaskException;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.template.AbstractInputTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.nephele.template.InputSplitProvider;
 import eu.stratosphere.pact.runtime.shipping.OutputCollector;
 import eu.stratosphere.pact.runtime.shipping.RecordOutputCollector;
 import eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver;
@@ -47,11 +50,11 @@ import eu.stratosphere.util.Collector;
  * 
  * @see eu.stratosphere.api.common.io.InputFormat
  */
-public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
+public class DataSourceTask<OT> extends AbstractInvokable {
 	
-	// Obtain DataSourceTask Logger
 	private static final Log LOG = LogFactory.getLog(DataSourceTask.class);
 
+	
 	private List<BufferWriter> eventualOutputs;
 
 	// Output collector
@@ -76,11 +79,10 @@ public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
 
 
 	@Override
-	public void registerInputOutput()
-	{
+	public void registerInputOutput() {
 		initInputFormat();
 
-		if (LOG.isDebugEnabled())
+		if (LOG.isDebugEnabled()) {
 			LOG.debug(getLogString("Start registering input and output"));
 		}
 
@@ -331,7 +333,7 @@ l	 *
 		}
 		
 		// get the factory for the type serializer
-		this.serializerFactory = this.config.getOutputSerializer(cl);
+		this.serializerFactory = this.config.getOutputSerializer(this.userCodeClassLoader);
 	}
 
 	/**
@@ -343,49 +345,6 @@ l	 *
 		this.eventualOutputs = new ArrayList<BufferWriter>();
 		this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs);
 	}
-	
-	// ------------------------------------------------------------------------
-	//                              Input Split creation
-	// ------------------------------------------------------------------------
-	
-
-	@Override
-	public InputSplit[] computeInputSplits(int requestedMinNumber) throws Exception {
-		// we have to be sure that the format is instantiated at this point
-		if (this.format == null) {
-			throw new IllegalStateException("BUG: Input format hast not been instantiated, yet.");
-		}
-		return this.format.createInputSplits(requestedMinNumber);
-	}
-
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Class<InputSplit> getInputSplitType() {
-		// we have to be sure that the format is instantiated at this point
-		if (this.format == null) {
-			throw new IllegalStateException("BUG: Input format hast not been instantiated, yet.");
-		}
-		
-		return (Class<InputSplit>) this.format.getInputSplitType();
-	}
-	
-	// ------------------------------------------------------------------------
-	//                       Control of Parallelism
-	// ------------------------------------------------------------------------
-	
-
-	@Override
-	public int getMinimumNumberOfSubtasks() {
-		return 1;
-	}
-
-
-	@Override
-	public int getMaximumNumberOfSubtasks() {
-		// since splits can in theory be arbitrarily small, we report a possible infinite number of subtasks.
-		return -1;
-	}
 
 	// ------------------------------------------------------------------------
 	//                               Utilities
@@ -413,4 +372,54 @@ l	 *
 	private String getLogString(String message, String taskName) {
 		return RegularPactTask.constructLogString(message, taskName, this);
 	}
+	
+	private Iterator<InputSplit> getInputSplits() {
+
+		final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
+
+		return new Iterator<InputSplit>() {
+
+			private InputSplit nextSplit;
+			
+			private boolean exhausted;
+
+			@Override
+			public boolean hasNext() {
+				if (exhausted) {
+					return false;
+				}
+				
+				if (nextSplit != null) {
+					return true;
+				}
+				
+				InputSplit split = provider.getNextInputSplit();
+				
+				if (split != null) {
+					this.nextSplit = split;
+					return true;
+				}
+				else {
+					exhausted = true;
+					return false;
+				}
+			}
+
+			@Override
+			public InputSplit next() {
+				if (this.nextSplit == null && !hasNext()) {
+					throw new NoSuchElementException();
+				}
+
+				final InputSplit tmp = this.nextSplit;
+				this.nextSplit = null;
+				return tmp;
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+		};
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
index 1d7c931..3140525 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
@@ -36,9 +36,7 @@ import eu.stratosphere.runtime.io.api.BufferWriter;
 import eu.stratosphere.nephele.services.accumulators.AccumulatorEvent;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
-import eu.stratosphere.nephele.template.AbstractInputTask;
 import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.nephele.template.AbstractTask;
 import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
 import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
 import eu.stratosphere.pact.runtime.resettable.SpillingResettableMutableObjectIterator;
@@ -73,7 +71,7 @@ import java.util.Map;
  * The abstract base class for all tasks. Encapsulated common behavior and implements the main life-cycle
  * of the user code.
  */
-public class RegularPactTask<S extends Function, OT> extends AbstractTask implements PactTaskContext<S, OT> {
+public class RegularPactTask<S extends Function, OT> extends AbstractInvokable implements PactTaskContext<S, OT> {
 
 	protected static final Log LOG = LogFactory.getLog(RegularPactTask.class);
 
@@ -1251,11 +1249,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 					oe = new RecordOutputEmitter(strategy, comparator, distribution);
 				}
 
-				if (task instanceof AbstractTask) {
-					writers.add(new RecordWriter<Record>((AbstractTask) task, oe));
-				} else if (task instanceof AbstractInputTask<?>) {
-					writers.add(new RecordWriter<Record>((AbstractInputTask<?>) task, oe));
-				}
+				writers.add(new RecordWriter<Record>(task, oe));
 			}
 			if (eventualOutputs != null) {
 				eventualOutputs.addAll(writers);
@@ -1288,11 +1282,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 					oe = new OutputEmitter<T>(strategy, comparator, dataDist);
 				}
 
-				if (task instanceof AbstractTask) {
-					writers.add(new RecordWriter<SerializationDelegate<T>>((AbstractTask) task, oe));
-				} else if (task instanceof AbstractInputTask<?>) {
-					writers.add(new RecordWriter<SerializationDelegate<T>>((AbstractInputTask<?>) task, oe));
-				}
+				writers.add(new RecordWriter<SerializationDelegate<T>>(task, oe));
 			}
 			if (eventualOutputs != null) {
 				eventualOutputs.addAll(writers);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
index 2eb003d..b44a489 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
@@ -1102,8 +1102,10 @@ public class TaskConfig {
 	/**
 	 * A configuration that manages a subset of keys with a common prefix from a given configuration.
 	 */
-	public static final class DelegatingConfiguration extends Configuration
-	{
+	public static final class DelegatingConfiguration extends Configuration {
+		
+		private static final long serialVersionUID = 1L;
+
 		private final Configuration backingConfig;		// the configuration actually storing the data
 		
 		private String prefix;							// the prefix key by which keys for this config are marked

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableRecordReader.java
index 9d03c7f..c54b542 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableRecordReader.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableRecordReader.java
@@ -17,8 +17,7 @@ import java.io.IOException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.runtime.io.gates.InputChannelResult;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 
 public class MutableRecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements MutableReader<T> {
 	
@@ -30,42 +29,9 @@ public class MutableRecordReader<T extends IOReadableWritable> extends AbstractS
 	 * 
 	 * @param taskBase The application that instantiated the record reader.
 	 */
-	public MutableRecordReader(final AbstractTask taskBase) {
+	public MutableRecordReader(AbstractInvokable taskBase) {
 		super(taskBase);
 	}
-
-	/**
-	 * Constructs a new record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param outputBase The application that instantiated the record reader.
-	 */
-	public MutableRecordReader(final AbstractOutputTask outputBase) {
-		super(outputBase);
-	}
-
-	/**
-	 * Constructs a new record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param taskBase
-	 *        the application that instantiated the record reader
-	 * @param inputGateID
-	 *        The ID of the input gate that the reader reads from.
-	 */
-	public MutableRecordReader(final AbstractTask taskBase, final int inputGateID) {
-		super(taskBase);
-	}
-
-	/**
-	 * Constructs a new record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param outputBase
-	 *        the application that instantiated the record reader
-	 * @param inputGateID
-	 *        The ID of the input gate that the reader reads from.
-	 */
-	public MutableRecordReader(final AbstractOutputTask outputBase, final int inputGateID) {
-		super(outputBase);
-	}
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordReader.java
index bb6a580..5fc436c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordReader.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordReader.java
@@ -14,8 +14,7 @@
 package eu.stratosphere.runtime.io.api;
 
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.gates.InputChannelResult;
 
 import java.io.IOException;
@@ -50,23 +49,10 @@ public class RecordReader<T extends IOReadableWritable> extends AbstractSingleGa
 	 * @param recordType
 	 *        The class of records that can be read from the record reader.
 	 */
-	public RecordReader(AbstractTask taskBase, Class<T> recordType) {
+	public RecordReader(AbstractInvokable taskBase, Class<T> recordType) {
 		super(taskBase);
 		this.recordType = recordType;
 	}
-
-	/**
-	 * Constructs a new record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param outputBase
-	 *        The application that instantiated the record reader.
-	 * @param recordType
-	 *        The class of records that can be read from the record reader.
-	 */
-	public RecordReader(AbstractOutputTask outputBase, Class<T> recordType) {
-		super(outputBase);
-		this.recordType = recordType;
-	}
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordWriter.java
index 132dc14..a1ff62d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordWriter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordWriter.java
@@ -15,9 +15,7 @@ package eu.stratosphere.runtime.io.api;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.template.AbstractInputTask;
 import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.nephele.template.AbstractTask;
 import eu.stratosphere.runtime.io.Buffer;
 import eu.stratosphere.runtime.io.channels.EndOfSuperstepEvent;
 import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
@@ -47,25 +45,11 @@ public class RecordWriter<T extends IOReadableWritable> extends BufferWriter {
 
 	// -----------------------------------------------------------------------------------------------------------------
 
-	public RecordWriter(AbstractTask task) {
-		this((AbstractInvokable) task, new RoundRobinChannelSelector<T>());
+	public RecordWriter(AbstractInvokable invokable) {
+		this(invokable, new RoundRobinChannelSelector<T>());
 	}
 
-	public RecordWriter(AbstractTask task, ChannelSelector<T> channelSelector) {
-		this((AbstractInvokable) task, channelSelector);
-	}
-
-	public RecordWriter(AbstractInputTask<?> task) {
-		this((AbstractInvokable) task, new RoundRobinChannelSelector<T>());
-	}
-
-	public RecordWriter(AbstractInputTask<?> task, ChannelSelector<T> channelSelector) {
-		this((AbstractInvokable) task, channelSelector);
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	private RecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) {
+	public RecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) {
 		// initialize the gate
 		super(invokable);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
index fa0653b..2e75305 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
@@ -28,19 +28,21 @@ import org.apache.log4j.Level;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import eu.stratosphere.api.java.io.DiscardingOuputFormat;
+import eu.stratosphere.api.java.io.TextInputFormat;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.nephele.execution.ExecutionState;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.nephele.jobgraph.DistributionPattern;
-import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.jobgraph.JobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.util.FileLineReader;
-import eu.stratosphere.nephele.util.FileLineWriter;
 import eu.stratosphere.nephele.util.ServerTestUtils;
+import eu.stratosphere.pact.runtime.task.DataSinkTask;
+import eu.stratosphere.pact.runtime.task.DataSourceTask;
 import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.util.LogUtils;
 
@@ -49,6 +51,7 @@ import eu.stratosphere.util.LogUtils;
  * 
  */
 public class ExecutionGraphTest {
+	
 	@BeforeClass
 	public static void reduceLogLevel() {
 		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
@@ -76,18 +79,21 @@ public class ExecutionGraphTest {
 			jobID = jg.getJobID();
 
 			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setFileInputClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile.toURI()));
+			final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
+			i1.setNumberOfSubtasks(1);
+			i1.setInvokableClass(DataSourceTask.class);
+			TextInputFormat inputFormat = new TextInputFormat(new Path(inputFile.toURI()));
+			i1.setInputFormat(inputFormat);
 
 			// task vertex
 			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setTaskClass(ForwardTask1Input1Output.class);
+			t1.setInvokableClass(ForwardTask1Input1Output.class);
 
 			// output vertex
-			final JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
-			o1.setFilePath(new Path(new File(ServerTestUtils.getRandomFilename()).toURI()));
+			final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
+			o1.setNumberOfSubtasks(1);
+			o1.setInvokableClass(DataSinkTask.class);
+			o1.setOutputFormat(new DiscardingOuputFormat<Object>());
 
 			o1.setVertexToShareInstancesWith(i1);
 			i1.setVertexToShareInstancesWith(t1);
@@ -171,7 +177,7 @@ public class ExecutionGraphTest {
 			assertEquals(0, egv0.getNumberOfBackwardLinks());
 			assertEquals(1, egv0.getNumberOfForwardLinks());
 			assertEquals(0, egv0.getStageNumber());
-			assertEquals(-1, egv0.getUserDefinedNumberOfMembers());
+			assertEquals(1, egv0.getUserDefinedNumberOfMembers());
 			assertEquals("Task 1", egv0.getVertexToShareInstancesWith().getName());
 
 			// egv1 (output1)
@@ -189,7 +195,7 @@ public class ExecutionGraphTest {
 			assertEquals(1, egv1.getNumberOfBackwardLinks());
 			assertEquals(0, egv1.getNumberOfForwardLinks());
 			assertEquals(0, egv1.getStageNumber());
-			assertEquals(-1, egv1.getUserDefinedNumberOfMembers());
+			assertEquals(1, egv1.getUserDefinedNumberOfMembers());
 			assertEquals("Input 1", egv1.getVertexToShareInstancesWith().getName());
 
 			// egv2 (task1)
@@ -278,18 +284,20 @@ public class ExecutionGraphTest {
 			jobID = jg.getJobID();
 
 			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setFileInputClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile.toURI()));
+			final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
+			i1.setInvokableClass(DataSourceTask.class);
+			i1.setInputFormat(new TextInputFormat(new Path(inputFile.toURI())));
+			i1.setNumberOfSubtasks(1);
 
 			// task vertex
 			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setTaskClass(ForwardTask1Input1Output.class);
+			t1.setInvokableClass(ForwardTask1Input1Output.class);
 
 			// output vertex
-			final JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
-			o1.setFilePath(new Path(new File(ServerTestUtils.getRandomFilename()).toURI()));
+			final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
+			o1.setNumberOfSubtasks(1);
+			o1.setInvokableClass(DataSinkTask.class);
+			o1.setOutputFormat(new DiscardingOuputFormat<Object>());
 
 			// connect vertices
 			i1.connectTo(t1, ChannelType.IN_MEMORY);
@@ -381,31 +389,32 @@ public class ExecutionGraphTest {
 			jobID = jg.getJobID();
 
 			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setFileInputClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile1.toURI()));
+			final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
+			i1.setInvokableClass(DataSourceTask.class);
+			i1.setInputFormat(new TextInputFormat(new Path(inputFile1.toURI())));
 			i1.setNumberOfSubtasks(2);
-			final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
-			i2.setFileInputClass(FileLineReader.class);
-			i2.setFilePath(new Path(inputFile2.toURI()));
+			
+			final JobInputVertex i2 = new JobInputVertex("Input 2", jg);
+			i2.setInvokableClass(DataSourceTask.class);
+			i2.setInputFormat(new TextInputFormat(new Path(inputFile2.toURI())));
 			i2.setNumberOfSubtasks(2);
 
 			// task vertex
 			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setTaskClass(ForwardTask1Input1Output.class);
+			t1.setInvokableClass(ForwardTask1Input1Output.class);
 			t1.setNumberOfSubtasks(2);
 			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
-			t2.setTaskClass(ForwardTask1Input1Output.class);
+			t2.setInvokableClass(ForwardTask1Input1Output.class);
 			t2.setNumberOfSubtasks(2);
 			final JobTaskVertex t3 = new JobTaskVertex("Task 3", jg);
-			t3.setTaskClass(ForwardTask2Inputs1Output.class);
+			t3.setInvokableClass(ForwardTask2Inputs1Output.class);
 			t3.setNumberOfSubtasks(2);
 
 			
 			// output vertex
-			final JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
-			o1.setFilePath(new Path(outputFile.toURI()));
+			final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
+			o1.setInvokableClass(DataSinkTask.class);
+			o1.setOutputFormat(new DiscardingOuputFormat<Object>());
 			o1.setNumberOfSubtasks(2);
 			i1.setVertexToShareInstancesWith(t1);
 			t1.setVertexToShareInstancesWith(t3);
@@ -624,35 +633,35 @@ public class ExecutionGraphTest {
 			jobID = jg.getJobID();
 
 			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setFileInputClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile1.toURI()));
+			final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
+			i1.setInvokableClass(DataSourceTask.class);
+			i1.setInputFormat(new TextInputFormat(new Path(inputFile1.toURI())));
 			i1.setNumberOfSubtasks(4);
-			final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
-			i2.setFileInputClass(FileLineReader.class);
-			i2.setFilePath(new Path(inputFile2.toURI()));
+			final JobInputVertex i2 = new JobInputVertex("Input 2", jg);
+			i2.setInvokableClass(DataSourceTask.class);
+			i2.setInputFormat(new TextInputFormat(new Path(inputFile2.toURI())));
 			i2.setNumberOfSubtasks(4);
 			// task vertex
 			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setTaskClass(ForwardTask1Input1Output.class);
+			t1.setInvokableClass(ForwardTask1Input1Output.class);
 			t1.setNumberOfSubtasks(4);
 			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
-			t2.setTaskClass(ForwardTask1Input1Output.class);
+			t2.setInvokableClass(ForwardTask1Input1Output.class);
 			t2.setNumberOfSubtasks(4);
 			final JobTaskVertex t3 = new JobTaskVertex("Task 3", jg);
-			t3.setTaskClass(ForwardTask2Inputs1Output.class);
+			t3.setInvokableClass(ForwardTask2Inputs1Output.class);
 			t3.setNumberOfSubtasks(8);
 			final JobTaskVertex t4 = new JobTaskVertex("Task 4", jg);
-			t4.setTaskClass(ForwardTask1Input2Outputs.class);
+			t4.setInvokableClass(ForwardTask1Input2Outputs.class);
 			t4.setNumberOfSubtasks(8);
 			// output vertex
-			final JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
-			o1.setFilePath(new Path(outputFile1.toURI()));
+			final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
+			o1.setInvokableClass(DataSinkTask.class);
+			o1.setOutputFormat(new DiscardingOuputFormat<Object>());
 			o1.setNumberOfSubtasks(4);
-			final JobFileOutputVertex o2 = new JobFileOutputVertex("Output 2", jg);
-			o2.setFileOutputClass(FileLineWriter.class);
-			o2.setFilePath(new Path(outputFile2.toURI()));
+			final JobOutputVertex o2 = new JobOutputVertex("Output 2", jg);
+			o2.setInvokableClass(DataSinkTask.class);
+			o2.setOutputFormat(new DiscardingOuputFormat<Object>());
 			o2.setNumberOfSubtasks(4);
 			o1.setVertexToShareInstancesWith(o2);
 
@@ -690,11 +699,8 @@ public class ExecutionGraphTest {
 				ev.updateExecutionState(ExecutionState.FINISHING);
 				ev.updateExecutionState(ExecutionState.FINISHED);
 			}
-		} catch (GraphConversionException e) {
-			fail(e.getMessage());
-		} catch (JobGraphDefinitionException e) {
-			fail(e.getMessage());
-		} catch (IOException e) {
+		} catch (Exception e) {
+			e.printStackTrace();
 			fail(e.getMessage());
 		} finally {
 			if (inputFile1 != null) {
@@ -728,34 +734,33 @@ public class ExecutionGraphTest {
 		final String crossTaskName = "Self Cross Task";
 		final String outputTaskName = "Self Cross Output";
 		final int degreeOfParallelism = 4;
-		File inputFile1 = null;
-		File outputFile1 = null;
+		File inputFile = null;
+		File outputFile = null;
 		JobID jobID = null;
 
 		try {
-
-			inputFile1 = ServerTestUtils.createInputFile(0);
-			outputFile1 = new File(ServerTestUtils.getRandomFilename());
+			inputFile = ServerTestUtils.createInputFile(0);
+			outputFile = new File(ServerTestUtils.getRandomFilename());
 
 			// create job graph
 			final JobGraph jg = new JobGraph("Self Cross Test Job");
 			jobID = jg.getJobID();
 
 			// input vertex
-			final JobFileInputVertex input = new JobFileInputVertex(inputTaskName, jg);
-			input.setFileInputClass(SelfCrossInputTask.class);
-			input.setFilePath(new Path(inputFile1.toURI()));
+			final JobInputVertex input = new JobInputVertex(inputTaskName, jg);
+			input.setInvokableClass(DataSourceTask.class);
+			input.setInputFormat(new TextInputFormat(new Path(inputFile.toURI())));
 			input.setNumberOfSubtasks(degreeOfParallelism);
 
 			// cross vertex
 			final JobTaskVertex cross = new JobTaskVertex(crossTaskName, jg);
-			cross.setTaskClass(SelfCrossForwardTask.class);
+			cross.setInvokableClass(SelfCrossForwardTask.class);
 			cross.setNumberOfSubtasks(degreeOfParallelism);
 
 			// output vertex
-			final JobFileOutputVertex output = new JobFileOutputVertex(outputTaskName, jg);
-			output.setFileOutputClass(FileLineWriter.class);
-			output.setFilePath(new Path(outputFile1.toURI()));
+			final JobOutputVertex output = new JobOutputVertex(outputTaskName, jg);
+			output.setInvokableClass(DataSinkTask.class);
+			output.setOutputFormat(new DiscardingOuputFormat<Object>());
 			output.setNumberOfSubtasks(degreeOfParallelism);
 
 			// connect vertices
@@ -835,11 +840,11 @@ public class ExecutionGraphTest {
 		} catch (IOException ioe) {
 			fail(ioe.getMessage());
 		} finally {
-			if (inputFile1 != null) {
-				inputFile1.delete();
+			if (inputFile != null) {
+				inputFile.delete();
 			}
-			if (outputFile1 != null) {
-				outputFile1.delete();
+			if (outputFile != null) {
+				outputFile.delete();
 			}
 			if (jobID != null) {
 				try {
@@ -872,30 +877,32 @@ public class ExecutionGraphTest {
 			jobID = jg.getJobID();
 
 			// input vertex
-			final JobFileInputVertex input1 = new JobFileInputVertex("Input 1", jg);
-			input1.setFileInputClass(FileLineReader.class);
-			input1.setFilePath(new Path(inputFile1.toURI()));
+			final JobInputVertex input1 = new JobInputVertex("Input 1", jg);
+			input1.setInvokableClass(DataSourceTask.class);
+			input1.setInputFormat(new TextInputFormat(new Path(inputFile1.toURI())));
 			input1.setNumberOfSubtasks(degreeOfParallelism);
+			
+			
 
 			// forward vertex 1
 			final JobTaskVertex forward1 = new JobTaskVertex("Forward 1", jg);
-			forward1.setTaskClass(ForwardTask1Input1Output.class);
+			forward1.setInvokableClass(ForwardTask1Input1Output.class);
 			forward1.setNumberOfSubtasks(degreeOfParallelism);
 
 			// forward vertex 2
 			final JobTaskVertex forward2 = new JobTaskVertex("Forward 2", jg);
-			forward2.setTaskClass(ForwardTask1Input1Output.class);
+			forward2.setInvokableClass(ForwardTask1Input1Output.class);
 			forward2.setNumberOfSubtasks(degreeOfParallelism);
 
 			// forward vertex 3
 			final JobTaskVertex forward3 = new JobTaskVertex("Forward 3", jg);
-			forward3.setTaskClass(ForwardTask1Input1Output.class);
+			forward3.setInvokableClass(ForwardTask1Input1Output.class);
 			forward3.setNumberOfSubtasks(degreeOfParallelism);
 
 			// output vertex
-			final JobFileOutputVertex output1 = new JobFileOutputVertex("Output 1", jg);
-			output1.setFileOutputClass(FileLineWriter.class);
-			output1.setFilePath(new Path(outputFile1.toURI()));
+			final JobOutputVertex output1 = new JobOutputVertex("Output 1", jg);
+			output1.setInvokableClass(DataSinkTask.class);
+			output1.setOutputFormat(new DiscardingOuputFormat<Object>());
 			output1.setNumberOfSubtasks(degreeOfParallelism);
 
 			// connect vertices

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input1Output.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input1Output.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input1Output.java
index 0a2f52b..24f38b5 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input1Output.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input1Output.java
@@ -14,11 +14,11 @@
 package eu.stratosphere.nephele.executiongraph;
 
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
 
-public class ForwardTask1Input1Output extends AbstractTask {
+public class ForwardTask1Input1Output extends AbstractInvokable {
 
 	private RecordReader<StringRecord> input = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input2Outputs.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input2Outputs.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input2Outputs.java
index 5a5c325..370d0e4 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input2Outputs.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input2Outputs.java
@@ -16,9 +16,9 @@ package eu.stratosphere.nephele.executiongraph;
 import eu.stratosphere.core.io.StringRecord;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 
-public class ForwardTask1Input2Outputs extends AbstractTask {
+public class ForwardTask1Input2Outputs extends AbstractInvokable {
 
 	private RecordReader<StringRecord> input = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask2Inputs1Output.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask2Inputs1Output.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask2Inputs1Output.java
index c87d093..b442dc6 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask2Inputs1Output.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask2Inputs1Output.java
@@ -14,11 +14,11 @@
 package eu.stratosphere.nephele.executiongraph;
 
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
 
-public class ForwardTask2Inputs1Output extends AbstractTask {
+public class ForwardTask2Inputs1Output extends AbstractInvokable {
 
 	private RecordReader<StringRecord> input1 = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossForwardTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossForwardTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossForwardTask.java
index 05f181c..ac6aeb4 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossForwardTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossForwardTask.java
@@ -17,27 +17,20 @@ package eu.stratosphere.nephele.executiongraph;
 import eu.stratosphere.core.io.StringRecord;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 
 /**
  * This class represents the cross task in the self cross unit test.
- * 
  */
-public class SelfCrossForwardTask extends AbstractTask {
-
+public class SelfCrossForwardTask extends AbstractInvokable {
 
 	@Override
 	public void registerInputOutput() {
-		
 		new RecordReader<StringRecord>(this, StringRecord.class);
 		new RecordReader<StringRecord>(this, StringRecord.class);
 		new RecordWriter<StringRecord>(this);
 	}
 
-
 	@Override
-	public void invoke() throws Exception {
-		
-		//Nothing to do here
-	}
+	public void invoke() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
new file mode 100644
index 0000000..0f24438
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
@@ -0,0 +1,132 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobmanager;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.fs.LineReader;
+
+public class DoubleSourceTask extends AbstractInvokable {
+
+	private RecordWriter<StringRecord> output1 = null;
+
+	private RecordWriter<StringRecord> output2 = null;
+
+	@Override
+	public void invoke() throws Exception {
+		this.output1.initializeSerializers();
+		this.output2.initializeSerializers();
+
+		final Iterator<FileInputSplit> splitIterator = getInputSplits();
+
+		while (splitIterator.hasNext()) {
+
+			final FileInputSplit split = splitIterator.next();
+
+			final long start = split.getStart();
+			final long length = split.getLength();
+
+			final FileSystem fs = FileSystem.get(split.getPath().toUri());
+
+			final FSDataInputStream fdis = fs.open(split.getPath());
+
+			final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
+
+			byte[] line = lineReader.readLine();
+
+			while (line != null) {
+
+				// Create a string object from the data read
+				StringRecord str = new StringRecord();
+				str.set(line);
+
+				// Send out string
+				output1.emit(str);
+				output2.emit(str);
+
+				line = lineReader.readLine();
+			}
+
+			// Close the stream;
+			lineReader.close();
+		}
+
+		this.output1.flush();
+		this.output2.flush();
+	}
+
+	@Override
+	public void registerInputOutput() {
+		this.output1 = new RecordWriter<StringRecord>(this);
+		this.output2 = new RecordWriter<StringRecord>(this);
+	}
+
+	private Iterator<FileInputSplit> getInputSplits() {
+
+		final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
+
+		return new Iterator<FileInputSplit>() {
+
+			private FileInputSplit nextSplit;
+			
+			private boolean exhausted;
+
+			@Override
+			public boolean hasNext() {
+				if (exhausted) {
+					return false;
+				}
+				
+				if (nextSplit != null) {
+					return true;
+				}
+				
+				FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
+				
+				if (split != null) {
+					this.nextSplit = split;
+					return true;
+				}
+				else {
+					exhausted = true;
+					return false;
+				}
+			}
+
+			@Override
+			public FileInputSplit next() {
+				if (this.nextSplit == null && !hasNext()) {
+					throw new NoSuchElementException();
+				}
+
+				final FileInputSplit tmp = this.nextSplit;
+				this.nextSplit = null;
+				return tmp;
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+		};
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
index a1ce0b2..5edfe0b 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
@@ -13,18 +13,18 @@
 
 package eu.stratosphere.nephele.jobmanager;
 
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
-import eu.stratosphere.types.Record;
 
-public class DoubleTargetTask extends AbstractTask {
+public class DoubleTargetTask extends AbstractInvokable {
 
-	private RecordReader<Record> input1 = null;
+	private RecordReader<StringRecord> input1 = null;
 
-	private RecordReader<Record> input2 = null;
+	private RecordReader<StringRecord> input2 = null;
 
-	private RecordWriter<Record> output = null;
+	private RecordWriter<StringRecord> output = null;
 
 	@Override
 	public void invoke() throws Exception {
@@ -33,13 +33,13 @@ public class DoubleTargetTask extends AbstractTask {
 
 		while (this.input1.hasNext()) {
 
-			Record s = input1.next();
+			StringRecord s = input1.next();
 			this.output.emit(s);
 		}
 
 		while (this.input2.hasNext()) {
 
-			Record s = input2.next();
+			StringRecord s = input2.next();
 			this.output.emit(s);
 		}
 
@@ -49,9 +49,9 @@ public class DoubleTargetTask extends AbstractTask {
 
 	@Override
 	public void registerInputOutput() {
-		this.input1 = new RecordReader<Record>(this, Record.class);
-		this.input2 = new RecordReader<Record>(this, Record.class);
-		this.output = new RecordWriter<Record>(this);
+		this.input1 = new RecordReader<StringRecord>(this, StringRecord.class);
+		this.input2 = new RecordReader<StringRecord>(this, StringRecord.class);
+		this.output = new RecordWriter<StringRecord>(this);
 	}
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionOutputFormat.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionOutputFormat.java
index ffc4b42..e2e09c3 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionOutputFormat.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionOutputFormat.java
@@ -13,41 +13,37 @@
 
 package eu.stratosphere.nephele.jobmanager;
 
+import eu.stratosphere.api.common.io.InitializeOnMaster;
 import eu.stratosphere.api.common.io.OutputFormat;
 import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.StringRecord;
 
 import java.io.IOException;
 
 
-public class ExceptionOutputFormat implements OutputFormat<Object> {
+public class ExceptionOutputFormat implements OutputFormat<StringRecord>, InitializeOnMaster {
+
+	private static final long serialVersionUID = 1L;
+	
 	/**
 	 * The message which is used for the test runtime exception.
 	 */
 	public static final String RUNTIME_EXCEPTION_MESSAGE = "This is a test runtime exception";
 
-
 	@Override
-	public void configure(Configuration parameters) {
-
-	}
+	public void configure(Configuration parameters) {}
 
 	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-
-	}
+	public void open(int taskNumber, int numTasks) {}
 
 	@Override
-	public void writeRecord(Object record) throws IOException {
-
-	}
+	public void writeRecord(StringRecord record) {}
 
 	@Override
-	public void close() throws IOException {
-
-	}
+	public void close() {}
 
 	@Override
-	public void initialize(Configuration configuration) {
+	public void initializeGlobal(int parallelism) throws IOException {
 		throw new RuntimeException(RUNTIME_EXCEPTION_MESSAGE);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionTask.java
index 77b4f96..9f4bcdf 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionTask.java
@@ -14,16 +14,15 @@
 package eu.stratosphere.nephele.jobmanager;
 
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
 
 /**
  * This task is used during the unit tests to generate a custom exception and check the proper response of the execution
  * engine.
- * 
  */
-public class ExceptionTask extends AbstractTask {
+public class ExceptionTask extends AbstractInvokable {
 
 	/**
 	 * The test error message included in the thrown exception
@@ -52,20 +51,14 @@ public class ExceptionTask extends AbstractTask {
 		}
 	}
 
-
 	@Override
 	public void registerInputOutput() {
-
 		new RecordReader<StringRecord>(this, StringRecord.class);
 		new RecordWriter<StringRecord>(this);
 	}
 
-
 	@Override
 	public void invoke() throws Exception {
-
-		// Throw the exception immediately
 		throw new TestException(ERROR_MESSAGE);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
index 377e304..e85b5f1 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
@@ -13,15 +13,15 @@
 
 package eu.stratosphere.nephele.jobmanager;
 
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
-import eu.stratosphere.types.Record;
 
-public class ForwardTask extends AbstractTask {
+public class ForwardTask extends AbstractInvokable {
 
-	private RecordReader<Record> input = null;
-	private RecordWriter<Record> output = null;
+	private RecordReader<StringRecord> input = null;
+	private RecordWriter<StringRecord> output = null;
 
 	@Override
 	public void invoke() throws Exception {
@@ -30,7 +30,7 @@ public class ForwardTask extends AbstractTask {
 
 		while (this.input.hasNext()) {
 
-			Record s = input.next();
+			StringRecord s = input.next();
 			this.output.emit(s);
 		}
 
@@ -39,7 +39,7 @@ public class ForwardTask extends AbstractTask {
 
 	@Override
 	public void registerInputOutput() {
-		this.input = new RecordReader<Record>(this, Record.class);
-		this.output = new RecordWriter<Record>(this);
+		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
+		this.output = new RecordWriter<StringRecord>(this);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
index db2d9af..2549d4f 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
@@ -13,6 +13,25 @@
 
 package eu.stratosphere.nephele.jobmanager;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
 import eu.stratosphere.configuration.ConfigConstants;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.configuration.GlobalConfiguration;
@@ -22,37 +41,24 @@ import eu.stratosphere.nephele.client.JobClient;
 import eu.stratosphere.nephele.client.JobExecutionException;
 import eu.stratosphere.nephele.execution.RuntimeEnvironment;
 import eu.stratosphere.nephele.jobgraph.DistributionPattern;
-import eu.stratosphere.runtime.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
+import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
 import eu.stratosphere.nephele.taskmanager.Task;
 import eu.stratosphere.nephele.taskmanager.TaskManager;
-import eu.stratosphere.nephele.util.FileLineReader;
-import eu.stratosphere.nephele.util.FileLineWriter;
 import eu.stratosphere.nephele.util.JarFileCreator;
 import eu.stratosphere.nephele.util.ServerTestUtils;
+import eu.stratosphere.nephele.util.tasks.DoubleSourceTask;
+import eu.stratosphere.nephele.util.tasks.FileLineReader;
+import eu.stratosphere.nephele.util.tasks.FileLineWriter;
+import eu.stratosphere.nephele.util.tasks.JobFileInputVertex;
+import eu.stratosphere.nephele.util.tasks.JobFileOutputVertex;
+import eu.stratosphere.pact.runtime.task.DataSinkTask;
+import eu.stratosphere.pact.runtime.task.util.TaskConfig;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.util.LogUtils;
 
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 /**
  * This test is intended to cover the basic functionality of the {@link JobManager}.
  */
@@ -170,23 +176,23 @@ public class JobManagerITCase {
 
 			// input vertex
 			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setFileInputClass(FileLineReader.class);
+			i1.setInvokableClass(FileLineReader.class);
 			i1.setFilePath(new Path(new File(testDirectory).toURI()));
 			i1.setNumberOfSubtasks(1);
 
 			// task vertex 1
 			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setTaskClass(ForwardTask.class);
+			t1.setInvokableClass(ForwardTask.class);
 			t1.setNumberOfSubtasks(1);
 
 			// task vertex 2
 			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
-			t2.setTaskClass(ForwardTask.class);
+			t2.setInvokableClass(ForwardTask.class);
 			t2.setNumberOfSubtasks(1);
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
+			o1.setInvokableClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
 			o1.setNumberOfSubtasks(1);
 
@@ -282,16 +288,16 @@ public class JobManagerITCase {
 
 			// input vertex
 			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setFileInputClass(FileLineReader.class);
+			i1.setInvokableClass(FileLineReader.class);
 			i1.setFilePath(new Path(inputFile.toURI()));
 
 			// task vertex 1
 			final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
-			t1.setTaskClass(ExceptionTask.class);
+			t1.setInvokableClass(ExceptionTask.class);
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
+			o1.setInvokableClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
 
 			t1.setVertexToShareInstancesWith(i1);
@@ -330,10 +336,9 @@ public class JobManagerITCase {
 
 			fail("Expected exception but did not receive it");
 
-		} catch (JobGraphDefinitionException jgde) {
-			fail(jgde.getMessage());
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
 		} finally {
 
 			// Remove temporary files
@@ -376,16 +381,16 @@ public class JobManagerITCase {
 
 			// input vertex
 			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setFileInputClass(FileLineReader.class);
+			i1.setInvokableClass(FileLineReader.class);
 			i1.setFilePath(new Path(inputFile.toURI()));
 
 			// task vertex 1
 			final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
-			t1.setTaskClass(RuntimeExceptionTask.class);
+			t1.setInvokableClass(RuntimeExceptionTask.class);
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
+			o1.setInvokableClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
 
 			t1.setVertexToShareInstancesWith(i1);
@@ -472,32 +477,28 @@ public class JobManagerITCase {
 			final JobGraph jg = new JobGraph("Job Graph for Exception Test");
 
 			// input vertex
-			final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
+			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
+			i1.setInvokableClass(FileLineReader.class);
+			i1.setFilePath(new Path(inputFile.toURI()));
 			i1.setNumberOfSubtasks(1);
-			Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>)(Class<?>)DataSourceTask
-					.class;
-			i1.setInputClass(clazz);
-			TextInputFormat inputFormat = new TextInputFormat();
-			inputFormat.setFilePath(new Path(inputFile.toURI()));
-			i1.setInputFormat(inputFormat);
-			i1.setInputFormat(inputFormat);
-			i1.setOutputSerializer(RecordSerializerFactory.get());
-			TaskConfig config= new TaskConfig(i1.getConfiguration());
-			config.addOutputShipStrategy(ShipStrategyType.FORWARD);
 
 			// task vertex 1
 			final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
-			t1.setTaskClass(ForwardTask.class);
+			t1.setInvokableClass(ForwardTask.class);
 
 			// output vertex
 			JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
 			o1.setNumberOfSubtasks(1);
-			o1.setOutputClass(DataSinkTask.class);
+			o1.setInvokableClass(DataSinkTask.class);
 			ExceptionOutputFormat outputFormat = new ExceptionOutputFormat();
 			o1.setOutputFormat(outputFormat);
 			TaskConfig outputConfig = new TaskConfig(o1.getConfiguration());
-			outputConfig.addInputToGroup(0);
-			outputConfig.setInputSerializer(RecordSerializerFactory.get(), 0);
+			outputConfig.setStubWrapper(new UserCodeObjectWrapper<OutputFormat<?>>(outputFormat));
+//			outputConfig.addInputToGroup(0);
+//			
+//			ValueSerializer<StringRecord> serializer = new ValueSerializer<StringRecord>(StringRecord.class);
+//			RuntimeStatefulSerializerFactory<StringRecord> serializerFactory = new RuntimeStatefulSerializerFactory<StringRecord>(serializer, StringRecord.class);
+//			outputConfig.setInputSerializer(serializerFactory, 0);
 
 			t1.setVertexToShareInstancesWith(i1);
 			o1.setVertexToShareInstancesWith(i1);
@@ -591,23 +592,23 @@ public class JobManagerITCase {
 
 			// input vertex
 			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setFileInputClass(FileLineReader.class);
+			i1.setInvokableClass(FileLineReader.class);
 			i1.setFilePath(new Path(inputFile.toURI()));
 			i1.setNumberOfSubtasks(1);
 
 			// task vertex 1
 			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setTaskClass(ForwardTask.class);
+			t1.setInvokableClass(ForwardTask.class);
 			t1.setNumberOfSubtasks(1);
 
 			// task vertex 2
 			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
-			t2.setTaskClass(ForwardTask.class);
+			t2.setInvokableClass(ForwardTask.class);
 			t2.setNumberOfSubtasks(1);
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
+			o1.setInvokableClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
 			o1.setNumberOfSubtasks(1);
 
@@ -620,8 +621,9 @@ public class JobManagerITCase {
 				i1.connectTo(t1, ChannelType.NETWORK);
 				t1.connectTo(t2, ChannelType.IN_MEMORY);
 				t2.connectTo(o1, ChannelType.IN_MEMORY);
-			} catch (JobGraphDefinitionException e) {
+			} catch (Exception e) {
 				e.printStackTrace();
+				fail(e.getMessage());
 			}
 
 			// add jar
@@ -693,16 +695,16 @@ public class JobManagerITCase {
 
 			// input vertex
 			final JobFileInputVertex i1 = new JobFileInputVertex("Input with two Outputs", jg);
-			i1.setFileInputClass(DoubleSourceTask.class);
+			i1.setInvokableClass(DoubleSourceTask.class);
 			i1.setFilePath(new Path(inputFile.toURI()));
 
 			// task vertex 1
 			final JobTaskVertex t1 = new JobTaskVertex("Task with two Inputs", jg);
-			t1.setTaskClass(DoubleTargetTask.class);
+			t1.setInvokableClass(DoubleTargetTask.class);
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
+			o1.setInvokableClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
 
 			t1.setVertexToShareInstancesWith(i1);
@@ -720,12 +722,9 @@ public class JobManagerITCase {
 			jobClient = new JobClient(jg, configuration);
 			jobClient.submitJobAndWait();
 
-		} catch (JobExecutionException e) {
+		} catch (Exception e) {
+			e.printStackTrace();
 			fail(e.getMessage());
-		} catch (JobGraphDefinitionException jgde) {
-			fail(jgde.getMessage());
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
 		} finally {
 
 			// Remove temporary files
@@ -772,12 +771,12 @@ public class JobManagerITCase {
 
 			// input vertex
 			final JobFileInputVertex i1 = new JobFileInputVertex(jg);
-			i1.setFileInputClass(FileLineReader.class);
+			i1.setInvokableClass(FileLineReader.class);
 			i1.setFilePath(new Path(inputFile.toURI()));
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex(jg);
-			o1.setFileOutputClass(FileLineWriter.class);
+			o1.setInvokableClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
 
 			o1.setVertexToShareInstancesWith(i1);
@@ -791,13 +790,9 @@ public class JobManagerITCase {
 			// Create job client and launch job
 			jobClient = new JobClient(jg, configuration);
 			jobClient.submitJobAndWait();
-
-		} catch (JobExecutionException e) {
+		} catch (Exception e) {
+			e.printStackTrace();
 			fail(e.getMessage());
-		} catch (JobGraphDefinitionException jgde) {
-			fail(jgde.getMessage());
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
 		} finally {
 
 			// Remove temporary files
@@ -855,21 +850,21 @@ public class JobManagerITCase {
 
 			// input vertex 1
 			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setFileInputClass(FileLineReader.class);
+			i1.setInvokableClass(FileLineReader.class);
 			i1.setFilePath(new Path(inputFile1.toURI()));
 
 			// input vertex 2
 			final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
-			i2.setFileInputClass(FileLineReader.class);
+			i2.setInvokableClass(FileLineReader.class);
 			i2.setFilePath(new Path(inputFile2.toURI()));
 
 			// union task
 			final JobTaskVertex u1 = new JobTaskVertex("Union", jg);
-			u1.setTaskClass(UnionTask.class);
+			u1.setInvokableClass(UnionTask.class);
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
+			o1.setInvokableClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
 			o1.setNumberOfSubtasks(1);
 
@@ -999,24 +994,24 @@ public class JobManagerITCase {
 
 			// input vertex 1
 			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setFileInputClass(FileLineReader.class);
+			i1.setInvokableClass(FileLineReader.class);
 			i1.setFilePath(new Path(inputFile1.toURI()));
 			i1.setNumberOfSubtasks(numberOfSubtasks);
 
 			// input vertex 2
 			final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
-			i2.setFileInputClass(FileLineReader.class);
+			i2.setInvokableClass(FileLineReader.class);
 			i2.setFilePath(new Path(inputFile2.toURI()));
 			i2.setNumberOfSubtasks(numberOfSubtasks);
 
 			// union task
 			final JobTaskVertex f1 = new JobTaskVertex("Forward 1", jg);
-			f1.setTaskClass(DoubleTargetTask.class);
+			f1.setInvokableClass(DoubleTargetTask.class);
 			f1.setNumberOfSubtasks(numberOfSubtasks);
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output", jg);
-			o1.setFileOutputClass(FileLineWriter.class);
+			o1.setInvokableClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
 			o1.setNumberOfSubtasks(numberOfSubtasks);
 
@@ -1051,6 +1046,9 @@ public class JobManagerITCase {
 			} catch (JobExecutionException e) {
 				// Job execution should lead to an error due to lack of resources
 				return;
+			} catch (Exception e) {
+				e.printStackTrace();
+				fail(e.getMessage());
 			}
 			finally {
 				tmLogger.setLevel(tmLevel);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/RuntimeExceptionTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/RuntimeExceptionTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/RuntimeExceptionTask.java
index 9376099..ce20431 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/RuntimeExceptionTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/RuntimeExceptionTask.java
@@ -13,13 +13,12 @@
 
 package eu.stratosphere.nephele.jobmanager;
 
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 
 /**
  * This task throws a {@link RuntimeException} when the method <code>registerInputOutput</code> is called.
- * 
  */
-public class RuntimeExceptionTask extends AbstractTask {
+public class RuntimeExceptionTask extends AbstractInvokable {
 
 	/**
 	 * The message which is used for the test runtime exception.
@@ -29,15 +28,9 @@ public class RuntimeExceptionTask extends AbstractTask {
 
 	@Override
 	public void registerInputOutput() {
-
 		throw new RuntimeException(RUNTIME_EXCEPTION_MESSAGE);
 	}
 
-
 	@Override
-	public void invoke() throws Exception {
-
-		// Nothing to do here
-	}
-
+	public void invoke() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
index 209eff1..f21c60e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
@@ -14,36 +14,34 @@
 package eu.stratosphere.nephele.jobmanager;
 
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.api.MutableRecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.runtime.io.api.UnionRecordReader;
-import eu.stratosphere.nephele.template.AbstractTask;
-import eu.stratosphere.types.Record;
 
 /**
  * A simple implementation of a task using a {@link UnionRecordReader}.
  */
-public class UnionTask extends AbstractTask {
+public class UnionTask extends AbstractInvokable {
 
 	/**
 	 * The union record reader to be used during the tests.
 	 */
-	private UnionRecordReader<Record> unionReader;
+	private UnionRecordReader<StringRecord> unionReader;
 
-	private RecordWriter<Record> writer;
+	private RecordWriter<StringRecord> writer;
 	
 	
 	@Override
 	public void registerInputOutput() {
 
 		@SuppressWarnings("unchecked")
-		MutableRecordReader<Record>[] recordReaders = (MutableRecordReader<Record>[]) new
-				MutableRecordReader<?>[2];
-		recordReaders[0] = new MutableRecordReader<Record>(this);
-		recordReaders[1] = new MutableRecordReader<Record>(this);
-		this.unionReader = new UnionRecordReader<Record>(recordReaders, Record.class);
+		MutableRecordReader<StringRecord>[] recordReaders = (MutableRecordReader<StringRecord>[]) new MutableRecordReader<?>[2];
+		recordReaders[0] = new MutableRecordReader<StringRecord>(this);
+		recordReaders[1] = new MutableRecordReader<StringRecord>(this);
+		this.unionReader = new UnionRecordReader<StringRecord>(recordReaders, StringRecord.class);
 		
-		this.writer = new RecordWriter<Record>(this);
+		this.writer = new RecordWriter<StringRecord>(this);
 	}
 
 	@Override
@@ -56,4 +54,4 @@ public class UnionTask extends AbstractTask {
 
 		this.writer.flush();
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
index 6a41fe9..e5cabb8 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
@@ -24,6 +24,9 @@ import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler;
 
 import org.junit.Test;
 
+import eu.stratosphere.api.common.io.GenericInputFormat;
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.io.StringRecord;
 import eu.stratosphere.nephele.execution.ExecutionState;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
@@ -35,63 +38,41 @@ import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
-import eu.stratosphere.nephele.template.AbstractGenericInputTask;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.types.IntValue;
 import eu.stratosphere.util.StringUtils;
 
 /**
- *         This class checks the functionality of the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} class
+ * This class checks the functionality of the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} class
  */
+@SuppressWarnings("serial")
 public class DefaultSchedulerTest {
 
-	/**
-	 * Test input task.
-	 * 
-	 */
-	public static final class InputTask extends AbstractGenericInputTask {
 
-		/**
-		 * {@inheritDoc}
-		 */
+	public static final class InputTask extends AbstractInvokable {
+
 		@Override
 		public void registerInputOutput() {
 			new RecordWriter<StringRecord>(this);
 		}
 
-		/**
-		 * {@inheritDoc}
-		 */
 		@Override
-		public void invoke() throws Exception {
-			// Nothing to do here
-		}
+		public void invoke() throws Exception {}
 
 	}
 
-	/**
-	 * Test output task.
-	 * 
-	 */
-	public static final class OutputTask extends AbstractOutputTask {
+	public static final class OutputTask extends AbstractInvokable {
 
-		/**
-		 * {@inheritDoc}
-		 */
 		@Override
 		public void registerInputOutput() {
 			new RecordReader<StringRecord>(this, StringRecord.class);
 		}
 
-		/**
-		 * {@inheritDoc}
-		 */
 		@Override
-		public void invoke() throws Exception {
-			// Nothing to do here
-		}
+		public void invoke() throws Exception {}
 
 	}
 
@@ -111,29 +92,16 @@ public class DefaultSchedulerTest {
 	public static final class DummyOutputFormat implements OutputFormat<IntValue> {
 
 		@Override
-		public void configure(Configuration parameters) {
-
-		}
+		public void configure(Configuration parameters) {}
 
 		@Override
-		public void open(int taskNumber, int numTasks) throws IOException {
-
-		}
-
-		@Override
-		public void writeRecord(IntValue record) throws IOException {
-
-		}
+		public void open(int taskNumber, int numTasks) {}
 
 		@Override
-		public void close() throws IOException {
-
-		}
+		public void writeRecord(IntValue record) {}
 
 		@Override
-		public void initialize(Configuration configuration) {
-
-		}
+		public void close() {}
 	}
 
 	/**
@@ -148,12 +116,12 @@ public class DefaultSchedulerTest {
 		final JobGraph jobGraph = new JobGraph("Job Graph");
 
 		final JobInputVertex inputVertex = new JobInputVertex("Input 1", jobGraph);
-		inputVertex.setInputClass(InputTask.class);
+		inputVertex.setInvokableClass(InputTask.class);
 		inputVertex.setInputFormat(new DummyInputFormat());
 		inputVertex.setNumberOfSubtasks(1);
 
 		final JobOutputVertex outputVertex = new JobOutputVertex("Output 1", jobGraph);
-		outputVertex.setOutputClass(OutputTask.class);
+		outputVertex.setInvokableClass(OutputTask.class);
 		outputVertex.setOutputFormat(new DummyOutputFormat());
 		outputVertex.setNumberOfSubtasks(1);
 


Mime
View raw message