flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject incubator-flink git commit: Added binary input and output formats which use the objects' TypeSerializer to serialize them.
Date Thu, 20 Nov 2014 14:58:39 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 8af047381 -> a77d75201


Added binary input and output formats which use the objects' TypeSerializer to serialize them.

Added InputTypeConfigurable to TypeSerializerOutputFormat to support a more seamless integration.

This closes #218.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a77d7520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a77d7520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a77d7520

Branch: refs/heads/master
Commit: a77d75201eb14c9310ade12dd1cd2d6cad1f54b6
Parents: 8af0473
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Nov 19 17:22:48 2014 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Nov 20 15:31:32 2014 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/BinaryInputFormat.java  |   5 +-
 .../flink/api/common/io/BinaryOutputFormat.java |   4 +-
 .../api/common/io/SerializedOutputFormat.java   |   5 +-
 .../api/common/io/SequentialFormatTest.java     | 301 -------------------
 .../api/common/io/SequentialFormatTestBase.java | 287 ++++++++++++++++++
 .../api/common/io/SerializedFormatTest.java     |  94 ++++++
 .../api/java/io/TypeSerializerInputFormat.java  |  47 +++
 .../api/java/io/TypeSerializerOutputFormat.java |  55 ++++
 .../api/java/io/TypeSerializerFormatTest.java   | 115 +++++++
 9 files changed, 604 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a77d7520/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index cbc7640..11a7f28 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -35,7 +35,6 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.util.StringUtils;
@@ -44,7 +43,7 @@ import org.apache.flink.util.StringUtils;
  * Base class for all input formats that use blocks of fixed size. The input splits are aligned
to these blocks. Without
  * configuration, these block sizes equal the native block sizes of the HDFS.
  */
