flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [19/53] [abbrv] Merge fix to omit input/output registering on JobManager Rework Invokable Task Hierarchy
Date Thu, 26 Jun 2014 09:46:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/DoubleSourceTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/DoubleSourceTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/DoubleSourceTask.java
new file mode 100644
index 0000000..aa46af8
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/DoubleSourceTask.java
@@ -0,0 +1,134 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util.tasks;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.fs.LineReader;
+
+public class DoubleSourceTask extends AbstractInvokable {
+
+	private RecordWriter<StringRecord> output1 = null;
+
+	private RecordWriter<StringRecord> output2 = null;
+
+	@Override
+	public void invoke() throws Exception {
+		this.output1.initializeSerializers();
+		this.output2.initializeSerializers();
+
+		final Iterator<FileInputSplit> splitIterator = getInputSplits();
+
+		while (splitIterator.hasNext()) {
+
+			final FileInputSplit split = splitIterator.next();
+
+			final long start = split.getStart();
+			final long length = split.getLength();
+
+			final FileSystem fs = FileSystem.get(split.getPath().toUri());
+
+			final FSDataInputStream fdis = fs.open(split.getPath());
+
+			final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
+
+			byte[] line = lineReader.readLine();
+
+			while (line != null) {
+
+				// Create a string object from the data read
+				StringRecord str = new StringRecord();
+				str.set(line);
+
+				// Send out string
+				output1.emit(str);
+				output2.emit(str);
+
+				line = lineReader.readLine();
+			}
+
+			// Close the stream
+			lineReader.close();
+		}
+
+		this.output1.flush();
+		this.output2.flush();
+	}
+
+	@Override
+	public void registerInputOutput() {
+		this.output1 = new RecordWriter<StringRecord>(this);
+		this.output2 = new RecordWriter<StringRecord>(this);
+	}
+
+	private Iterator<FileInputSplit> getInputSplits() {
+
+		final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
+
+		return new Iterator<FileInputSplit>() {
+
+			private FileInputSplit nextSplit;
+			
+			private boolean exhausted;
+
+			@Override
+			public boolean hasNext() {
+				if (exhausted) {
+					return false;
+				}
+				
+				if (nextSplit != null) {
+					return true;
+				}
+				
+				FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
+				
+				if (split != null) {
+					this.nextSplit = split;
+					return true;
+				}
+				else {
+					exhausted = true;
+					return false;
+				}
+			}
+
+			@Override
+			public FileInputSplit next() {
+				if (this.nextSplit == null && !hasNext()) {
+					throw new NoSuchElementException();
+				}
+
+				final FileInputSplit tmp = this.nextSplit;
+				this.nextSplit = null;
+				return tmp;
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+		};
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineReader.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineReader.java
new file mode 100644
index 0000000..c62911a
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineReader.java
@@ -0,0 +1,133 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util.tasks;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.fs.LineReader;
+
+/**
+ * A file line reader reads the associated file input splits line by line and outputs the lines as string records.
+ * 
+ */
+public class FileLineReader extends AbstractInvokable {
+
+	private RecordWriter<StringRecord> output = null;
+
+	@Override
+	public void invoke() throws Exception {
+
+		output.initializeSerializers();
+
+		final Iterator<FileInputSplit> splitIterator = getInputSplits();
+
+		while (splitIterator.hasNext()) {
+
+			final FileInputSplit split = splitIterator.next();
+
+			long start = split.getStart();
+			long length = split.getLength();
+
+			final FileSystem fs = FileSystem.get(split.getPath().toUri());
+
+			final FSDataInputStream fdis = fs.open(split.getPath());
+
+			final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
+
+			byte[] line = lineReader.readLine();
+
+			while (line != null) {
+
+				// Create a string object from the data read
+				StringRecord str = new StringRecord();
+				str.set(line);
+
+				// Send out string
+				output.emit(str);
+
+				line = lineReader.readLine();
+			}
+
+			// Close the stream
+			lineReader.close();
+		}
+
+		this.output.flush();
+	}
+
+	@Override
+	public void registerInputOutput() {
+		output = new RecordWriter<StringRecord>(this);
+	}
+	
+	private Iterator<FileInputSplit> getInputSplits() {
+
+		final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
+
+		return new Iterator<FileInputSplit>() {
+
+			private FileInputSplit nextSplit;
+			
+			private boolean exhausted;
+
+			@Override
+			public boolean hasNext() {
+				if (exhausted) {
+					return false;
+				}
+				
+				if (nextSplit != null) {
+					return true;
+				}
+				
+				FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
+				
+				if (split != null) {
+					this.nextSplit = split;
+					return true;
+				}
+				else {
+					exhausted = true;
+					return false;
+				}
+			}
+
+			@Override
+			public FileInputSplit next() {
+				if (this.nextSplit == null && !hasNext()) {
+					throw new NoSuchElementException();
+				}
+
+				final FileInputSplit tmp = this.nextSplit;
+				this.nextSplit = null;
+				return tmp;
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+		};
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineWriter.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineWriter.java
new file mode 100644
index 0000000..5f6e2b2
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineWriter.java
@@ -0,0 +1,72 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util.tasks;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.fs.FSDataOutputStream;
+import eu.stratosphere.core.fs.FileStatus;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.runtime.io.api.RecordReader;
+
+/**
+ * A file line writer reads string records from its input gate and writes them to the associated output file.
+ * 
+ */
+public class FileLineWriter extends AbstractInvokable {
+	/**
+	 * The record reader through which incoming string records are received.
+	 */
+	private RecordReader<StringRecord> input = null;
+
+
+	@Override
+	public void invoke() throws Exception {
+
+		final Configuration conf = getEnvironment().getTaskConfiguration();
+		final String outputPathString = conf.getString(JobFileOutputVertex.PATH_PROPERTY, null);
+		
+		Path outputPath = new Path(outputPathString);
+
+		FileSystem fs = FileSystem.get(outputPath.toUri());
+		if (fs.exists(outputPath)) {
+			FileStatus status = fs.getFileStatus(outputPath);
+
+			if (status.isDir()) {
+				outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt");
+			}
+		}
+
+		final FSDataOutputStream outputStream = fs.create(outputPath, true);
+
+		while (this.input.hasNext()) {
+
+			StringRecord record = this.input.next();
+			byte[] recordByte = (record.toString() + "\r\n").getBytes();
+			outputStream.write(recordByte, 0, recordByte.length);
+		}
+
+		outputStream.close();
+
+	}
+
+	@Override
+	public void registerInputOutput() {
+		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileInputVertex.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileInputVertex.java
new file mode 100644
index 0000000..fb0da91
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileInputVertex.java
@@ -0,0 +1,255 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util.tasks;
+
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import eu.stratosphere.core.fs.BlockLocation;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.FileStatus;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.core.io.InputSplit;
+import eu.stratosphere.nephele.jobgraph.AbstractJobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.nephele.jobgraph.JobVertexID;
+
+
+public final class JobFileInputVertex extends AbstractJobInputVertex {
+
+	/**
+	 * The factor by which the last split of a file may exceed the regular split size.
+	 */
+	private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
+	
+	/**
+	 * The path pointing to the input file/directory.
+	 */
+	private Path path;
+
+
+	public JobFileInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
+		super(name, id, jobGraph);
+	}
+	
+	/**
+	 * Creates a new job file input vertex with the specified name.
+	 * 
+	 * @param name
+	 *        the name of the new job file input vertex
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	public JobFileInputVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
+	}
+
+	/**
+	 * Creates a new job file input vertex.
+	 * 
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	public JobFileInputVertex(JobGraph jobGraph) {
+		this(null, jobGraph);
+	}
+
+	/**
+	 * Sets the path of the file the job file input vertex's task should read from.
+	 * 
+	 * @param path
+	 *        the path of the file the job file input vertex's task should read from
+	 */
+	public void setFilePath(final Path path) {
+		this.path = path;
+	}
+
+	/**
+	 * Returns the path of the file the job file input vertex's task should read from.
+	 * 
+	 * @return the path of the file the job file input vertex's task should read from or <code>null</code> if no path
+	 *         has yet been set
+	 */
+	public Path getFilePath() {
+		return this.path;
+	}
+
+	@Override
+	public void read(final DataInput in) throws IOException {
+		super.read(in);
+
+		// Read path of the input file
+		final boolean isNotNull = in.readBoolean();
+		if (isNotNull) {
+			this.path = new Path();
+			this.path.read(in);
+		}
+	}
+
+	@Override
+	public void write(final DataOutput out) throws IOException {
+		super.write(out);
+
+		// Write out the path of the input file
+		if (this.path == null) {
+			out.writeBoolean(false);
+		} else {
+			out.writeBoolean(true);
+			this.path.write(out);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+
+	@Override
+	public InputSplit[] getInputSplits(int minNumSplits) throws Exception {
+		final Path path = this.path;
+		final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>();
+
+		// get all the files that are involved in the splits
+		final List<FileStatus> files = new ArrayList<FileStatus>();
+		long totalLength = 0;
+
+		final FileSystem fs = path.getFileSystem();
+		final FileStatus pathFile = fs.getFileStatus(path);
+
+		if (pathFile.isDir()) {
+			// input is directory. list all contained files
+			final FileStatus[] dir = fs.listStatus(path);
+			for (int i = 0; i < dir.length; i++) {
+				if (!dir[i].isDir()) {
+					files.add(dir[i]);
+					totalLength += dir[i].getLen();
+				}
+			}
+
+		} else {
+			files.add(pathFile);
+			totalLength += pathFile.getLen();
+		}
+
+		final long minSplitSize = 1;
+		final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : (totalLength / minNumSplits +
+					(totalLength % minNumSplits == 0 ? 0 : 1));
+
+		// now that we have the files, generate the splits
+		int splitNum = 0;
+		for (final FileStatus file : files) {
+
+			final long len = file.getLen();
+			final long blockSize = file.getBlockSize();
+
+			final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
+			final long halfSplit = splitSize >>> 1;
+
+			final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
+
+			if (len > 0) {
+
+				// get the block locations and make sure they are in order with respect to their offset
+				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
+				Arrays.sort(blocks);
+
+				long bytesUnassigned = len;
+				long position = 0;
+
+				int blockIndex = 0;
+
+				while (bytesUnassigned > maxBytesForLastSplit) {
+					// get the block containing the majority of the data
+					blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
+					// create a new split
+					final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, splitSize,
+						blocks[blockIndex]
+							.getHosts());
+					inputSplits.add(fis);
+
+					// adjust the positions
+					position += splitSize;
+					bytesUnassigned -= splitSize;
+				}
+
+				// assign the last split
+				if (bytesUnassigned > 0) {
+					blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
+					final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
+						bytesUnassigned,
+						blocks[blockIndex].getHosts());
+					inputSplits.add(fis);
+				}
+			} else {
+				// special case with a file of zero bytes size
+				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
+				String[] hosts;
+				if (blocks.length > 0) {
+					hosts = blocks[0].getHosts();
+				} else {
+					hosts = new String[0];
+				}
+				final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
+				inputSplits.add(fis);
+			}
+		}
+
+		return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
+	}
+
+	/**
+	 * Retrieves the index of the <tt>BlockLocation</tt> that contains the part of the file described by the given
+	 * offset.
+	 * 
+	 * @param blocks
+	 *        The different blocks of the file. Must be ordered by their offset.
+	 * @param offset
+	 *        The offset of the position in the file.
+	 * @param startIndex
+	 *        The earliest index to look at.
+	 * @return The index of the block containing the given position.
+	 */
+	private final int getBlockIndexForPosition(final BlockLocation[] blocks, final long offset,
+			final long halfSplitSize, final int startIndex) {
+		
+		// go over all indexes starting at the startIndex
+		for (int i = startIndex; i < blocks.length; i++) {
+			long blockStart = blocks[i].getOffset();
+			long blockEnd = blockStart + blocks[i].getLength();
+
+			if (offset >= blockStart && offset < blockEnd) {
+				// got the block where the split starts
+				// check if the next block contains more than this one does
+				if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
+					return i + 1;
+				} else {
+					return i;
+				}
+			}
+		}
+		throw new IllegalArgumentException("The given offset is not contained in the any block.");
+	}
+
+
+	@Override
+	public Class<FileInputSplit> getInputSplitType() {
+		return FileInputSplit.class;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileOutputVertex.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileOutputVertex.java
new file mode 100644
index 0000000..593b520
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileOutputVertex.java
@@ -0,0 +1,109 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util.tasks;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.nephele.jobgraph.AbstractJobOutputVertex;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.nephele.jobgraph.JobVertexID;
+
+
+public class JobFileOutputVertex extends AbstractJobOutputVertex {
+
+	public static final String PATH_PROPERTY = "outputPath";
+	
+	/**
+	 * The path pointing to the output file/directory.
+	 */
+	private Path path;
+
+
+	public JobFileOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
+		super(name, id, jobGraph);
+	}
+	
+	/**
+	 * Creates a new job file output vertex with the specified name.
+	 * 
+	 * @param name
+	 *        the name of the new job file output vertex
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	public JobFileOutputVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
+	}
+
+	/**
+	 * Creates a new job file output vertex.
+	 * 
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	public JobFileOutputVertex(JobGraph jobGraph) {
+		this(null, jobGraph);
+	}
+
+	/**
+	 * Sets the path of the file the job file output vertex's task should write to.
+	 * 
+	 * @param path
+	 *        the path of the file the job file output vertex's task should write to
+	 */
+	public void setFilePath(Path path) {
+		this.path = path;
+		getConfiguration().setString(PATH_PROPERTY, path.toString());
+	}
+
+	/**
+	 * Returns the path of the file the job file output vertex's task should write to.
+	 * 
+	 * @return the path of the file the job file output vertex's task should write to or <code>null</code> if no path
+	 *         has yet been set
+	 */
+	public Path getFilePath() {
+		return this.path;
+	}
+
+	@Override
+	public void read(final DataInput in) throws IOException {
+		super.read(in);
+
+		// Read path of the output file
+		boolean isNotNull = in.readBoolean();
+		if (isNotNull) {
+			this.path = new Path();
+			this.path.read(in);
+		}
+	}
+
+	@Override
+	public void write(final DataOutput out) throws IOException {
+		super.write(out);
+
+		// Write out the path of the output file
+		if (this.path == null) {
+			out.writeBoolean(false);
+		} else {
+			out.writeBoolean(true);
+			this.path.write(out);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
index a28ba38..e59f4a6 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
@@ -34,7 +34,7 @@ import eu.stratosphere.api.java.record.functions.JoinFunction;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordPairComparator;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializer;
@@ -67,7 +67,7 @@ public class HashMatchIteratorITCase {
 	private static final long SEED1 = 561349061987311L;
 	private static final long SEED2 = 231434613412342L;
 	
-	private final AbstractTask parentTask = new DummyInvokable();
+	private final AbstractInvokable parentTask = new DummyInvokable();
 
 	private IOManager ioManager;
 	private MemoryManager memoryManager;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
index d9c8b08..755d08a 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
@@ -38,7 +38,6 @@ import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
 import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.nephele.template.AbstractTask;
 import eu.stratosphere.pact.runtime.hash.HashMatchIteratorITCase.RecordMatch;
 import eu.stratosphere.pact.runtime.hash.HashMatchIteratorITCase.RecordMatchRemovingJoin;
 import eu.stratosphere.pact.runtime.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
@@ -75,7 +74,7 @@ public class ReOpenableHashTableITCase {
 	
 	private static final int NUM_PROBES = 3; // number of reopenings of hash join
 	
-	private final AbstractTask parentTask = new DummyInvokable();
+	private final AbstractInvokable parentTask = new DummyInvokable();
 
 	private IOManager ioManager;
 	private MemoryManager memoryManager;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
index fbe4f5b..c2be01a 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
@@ -31,7 +31,7 @@ import eu.stratosphere.nephele.services.iomanager.ChannelWriterOutputView;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.pact.runtime.test.util.DummyInvokable;
 import eu.stratosphere.pact.runtime.test.util.TestData;
 import eu.stratosphere.pact.runtime.test.util.TestData.Generator.KeyMode;
@@ -63,7 +63,7 @@ public class ChannelViewsTest
 	
 	private static final int NUM_MEMORY_SEGMENTS = 3;
 	
-	private final AbstractTask parentTask = new DummyInvokable();
+	private final AbstractInvokable parentTask = new DummyInvokable();
 
 	private IOManager ioManager;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
index 1809540..c960280 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
@@ -28,7 +28,7 @@ import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.ListMemorySegmentSource;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.pact.runtime.test.util.DummyInvokable;
 import eu.stratosphere.pact.runtime.test.util.TestData;
 import eu.stratosphere.pact.runtime.test.util.TestData.Generator.KeyMode;
@@ -54,7 +54,7 @@ public class SpillingBufferTest {
 	
 	private static final int NUM_MEMORY_SEGMENTS = 23;
 	
-	private final AbstractTask parentTask = new DummyInvokable();
+	private final AbstractInvokable parentTask = new DummyInvokable();
 
 	private IOManager ioManager;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
index 26ce081..f191075 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
@@ -30,7 +30,6 @@ import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
 import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.nephele.template.AbstractTask;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
 import eu.stratosphere.pact.runtime.test.util.DummyInvokable;
@@ -41,10 +40,9 @@ import eu.stratosphere.pact.runtime.test.util.TestData.Value;
 import eu.stratosphere.types.Record;
 import eu.stratosphere.util.MutableObjectIterator;
 
-/**
- */
-public class AsynchonousPartialSorterITCase
-{
+
+public class AsynchonousPartialSorterITCase {
+	
 	private static final Log LOG = LogFactory.getLog(AsynchonousPartialSorterITCase.class);
 
 	private static final long SEED = 649180756312423613L;
@@ -57,7 +55,7 @@ public class AsynchonousPartialSorterITCase
 
 	public static final int MEMORY_SIZE = 1024 * 1024 * 32;
 	
-	private final AbstractTask parentTask = new DummyInvokable();
+	private final AbstractInvokable parentTask = new DummyInvokable();
 
 	private IOManager ioManager;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
index 1851480..b873f96 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
@@ -36,7 +36,7 @@ import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
 import eu.stratosphere.pact.runtime.test.util.DummyInvokable;
@@ -66,7 +66,7 @@ public class CombiningUnilateralSortMergerITCase {
 
 	public static final int MEMORY_SIZE = 1024 * 1024 * 256;
 
-	private final AbstractTask parentTask = new DummyInvokable();
+	private final AbstractInvokable parentTask = new DummyInvokable();
 	
 	private IOManager ioManager;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
index 7ba42b9..cdb8421 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
@@ -28,7 +28,7 @@ import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
 import eu.stratosphere.pact.runtime.test.util.DummyInvokable;
@@ -61,7 +61,7 @@ public class ExternalSortITCase {
 
 	private static final int MEMORY_SIZE = 1024 * 1024 * 78;
 	
-	private final AbstractTask parentTask = new DummyInvokable();
+	private final AbstractInvokable parentTask = new DummyInvokable();
 
 	private IOManager ioManager;
 
@@ -238,7 +238,7 @@ public class ExternalSortITCase {
 		merger.close();
 	}
 
-	@Test
+//	@Test
 	public void testSpillingSortWithIntermediateMerge() throws Exception {
 		// amount of pairs
 		final int PAIRS = 10000000;
@@ -292,7 +292,7 @@ public class ExternalSortITCase {
 		merger.close();
 	}
 	
-	@Test
+//	@Test
 	public void testSpillingSortWithIntermediateMergeIntPair() throws Exception {
 		// amount of pairs
 		final int PAIRS = 50000000;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
index f76b802..d9877f4 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
@@ -90,7 +90,7 @@ public class MassiveStringSortingITCase {
 				MutableObjectIterator<String> inputIterator = new StringReaderMutableObjectIterator(reader);
 				
 				sorter = new UnilateralSortMerger<String>(mm, ioMan, inputIterator, new DummyInvokable(),
-						new RuntimeStatelessSerializerFactory<String>(serializer, String.class), comparator, 1024 * 1024, 4, 0.8f);
+						new RuntimeStatelessSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f);
 
 				MutableObjectIterator<String> sortedData = sorter.getIterator();
 				
@@ -182,7 +182,7 @@ public class MassiveStringSortingITCase {
 				MutableObjectIterator<Tuple2<String, String[]>> inputIterator = new StringTupleReaderMutableObjectIterator(reader);
 				
 				sorter = new UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
-						new RuntimeStatelessSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1024 * 1024, 4, 0.8f);
+						new RuntimeStatelessSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
 
 				
 				
@@ -219,10 +219,6 @@ public class MassiveStringSortingITCase {
 					
 					nextFromStratoSort = sortedData.next(nextFromStratoSort);
 					Assert.assertNotNull(nextFromStratoSort);
-						
-					if (nextFromStratoSort.f0.equals("http://some-uri.com/that/is/a/common/prefix/to/all(()HK;V3__.e*")) {
-						System.out.println("Found at position " + num);
-					}
 					
 					Assert.assertEquals(next.f0, nextFromStratoSort.f0);
 					Assert.assertArrayEquals(next.f1, nextFromStratoSort.f1);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
index 0f3f558..81266d2 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
@@ -32,7 +32,7 @@ import eu.stratosphere.api.java.record.functions.JoinFunction;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordPairComparator;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializer;
@@ -47,10 +47,9 @@ import eu.stratosphere.types.Value;
 import eu.stratosphere.util.Collector;
 import eu.stratosphere.util.MutableObjectIterator;
 
-/**
- */
-public class SortMergeMatchIteratorITCase
-{
+
+public class SortMergeMatchIteratorITCase {
+	
 	// total memory
 	private static final int MEMORY_SIZE = 1024 * 1024 * 16;
 	private static final int PAGES_FOR_BNLJN = 2;
@@ -66,7 +65,7 @@ public class SortMergeMatchIteratorITCase
 	private static final long SEED2 = 231434613412342L;
 	
 	// dummy abstract task
-	private final AbstractTask parentTask = new DummyInvokable();
+	private final AbstractInvokable parentTask = new DummyInvokable();
 
 	private IOManager ioManager;
 	private MemoryManager memoryManager;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java
index 2999436..b744348 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java
@@ -25,7 +25,7 @@ import eu.stratosphere.api.java.record.functions.JoinFunction;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.pact.runtime.hash.BuildFirstHashMatchIterator;
 import eu.stratosphere.pact.runtime.hash.BuildSecondHashMatchIterator;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
@@ -67,7 +67,7 @@ public class HashVsSortMiniBenchmark {
 
 	
 	// dummy abstract task
-	private final AbstractTask parentTask = new DummyInvokable();
+	private final AbstractInvokable parentTask = new DummyInvokable();
 
 	// memory and io manager
 	private IOManager ioManager;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DummyInvokable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DummyInvokable.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DummyInvokable.java
index cb0b958..7a4e09e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DummyInvokable.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DummyInvokable.java
@@ -13,14 +13,12 @@
 
 package eu.stratosphere.pact.runtime.test.util;
 
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 
 /**
  * An invokable that does nothing.
- *
  */
-public class DummyInvokable extends AbstractTask
-{
+public class DummyInvokable extends AbstractInvokable {
 
 	@Override
 	public void registerInputOutput() {}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
index a60b479..efa69af 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
@@ -27,9 +27,7 @@ import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.FileSystem.WriteMode;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
 import eu.stratosphere.pact.runtime.task.DataSinkTask;
@@ -77,7 +75,7 @@ public abstract class TaskTestBase {
 		return this.mockEnv.getTaskConfiguration();
 	}
 
-	public void registerTask(AbstractTask task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends Function> stubClass) {
+	public void registerTask(AbstractInvokable task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends Function> stubClass) {
 		final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		config.setDriver(driver);
 		config.setStubWrapper(new UserCodeClassWrapper<Function>(stubClass));
@@ -91,17 +89,16 @@ public abstract class TaskTestBase {
 		task.registerInputOutput();
 	}
 
-	public void registerTask(AbstractTask task) {
+	public void registerTask(AbstractInvokable task) {
 		task.setEnvironment(this.mockEnv);
 		task.registerInputOutput();
 	}
 
-	public void registerFileOutputTask(AbstractOutputTask outTask, Class<? extends FileOutputFormat> stubClass, String outPath)
-	{
+	public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat> stubClass, String outPath) {
 		registerFileOutputTask(outTask, InstantiationUtil.instantiate(stubClass, FileOutputFormat.class), outPath);
 	}
 	
-	public void registerFileOutputTask(AbstractOutputTask outTask, FileOutputFormat outputFormat, String outPath) {
+	public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat outputFormat, String outPath) {
 		TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		
 		outputFormat.setOutputFilePath(new Path(outPath));
@@ -118,7 +115,7 @@ public abstract class TaskTestBase {
 		outTask.registerInputOutput();
 	}
 
-	public void registerFileInputTask(AbstractInputTask<?> inTask,
+	public void registerFileInputTask(AbstractInvokable inTask,
 			Class<? extends DelimitedInputFormat> stubClass, String inPath, String delimiter)
 	{
 		DelimitedInputFormat format;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
index c7d8d41..4f9313f 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
@@ -15,6 +15,8 @@ package eu.stratosphere.runtime.io.network.bufferprovider;
 
 import eu.stratosphere.runtime.io.Buffer;
 import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider.BufferAvailabilityRegistration;
+import eu.stratosphere.util.LogUtils;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -33,6 +35,10 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 public class LocalBufferPoolTest {
+	
+	static {
+		LogUtils.initializeDefaultTestConsoleLogger();
+	}
 
 	private final static int NUM_BUFFERS = 2048;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
index 96761c8..2c5fa9d 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
@@ -41,7 +41,7 @@ public class TransitiveClosureITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		TransitiveClosureNaive.main(edgesPath, resultPath, "100");
+		TransitiveClosureNaive.main(edgesPath, resultPath, "5");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
index 109c91a..d18160b 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
@@ -30,7 +30,6 @@ import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.template.AbstractInputTask;
 import eu.stratosphere.pact.runtime.iterative.io.FakeOutputTask;
 import eu.stratosphere.pact.runtime.iterative.task.IterationSynchronizationSinkTask;
 import eu.stratosphere.pact.runtime.task.DataSinkTask;
@@ -62,9 +61,7 @@ public class JobGraphUtils {
 	{
 		JobInputVertex inputVertex = new JobInputVertex(name, graph);
 		
-		@SuppressWarnings("unchecked")
-		Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask.class;
-		inputVertex.setInputClass(clazz);
+		inputVertex.setInvokableClass(DataSourceTask.class);
 		
 		inputVertex.setNumberOfSubtasks(degreeOfParallelism);
 
@@ -91,14 +88,14 @@ public class JobGraphUtils {
 			int degreeOfParallelism)
 	{
 		JobTaskVertex taskVertex = new JobTaskVertex(name, graph);
-		taskVertex.setTaskClass(task);
+		taskVertex.setInvokableClass(task);
 		taskVertex.setNumberOfSubtasks(degreeOfParallelism);
 		return taskVertex;
 	}
 
 	public static JobOutputVertex createSync(JobGraph jobGraph, int degreeOfParallelism) {
 		JobOutputVertex sync = new JobOutputVertex("BulkIterationSync", jobGraph);
-		sync.setOutputClass(IterationSynchronizationSinkTask.class);
+		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 		sync.setNumberOfSubtasks(1);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, degreeOfParallelism);
@@ -108,7 +105,7 @@ public class JobGraphUtils {
 	public static JobOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
 	{
 		JobOutputVertex outputVertex = new JobOutputVertex(name, jobGraph);
-		outputVertex.setOutputClass(FakeOutputTask.class);
+		outputVertex.setInvokableClass(FakeOutputTask.class);
 		outputVertex.setNumberOfSubtasks(degreeOfParallelism);
 		return outputVertex;
 	}
@@ -116,7 +113,7 @@ public class JobGraphUtils {
 	public static JobOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
 	{
 		JobOutputVertex sinkVertex = new JobOutputVertex(name, jobGraph);
-		sinkVertex.setOutputClass(DataSinkTask.class);
+		sinkVertex.setInvokableClass(DataSinkTask.class);
 		sinkVertex.setNumberOfSubtasks(degreeOfParallelism);
 		return sinkVertex;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
index 3de547e..aa498d8 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
@@ -13,8 +13,6 @@
 
 package eu.stratosphere.test.recordJobs.util;
 
-import java.io.IOException;
-
 import eu.stratosphere.api.common.io.OutputFormat;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.types.Record;
@@ -23,28 +21,20 @@ import eu.stratosphere.types.Record;
  * A simple output format that discards all data by doing nothing.
  */
 public class DiscardingOutputFormat implements OutputFormat<Record> {
-	private static final long serialVersionUID = 1L;
 	
+	private static final long serialVersionUID = 1L;
 
 	@Override
-	public void configure(Configuration parameters)
-	{}
+	public void configure(Configuration parameters) {}
 
 
 	@Override
-	public void open(int taskNumber, int numTasks) throws IOException
-	{}
-
+	public void open(int taskNumber, int numTasks) {}
 
 	@Override
-	public void writeRecord(Record record) throws IOException
-	{}
-
+	public void writeRecord(Record record) {}
 
-	@Override
-	public void close() throws IOException
-	{}
 
 	@Override
-	public void initialize(Configuration configuration){}
+	public void close() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
index a8ab311..ed6f608 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
@@ -13,38 +13,35 @@
 
 package eu.stratosphere.test.runtime;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.nephele.jobgraph.DistributionPattern;
-import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.template.AbstractGenericInputTask;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.test.util.RecordAPITestBase;
 import eu.stratosphere.util.LogUtils;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 @RunWith(Parameterized.class)
 public class NetworkStackThroughput extends RecordAPITestBase {
 
@@ -153,8 +150,8 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 
 		JobGraph jobGraph = new JobGraph("Speed Test");
 
-		JobInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
-		producer.setInputClass(SpeedTestProducer.class);
+		JobInputVertex producer = new JobInputVertex("Speed Test Producer", jobGraph);
+		producer.setInvokableClass(SpeedTestProducer.class);
 		producer.setNumberOfSubtasks(numSubtasks);
 		producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
 		producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
@@ -162,12 +159,12 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 		JobTaskVertex forwarder = null;
 		if (useForwarder) {
 			forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
-			forwarder.setTaskClass(SpeedTestForwarder.class);
+			forwarder.setInvokableClass(SpeedTestForwarder.class);
 			forwarder.setNumberOfSubtasks(numSubtasks);
 		}
 
 		JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
-		consumer.setOutputClass(SpeedTestConsumer.class);
+		consumer.setInvokableClass(SpeedTestConsumer.class);
 		consumer.setNumberOfSubtasks(numSubtasks);
 		consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
 
@@ -188,7 +185,7 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 
 	// ------------------------------------------------------------------------
 
-	public static class SpeedTestProducer extends AbstractGenericInputTask {
+	public static class SpeedTestProducer extends AbstractInvokable {
 
 		private RecordWriter<SpeedTestRecord> writer;
 
@@ -227,7 +224,7 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 		}
 	}
 
-	public static class SpeedTestForwarder extends AbstractTask {
+	public static class SpeedTestForwarder extends AbstractInvokable {
 
 		private RecordReader<SpeedTestRecord> reader;
 
@@ -252,7 +249,7 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 		}
 	}
 
-	public static class SpeedTestConsumer extends AbstractOutputTask {
+	public static class SpeedTestConsumer extends AbstractInvokable {
 
 		private RecordReader<SpeedTestRecord> reader;
 


Mime
View raw message