-public abstract class BinaryInputFormat<T extends IOReadableWritable> extends FileInputFormat<T> {
+public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 	private static final long serialVersionUID = 1L;
 
 	/**
@@ -188,7 +187,7 @@ public abstract class BinaryInputFormat<T extends IOReadableWritable>
extends Fi
 		return this.createInputSplits(0);
 	}
 
-	protected BlockInfo createBlockInfo() {
+	public BlockInfo createBlockInfo() {
 		return new BlockInfo();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a77d7520/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
index f1aab1f..21ff372 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
@@ -24,12 +24,10 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 
-
-public abstract class BinaryOutputFormat<T extends IOReadableWritable> extends FileOutputFormat<T> {
+public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {
 	private static final long serialVersionUID = 1L;
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a77d7520/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
index 9312163..71e08f2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
@@ -28,12 +28,13 @@ import org.apache.flink.core.memory.DataOutputView;
  * 
  * @see SerializedInputFormat
  */
-public class SerializedOutputFormat extends BinaryOutputFormat<IOReadableWritable> {
+public class SerializedOutputFormat<T extends IOReadableWritable> extends
+		BinaryOutputFormat<T> {
 	
 	private static final long serialVersionUID = 1L;
 	
 	@Override
-	protected void serialize(IOReadableWritable record, DataOutputView dataOutputView) throws IOException {
+	protected void serialize(T record, DataOutputView dataOutputView) throws IOException {
 		record.write(dataOutputView);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a77d7520/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTest.java
deleted file mode 100644
index c73aee9..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTest.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.io;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-/**
- * Tests {@link SerializedInputFormat} and {@link SerializedOutputFormat}.
- */
-@RunWith(Parameterized.class)
-public class SequentialFormatTest {
-
-	public class InputSplitSorter implements Comparator<FileInputSplit> {
-		@Override
-		public int compare(FileInputSplit o1, FileInputSplit o2) {
-			int pathOrder = o1.getPath().getName().compareTo(o2.getPath().getName());
-			return pathOrder == 0 ? Long.signum(o1.getStart() - o2.getStart()) : pathOrder;
-		}
-	}
-
-	private int numberOfTuples;
-
-	private long blockSize;
-
-	private int degreeOfParallelism;
-
-	private BlockInfo info = new SerializedInputFormat<IOReadableWritable>().createBlockInfo();
-
-	private int[] rawDataSizes;
-
-	private File tempFile;
-
-	/**
-	 * Initializes SequentialFormatTest.
-	 */
-	public SequentialFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism)
{
-		this.numberOfTuples = numberOfTuples;
-		this.blockSize = blockSize;
-		this.degreeOfParallelism = degreeOfParallelism;
-		this.rawDataSizes = new int[degreeOfParallelism];
-	}
-
-	/**
-	 * Count how many bytes would be written if all records were directly serialized
-	 */
-	@Before
-	public void calcRawDataSize() throws IOException {
-		int recordIndex = 0;
-		for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
-			ByteCounter byteCounter = new ByteCounter();
-			DataOutputStream out = new DataOutputStream(byteCounter);
-			for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++,
recordIndex++) {
-				this.getRecord(recordIndex).write(new OutputViewDataOutputStreamWrapper(out));
-			}
-			this.rawDataSizes[fileIndex] = byteCounter.getLength();
-		}
-	}
-
-	/**
-	 * Checks if the expected input splits were created
-	 */
-	@Test
-	public void checkInputSplits() throws IOException {
-		FileInputSplit[] inputSplits = this.createInputFormat().createInputSplits(0);
-		Arrays.sort(inputSplits, new InputSplitSorter());
-
-		int splitIndex = 0;
-		for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
-			List<FileInputSplit> sameFileSplits = new ArrayList<FileInputSplit>();
-			Path lastPath = inputSplits[splitIndex].getPath();
-			for (; splitIndex < inputSplits.length; splitIndex++) {
-				if (!inputSplits[splitIndex].getPath().equals(lastPath)) {
-					break;
-				}
-				sameFileSplits.add(inputSplits[splitIndex]);
-			}
-
-			Assert.assertEquals(this.getExpectedBlockCount(fileIndex), sameFileSplits.size());
-
-			long lastBlockLength =
-				this.rawDataSizes[fileIndex] % (this.blockSize - this.info.getInfoSize()) + this.info.getInfoSize();
-			for (int index = 0; index < sameFileSplits.size(); index++) {
-				Assert.assertEquals(this.blockSize * index, sameFileSplits.get(index).getStart());
-				if (index < sameFileSplits.size() - 1) {
-					Assert.assertEquals(this.blockSize, sameFileSplits.get(index).getLength());
-				}
-			}
-			Assert.assertEquals(lastBlockLength, sameFileSplits.get(sameFileSplits.size() - 1).getLength());
-		}
-	}
-
-	/**
-	 * Tests if the expected sequence and amount of data can be read
-	 */
-	@Test
-	public void checkRead() throws IOException {
-		SerializedInputFormat<Record> input = this.createInputFormat();
-		FileInputSplit[] inputSplits = input.createInputSplits(0);
-		Arrays.sort(inputSplits, new InputSplitSorter());
-		int readCount = 0;
-		for (FileInputSplit inputSplit : inputSplits) {
-			input.open(inputSplit);
-			Record record = new Record();
-			while (!input.reachedEnd()) {
-				if (input.nextRecord(record) != null) {
-					this.checkEquals(this.getRecord(readCount), record);
-					readCount++;
-				}
-			}
-		}
-		Assert.assertEquals(this.numberOfTuples, readCount);
-	}
-
-	/**
-	 * Tests the statistics of the given format.
-	 */
-	@Test
-	public void checkStatistics() {
-		SerializedInputFormat<Record> input = this.createInputFormat();
-		BaseStatistics statistics = input.getStatistics(null);
-		Assert.assertEquals(this.numberOfTuples, statistics.getNumberOfRecords());
-	}
-
-	@After
-	public void cleanup() {
-		this.deleteRecursively(this.tempFile);
-	}
-
-	private void deleteRecursively(File file) {
-		if (file.isDirectory()) {
-			for (File subFile : file.listFiles()) {
-				this.deleteRecursively(subFile);
-			}
-		} else {
-			file.delete();
-		}
-	}
-
-	/**
-	 * Write out the tuples in a temporary file and return it.
-	 */
-	@Before
-	public void writeTuples() throws IOException {
-		this.tempFile = File.createTempFile("SerializedInputFormat", null);
-		this.tempFile.deleteOnExit();
-		Configuration configuration = new Configuration();
-		configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
-		if (this.degreeOfParallelism == 1) {
-			SerializedOutputFormat output =
-				FormatUtil.openOutput(SerializedOutputFormat.class, this.tempFile.toURI().toString(),
-					configuration);
-			for (int index = 0; index < this.numberOfTuples; index++) {
-				output.writeRecord(this.getRecord(index));
-			}
-			output.close();
-		} else {
-			this.tempFile.delete();
-			this.tempFile.mkdir();
-			int recordIndex = 0;
-			for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
-				SerializedOutputFormat output =
-					FormatUtil.openOutput(SerializedOutputFormat.class, this.tempFile.toURI() +
-						"/"
-						+ (fileIndex + 1), configuration);
-				for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++,
recordIndex++) {
-					output.writeRecord(this.getRecord(recordIndex));
-				}
-				output.close();
-			}
-		}
-	}
-
-	private int getNumberOfTuplesPerFile(int fileIndex) {
-		return this.numberOfTuples / this.degreeOfParallelism;
-	}
-
-	/**
-	 * Tests if the length of the file matches the expected value.
-	 */
-	@Test
-	public void checkLength() {
-		File[] files = this.tempFile.isDirectory() ? this.tempFile.listFiles() : new File[] { this.tempFile
};
-		Arrays.sort(files);
-		for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
-			long lastBlockLength = this.rawDataSizes[fileIndex] % (this.blockSize - this.info.getInfoSize());
-			long expectedLength =
-				(this.getExpectedBlockCount(fileIndex) - 1) * this.blockSize + this.info.getInfoSize()
+
-					lastBlockLength;
-			Assert.assertEquals(expectedLength, files[fileIndex].length());
-		}
-	}
-
-	protected SerializedInputFormat<Record> createInputFormat() {
-		Configuration configuration = new Configuration();
-		configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
-
-		final SerializedInputFormat<Record> inputFormat = new SerializedInputFormat<Record>();
-		inputFormat.setFilePath(this.tempFile.toURI().toString());
-		
-		inputFormat.configure(configuration);
-		return inputFormat;
-	}
-
-	/**
-	 * Returns the record to write at the given position
-	 */
-	protected Record getRecord(int index) {
-		return new Record(new IntValue(index), new StringValue(String.valueOf(index)));
-	}
-
-	/**
-	 * Checks if both records are equal
-	 */
-	private void checkEquals(Record expected, Record actual) {
-		Assert.assertEquals(expected.getNumFields(), actual.getNumFields());
-		Assert.assertEquals(expected.getField(0, IntValue.class), actual.getField(0, IntValue.class));
-		Assert.assertEquals(expected.getField(1, StringValue.class), actual.getField(1, StringValue.class));
-	}
-
-	private int getExpectedBlockCount(int fileIndex) {
-		int expectedBlockCount =
-			(int) Math.ceil((double) this.rawDataSizes[fileIndex] / (this.blockSize - this.info.getInfoSize()));
-		return expectedBlockCount;
-	}
-
-	@Parameters
-	public static List<Object[]> getParameters() {
-		ArrayList<Object[]> params = new ArrayList<Object[]>();
-		for (int dop = 1; dop <= 2; dop++) {
-			// numberOfTuples, blockSize, dop
-			params.add(new Object[] { 100, BinaryOutputFormat.NATIVE_BLOCK_SIZE, dop });
-			params.add(new Object[] { 100, 1000, dop });
-			params.add(new Object[] { 100, 1 << 20, dop });
-			params.add(new Object[] { 10000, 1000, dop });
-			params.add(new Object[] { 10000, 1 << 20, dop });
-		}
-		return params;
-	}
-
-	/**
-	 * Counts the bytes that would be written.
-	 * 
-	 */
-	private static final class ByteCounter extends OutputStream {
-		int length = 0;
-
-		/**
-		 * Returns the length.
-		 * 
-		 * @return the length
-		 */
-		public int getLength() {
-			return this.length;
-		}
-
-		@Override
-		public void write(int b) throws IOException {
-			this.length++;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a77d7520/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
new file mode 100644
index 0000000..7afd3b4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test base for {@link org.apache.flink.api.common.io.BinaryInputFormat} and {@link org.apache.flink.api.common.io.BinaryOutputFormat}.
+ */
+public abstract class SequentialFormatTestBase<T> {
+
+	public class InputSplitSorter implements Comparator<FileInputSplit> {
+		@Override
+		public int compare(FileInputSplit o1, FileInputSplit o2) {
+			int pathOrder = o1.getPath().getName().compareTo(o2.getPath().getName());
+			return pathOrder == 0 ? Long.signum(o1.getStart() - o2.getStart()) : pathOrder;
+		}
+	}
+
+	private int numberOfTuples;
+
+	protected long blockSize;
+
+	private int degreeOfParallelism;
+
+	private int[] rawDataSizes;
+
+	protected File tempFile;
+
+	/**
+	 * Initializes SequentialFormatTest.
+	 */
+	public SequentialFormatTestBase(int numberOfTuples, long blockSize, int degreeOfParallelism)
{
+		this.numberOfTuples = numberOfTuples;
+		this.blockSize = blockSize;
+		this.degreeOfParallelism = degreeOfParallelism;
+		this.rawDataSizes = new int[degreeOfParallelism];
+	}
+
+	/**
+	 * Count how many bytes would be written if all records were directly serialized
+	 */
+	@Before
+	public void calcRawDataSize() throws IOException {
+		int recordIndex = 0;
+		for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
+			ByteCounter byteCounter = new ByteCounter();
+			DataOutputStream out = new DataOutputStream(byteCounter);
+			for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++,
recordIndex++) {
+				writeRecord(this.getRecord(recordIndex), new OutputViewDataOutputStreamWrapper
+						(out));
+			}
+			this.rawDataSizes[fileIndex] = byteCounter.getLength();
+		}
+	}
+
+	/**
+	 * Checks if the expected input splits were created
+	 */
+	@Test
+	public void checkInputSplits() throws IOException {
+		FileInputSplit[] inputSplits = this.createInputFormat().createInputSplits(0);
+		Arrays.sort(inputSplits, new InputSplitSorter());
+
+		int splitIndex = 0;
+		for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
+			List<FileInputSplit> sameFileSplits = new ArrayList<FileInputSplit>();
+			Path lastPath = inputSplits[splitIndex].getPath();
+			for (; splitIndex < inputSplits.length; splitIndex++) {
+				if (!inputSplits[splitIndex].getPath().equals(lastPath)) {
+					break;
+				}
+				sameFileSplits.add(inputSplits[splitIndex]);
+			}
+
+			Assert.assertEquals(this.getExpectedBlockCount(fileIndex), sameFileSplits.size());
+
+			long lastBlockLength =
+				this.rawDataSizes[fileIndex] % (this.blockSize - getInfoSize()) + getInfoSize();
+			for (int index = 0; index < sameFileSplits.size(); index++) {
+				Assert.assertEquals(this.blockSize * index, sameFileSplits.get(index).getStart());
+				if (index < sameFileSplits.size() - 1) {
+					Assert.assertEquals(this.blockSize, sameFileSplits.get(index).getLength());
+				}
+			}
+			Assert.assertEquals(lastBlockLength, sameFileSplits.get(sameFileSplits.size() - 1).getLength());
+		}
+	}
+
+	/**
+	 * Tests if the expected sequence and amount of data can be read
+	 */
+	@Test
+	public void checkRead() throws IOException {
+		BinaryInputFormat<T> input = this.createInputFormat();
+		FileInputSplit[] inputSplits = input.createInputSplits(0);
+		Arrays.sort(inputSplits, new InputSplitSorter());
+		int readCount = 0;
+		for (FileInputSplit inputSplit : inputSplits) {
+			input.open(inputSplit);
+			T record = createInstance();
+			while (!input.reachedEnd()) {
+				if (input.nextRecord(record) != null) {
+					this.checkEquals(this.getRecord(readCount), record);
+					readCount++;
+				}
+			}
+		}
+		Assert.assertEquals(this.numberOfTuples, readCount);
+	}
+
+	/**
+	 * Tests the statistics of the given format.
+	 */
+	@Test
+	public void checkStatistics() {
+		BinaryInputFormat<T> input = this.createInputFormat();
+		BaseStatistics statistics = input.getStatistics(null);
+		Assert.assertEquals(this.numberOfTuples, statistics.getNumberOfRecords());
+	}
+
+	@After
+	public void cleanup() {
+		this.deleteRecursively(this.tempFile);
+	}
+
+	private void deleteRecursively(File file) {
+		if (file.isDirectory()) {
+			for (File subFile : file.listFiles()) {
+				this.deleteRecursively(subFile);
+			}
+		} else {
+			file.delete();
+		}
+	}
+
+	/**
+	 * Write out the tuples in a temporary file and return it.
+	 */
+	@Before
+	public void writeTuples() throws IOException {
+		this.tempFile = File.createTempFile("BinaryInputFormat", null);
+		this.tempFile.deleteOnExit();
+		Configuration configuration = new Configuration();
+		configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
+		if (this.degreeOfParallelism == 1) {
+			BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI().toString(),
+					configuration);
+			for (int index = 0; index < this.numberOfTuples; index++) {
+				output.writeRecord(this.getRecord(index));
+			}
+			output.close();
+		} else {
+			this.tempFile.delete();
+			this.tempFile.mkdir();
+			int recordIndex = 0;
+			for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
+				BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI() + "/" +
+						(fileIndex+1), configuration);
+				for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++,
recordIndex++) {
+					output.writeRecord(this.getRecord(recordIndex));
+				}
+				output.close();
+			}
+		}
+	}
+
+	private int getNumberOfTuplesPerFile(int fileIndex) {
+		return this.numberOfTuples / this.degreeOfParallelism;
+	}
+
+	/**
+	 * Tests if the length of the file matches the expected value.
+	 */
+	@Test
+	public void checkLength() {
+		File[] files = this.tempFile.isDirectory() ? this.tempFile.listFiles() : new File[] { this.tempFile
};
+		Arrays.sort(files);
+		for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
+			long lastBlockLength = this.rawDataSizes[fileIndex] % (this.blockSize - getInfoSize());
+			long expectedLength =
+				(this.getExpectedBlockCount(fileIndex) - 1) * this.blockSize + getInfoSize() +
+					lastBlockLength;
+			Assert.assertEquals(expectedLength, files[fileIndex].length());
+		}
+	}
+
+	abstract protected BinaryInputFormat<T> createInputFormat();
+
+	abstract protected BinaryOutputFormat<T> createOutputFormat(final String path, final
+																Configuration configuration)
+			throws IOException;
+
+	abstract protected int getInfoSize();
+
+	/**
+	 * Returns the record to write at the given position
+	 */
+	abstract protected T getRecord(int index);
+
+	abstract protected T createInstance();
+
+	abstract protected void writeRecord(T record, DataOutputView outputView) throws IOException;
+
+	/**
+	 * Checks if both records are equal
+	 */
+	abstract protected void checkEquals(T expected, T actual);
+
+	private int getExpectedBlockCount(int fileIndex) {
+		int expectedBlockCount =
+			(int) Math.ceil((double) this.rawDataSizes[fileIndex] / (this.blockSize -
+					getInfoSize()));
+		return expectedBlockCount;
+	}
+
+	@Parameters
+	public static List<Object[]> getParameters() {
+		ArrayList<Object[]> params = new ArrayList<Object[]>();
+		for (int dop = 1; dop <= 2; dop++) {
+			// numberOfTuples, blockSize, dop
+			params.add(new Object[] { 100, BinaryOutputFormat.NATIVE_BLOCK_SIZE, dop });
+			params.add(new Object[] { 100, 1000, dop });
+			params.add(new Object[] { 100, 1 << 20, dop });
+			params.add(new Object[] { 10000, 1000, dop });
+			params.add(new Object[] { 10000, 1 << 20, dop });
+		}
+		return params;
+	}
+
+	/**
+	 * Counts the bytes that would be written.
+	 * 
+	 */
+	private static final class ByteCounter extends OutputStream {
+		int length = 0;
+
+		/**
+		 * Returns the length.
+		 * 
+		 * @return the length
+		 */
+		public int getLength() {
+			return this.length;
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			this.length++;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a77d7520/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
new file mode 100644
index 0000000..6f4fb58
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+
+@RunWith(Parameterized.class)
+public class SerializedFormatTest extends SequentialFormatTestBase<Record> {
+
+	private BlockInfo info;
+
+	public SerializedFormatTest(int numberOfRecords, long blockSize, int degreeOfParallelism){
+		super(numberOfRecords, blockSize, degreeOfParallelism);
+	}
+
+	@Before
+	public void setup(){
+		info = createInputFormat().createBlockInfo();
+	}
+
+	@Override
+	protected BinaryInputFormat<Record> createInputFormat() {
+		Configuration configuration = new Configuration();
+		configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
+
+		final SerializedInputFormat<Record> inputFormat = new SerializedInputFormat<Record>();
+		inputFormat.setFilePath(this.tempFile.toURI().toString());
+
+		inputFormat.configure(configuration);
+		return inputFormat;
+	}
+
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration
+			configuration) throws IOException {
+		return FormatUtil.<Record, SerializedOutputFormat>openOutput
+				(SerializedOutputFormat.class, path, configuration);
+	}
+
+	@Override
+	protected int getInfoSize() {
+		return info.getInfoSize();
+	}
+
+	@Override
+	protected Record getRecord(int index) {
+		return new Record(new IntValue(index), new StringValue(String.valueOf(index)));
+	}
+
+	@Override
+	protected Record createInstance() {
+		return new Record();
+	}
+
+	@Override
+	protected void writeRecord(Record record, DataOutputView outputView) throws IOException{
+		record.write(outputView);
+	}
+
+	@Override
+	protected void checkEquals(Record expected, Record actual) {
+		Assert.assertEquals(expected.getNumFields(), actual.getNumFields());
+		Assert.assertEquals(expected.getField(0, IntValue.class), actual.getField(0, IntValue.class));
+		Assert.assertEquals(expected.getField(1, StringValue.class), actual.getField(1, StringValue.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a77d7520/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
new file mode 100644
index 0000000..bdd54a9
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.BinaryInputFormat;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.IOException;
+
+/**
+ * Reads elements by deserializing them with a given type serializer.
+ * @param <T>
+ */
+public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> {
+	private TypeSerializer<T> serializer;
+
+	public TypeSerializerInputFormat(TypeSerializer<T> serializer){
+		this.serializer = serializer;
+	}
+
+	@Override
+	protected T deserialize(T reuse, DataInputView dataInput) throws IOException {
+		if(serializer == null){
+			throw new RuntimeException("TypeSerializerInputFormat requires a type serializer to "
+
+					"be defined.");
+		}
+
+		return serializer.deserialize(reuse, dataInput);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a77d7520/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
new file mode 100644
index 0000000..5c316a3
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.BinaryOutputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Stores elements by serializing them with their type serializer.
+ * @param <T> type parameter
+ */
+public class TypeSerializerOutputFormat<T> extends BinaryOutputFormat<T> implements
+		InputTypeConfigurable {
+	private TypeSerializer<T> serializer = null;
+
+	@Override
+	protected void serialize(T record, DataOutputView dataOutput) throws IOException {
+		if(serializer == null){
+			throw new RuntimeException("TypeSerializerOutputFormat requires a type serializer to "
+
+					"be defined.");
+		}
+		
+		serializer.serialize(record, dataOutput);
+	}
+
+	public void setSerializer(TypeSerializer<T> serializer){
+		this.serializer = serializer;
+	}
+
+	@Override
+	public void setInputType(TypeInformation<?> type) {
+		serializer = (TypeSerializer<T>) type.createSerializer();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a77d7520/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
new file mode 100644
index 0000000..ef271e7
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.BinaryInputFormat;
+import org.apache.flink.api.common.io.BinaryOutputFormat;
+import org.apache.flink.api.common.io.BlockInfo;
+import org.apache.flink.api.common.io.SequentialFormatTestBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+
+@RunWith(Parameterized.class)
+public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<Integer,
String>> {
+
+	private TypeSerializer<Tuple2<Integer, String>> serializer;
+
+	private BlockInfo block;
+
+	public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism)
{
+		super(numberOfTuples, blockSize, degreeOfParallelism);
+
+		TypeInformation<Tuple2<Integer, String>> tti = TypeExtractor.getForObject(getRecord(0));
+
+		serializer = tti.createSerializer();
+	}
+
+	@Before
+	public void setup(){
+		block = createInputFormat().createBlockInfo();
+	}
+
+	@Override
+	protected BinaryInputFormat<Tuple2<Integer, String>> createInputFormat() {
+		Configuration configuration = new Configuration();
+		configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
+
+		final TypeSerializerInputFormat<Tuple2<Integer, String>> inputFormat = new
+				TypeSerializerInputFormat<Tuple2<Integer, String>>(serializer);
+		inputFormat.setFilePath(this.tempFile.toURI().toString());
+
+		inputFormat.configure(configuration);
+		return inputFormat;
+	}
+
+	@Override
+	protected BinaryOutputFormat<Tuple2<Integer, String>> createOutputFormat(String
path, Configuration configuration) throws IOException {
+		TypeSerializerOutputFormat<Tuple2<Integer, String>> outputFormat = new
+				TypeSerializerOutputFormat<Tuple2<Integer, String>>();
+
+		outputFormat.setSerializer(serializer);
+		outputFormat.setOutputFilePath(new Path(path));
+		outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+
+		configuration = configuration == null ? new Configuration() : configuration;
+
+		outputFormat.configure(configuration);
+		outputFormat.open(0, 1);
+
+		return outputFormat;
+	}
+
+	@Override
+	protected int getInfoSize() {
+		return block.getInfoSize();
+	}
+
+	@Override
+	protected Tuple2<Integer, String> getRecord(int index) {
+		return new Tuple2<Integer, String>(index, String.valueOf(index));
+	}
+
+	@Override
+	protected Tuple2<Integer, String> createInstance() {
+		return new Tuple2<Integer, String>();
+	}
+
+	@Override
+	protected void writeRecord(Tuple2<Integer, String> record, DataOutputView outputView)
throws IOException {
+		serializer.serialize(record, outputView);
+	}
+
+	@Override
+	protected void checkEquals(Tuple2<Integer, String> expected, Tuple2<Integer, String>
actual) {
+		Assert.assertEquals(expected.f0, actual.f0);
+		Assert.assertEquals(expected.f1, actual.f1);
+	}
+}


Mime
View raw message