flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [03/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date Mon, 12 Jan 2015 08:16:11 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/SpanningRecordSerializationTest.java
deleted file mode 100644
index 24c01d1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/SpanningRecordSerializationTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization;
-
-import org.junit.Assert;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.serialization.types.SerializationTestType;
-import org.apache.flink.runtime.io.network.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.runtime.io.network.serialization.types.Util;
-import org.junit.Test;
-
-import java.util.ArrayDeque;
-
-public class SpanningRecordSerializationTest {
-
-	@Test
-	public void testIntRecordsSpanningMultipleSegments() {
-		final int SEGMENT_SIZE = 1;
-		final int NUM_VALUES = 10;
-
-		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	@Test
-	public void testIntRecordsWithAlignedBuffers () {
-		final int SEGMENT_SIZE = 64;
-		final int NUM_VALUES = 64;
-
-		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	@Test
-	public void testIntRecordsWithUnalignedBuffers () {
-		final int SEGMENT_SIZE = 31;
-		final int NUM_VALUES = 248;
-
-		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	@Test
-	 public void testRandomRecords () {
-		final int SEGMENT_SIZE = 127;
-		final int NUM_VALUES = 10000;
-
-		try {
-			test(Util.randomRecords(NUM_VALUES), SEGMENT_SIZE);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link AdaptiveSpanningRecordDeserializer}
-	 * interact as expected.
-	 * <p>
-	 * Only a single {@link MemorySegment} will be allocated.
-	 *
-	 * @param records records to test
-	 * @param segmentSize size for the {@link MemorySegment}
-	 */
-	private void test (Util.MockRecords records, int segmentSize) throws Exception {
-		final int SERIALIZATION_OVERHEAD = 4; // length encoding
-
-		final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		final RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
-
-		final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), segmentSize, null);
-
-		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
-
-		// -------------------------------------------------------------------------------------------------------------
-
-		serializer.setNextBuffer(buffer);
-
-		int numBytes = 0;
-		int numRecords = 0;
-		for (SerializationTestType record : records) {
-
-			serializedRecords.add(record);
-
-			numRecords++;
-			numBytes += record.length() + SERIALIZATION_OVERHEAD;
-
-			// serialize record
-			if (serializer.addRecord(record).isFullBuffer()) {
-				// buffer is full => start deserializing
-				deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize);
-
-				while (!serializedRecords.isEmpty()) {
-					SerializationTestType expected = serializedRecords.poll();
-					SerializationTestType actual = expected.getClass().newInstance();
-
-					if (deserializer.getNextRecord(actual).isFullRecord()) {
-						Assert.assertEquals(expected, actual);
-						numRecords--;
-					} else {
-						serializedRecords.addFirst(expected);
-						break;
-					}
-				}
-
-				while (serializer.setNextBuffer(buffer).isFullBuffer()) {
-					deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize);
-				}
-
-
-
-			}
-		}
-
-		// deserialize left over records
-		deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), (numBytes % segmentSize));
-
-		serializer.clear();
-
-		while (!serializedRecords.isEmpty()) {
-			SerializationTestType expected = serializedRecords.poll();
-
-			SerializationTestType actual = expected.getClass().newInstance();
-			DeserializationResult result = deserializer.getNextRecord(actual);
-
-			Assert.assertTrue(result.isFullRecord());
-			Assert.assertEquals(expected, actual);
-			numRecords--;
-		}
-
-
-		// assert that all records have been serialized and deserialized
-		Assert.assertEquals(0, numRecords);
-		Assert.assertFalse(serializer.hasData());
-		Assert.assertFalse(deserializer.hasUnfinishedData());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/SpanningRecordSerializerTest.java
deleted file mode 100644
index 27cf03f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/SpanningRecordSerializerTest.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization;
-
-import org.junit.Assert;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.serialization.RecordSerializer.SerializationResult;
-import org.apache.flink.runtime.io.network.serialization.types.SerializationTestType;
-import org.apache.flink.runtime.io.network.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.runtime.io.network.serialization.types.Util;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Random;
-
-public class SpanningRecordSerializerTest {
-
-	@Test
-	public void testHasData() {
-		final int SEGMENT_SIZE = 16;
-
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
-		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
-
-		Assert.assertFalse(serializer.hasData());
-
-		try {
-			serializer.addRecord(randomIntRecord);
-			Assert.assertTrue(serializer.hasData());
-
-			serializer.setNextBuffer(buffer);
-			Assert.assertTrue(serializer.hasData());
-
-			serializer.clear();
-			Assert.assertFalse(serializer.hasData());
-
-			serializer.setNextBuffer(buffer);
-
-			serializer.addRecord(randomIntRecord);
-			Assert.assertTrue(serializer.hasData());
-
-			serializer.addRecord(randomIntRecord);
-			Assert.assertTrue(serializer.hasData());
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
-
-	}
-
-	@Test
-	public void testEmptyRecords() {
-		final int SEGMENT_SIZE = 11;
-
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
-
-		try {
-			Assert.assertEquals(SerializationResult.FULL_RECORD, serializer.setNextBuffer(buffer));
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
-
-		try {
-			SerializationTestType emptyRecord = new SerializationTestType() {
-				@Override
-				public SerializationTestType getRandom(Random rnd) {
-					throw new UnsupportedOperationException();
-				}
-
-				@Override
-				public int length() {
-					throw new UnsupportedOperationException();
-				}
-
-				@Override
-				public void write(DataOutputView out) throws IOException {
-				}
-
-				@Override
-				public void read(DataInputView in) throws IOException {
-				}
-
-				@Override
-				public int hashCode() {
-					throw new UnsupportedOperationException();
-				}
-
-				@Override
-				public boolean equals(Object obj) {
-					throw new UnsupportedOperationException();
-				}
-			};
-
-			SerializationResult result = serializer.addRecord(emptyRecord);
-			Assert.assertEquals(SerializationResult.FULL_RECORD, result);
-
-			result = serializer.addRecord(emptyRecord);
-			Assert.assertEquals(SerializationResult.FULL_RECORD, result);
-
-			result = serializer.addRecord(emptyRecord);
-			Assert.assertEquals(SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
-
-			result = serializer.setNextBuffer(buffer);
-			Assert.assertEquals(SerializationResult.FULL_RECORD, result);
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
-	}
-
-	@Test
-	public void testIntRecordsSpanningMultipleSegments() {
-		final int SEGMENT_SIZE = 1;
-		final int NUM_VALUES = 10;
-
-		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	@Test
-	public void testIntRecordsWithAlignedSegments() {
-		final int SEGMENT_SIZE = 64;
-		final int NUM_VALUES = 64;
-
-		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	@Test
-	public void testIntRecordsWithUnalignedSegments() {
-		final int SEGMENT_SIZE = 31;
-		final int NUM_VALUES = 248; // least common multiple => last record should align
-
-		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	@Test
-	public void testRandomRecords() {
-		final int SEGMENT_SIZE = 127;
-		final int NUM_VALUES = 100000;
-
-		try {
-			test(Util.randomRecords(NUM_VALUES), SEGMENT_SIZE);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Iterates over the provided records and tests whether the {@link SpanningRecordSerializer} returns the expected
-	 * {@link SerializationResult} values.
-	 * <p>
-	 * Only a single {@link MemorySegment} will be allocated.
-	 *
-	 * @param records records to test
-	 * @param segmentSize size for the {@link MemorySegment}
-	 */
-	private void test(Util.MockRecords records, int segmentSize) throws Exception {
-		final int SERIALIZATION_OVERHEAD = 4; // length encoding
-
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), segmentSize, null);
-
-		// -------------------------------------------------------------------------------------------------------------
-
-		serializer.setNextBuffer(buffer);
-
-		int numBytes = 0;
-		for (SerializationTestType record : records) {
-			SerializationResult result = serializer.addRecord(record);
-			numBytes += record.length() + SERIALIZATION_OVERHEAD;
-
-			if (numBytes < segmentSize) {
-				Assert.assertEquals(SerializationResult.FULL_RECORD, result);
-			} else if (numBytes == segmentSize) {
-				Assert.assertEquals(SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
-				serializer.setNextBuffer(buffer);
-				numBytes = 0;
-			} else {
-				Assert.assertEquals(SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
-
-				while (result.isFullBuffer()) {
-					numBytes -= segmentSize;
-					result = serializer.setNextBuffer(buffer);
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/AsciiStringType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/AsciiStringType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/AsciiStringType.java
deleted file mode 100644
index b300593..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/AsciiStringType.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class AsciiStringType implements SerializationTestType {
-
-	private static final int MAX_LEN = 1500;
-
-	public String value;
-
-	public AsciiStringType() {
-		this.value = "";
-	}
-
-	private AsciiStringType(String value) {
-		this.value = value;
-	}
-
-	@Override
-	public AsciiStringType getRandom(Random rnd) {
-		final StringBuilder bld = new StringBuilder();
-		final int len = rnd.nextInt(MAX_LEN + 1);
-
-		for (int i = 0; i < len; i++) {
-			// 1--127
-			bld.append((char) (rnd.nextInt(126) + 1));
-		}
-
-		return new AsciiStringType(bld.toString());
-	}
-
-	@Override
-	public int length() {
-		return value.getBytes().length + 2;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeUTF(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readUTF();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof AsciiStringType) {
-			AsciiStringType other = (AsciiStringType) obj;
-			return this.value.equals(other.value);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/BooleanType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/BooleanType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/BooleanType.java
deleted file mode 100644
index b2f6ed4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/BooleanType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class BooleanType implements SerializationTestType {
-
-	private boolean value;
-
-	public BooleanType() {
-		this.value = false;
-	}
-
-	private BooleanType(boolean value) {
-		this.value = value;
-	}
-
-	@Override
-	public BooleanType getRandom(Random rnd) {
-		return new BooleanType(rnd.nextBoolean());
-	}
-
-	@Override
-	public int length() {
-		return 1;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeBoolean(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readBoolean();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value ? 1 : 0;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof BooleanType) {
-			BooleanType other = (BooleanType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteArrayType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteArrayType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteArrayType.java
deleted file mode 100644
index 7aaf5a8..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteArrayType.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class ByteArrayType implements SerializationTestType {
-
-	private static final int MAX_LEN = 512 * 15;
-
-	private byte[] data;
-
-	public ByteArrayType() {
-		this.data = new byte[0];
-	}
-
-	public ByteArrayType(byte[] data) {
-		this.data = data;
-	}
-
-	@Override
-	public ByteArrayType getRandom(Random rnd) {
-		final int len = rnd.nextInt(MAX_LEN) + 1;
-		final byte[] data = new byte[len];
-		rnd.nextBytes(data);
-		return new ByteArrayType(data);
-	}
-
-	@Override
-	public int length() {
-		return data.length + 4;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.data.length);
-		out.write(this.data);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		final int len = in.readInt();
-		this.data = new byte[len];
-		in.readFully(this.data);
-	}
-
-	@Override
-	public int hashCode() {
-		return Arrays.hashCode(this.data);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ByteArrayType) {
-			ByteArrayType other = (ByteArrayType) obj;
-			return Arrays.equals(this.data, other.data);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteSubArrayType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteSubArrayType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteSubArrayType.java
deleted file mode 100644
index e965674..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteSubArrayType.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class ByteSubArrayType implements SerializationTestType {
-
-	private static final int MAX_LEN = 512;
-
-	private final byte[] data;
-
-	private int len;
-
-	public ByteSubArrayType() {
-		this.data = new byte[MAX_LEN];
-		this.len = 0;
-	}
-
-	@Override
-	public ByteSubArrayType getRandom(Random rnd) {
-		final int len = rnd.nextInt(MAX_LEN) + 1;
-		final ByteSubArrayType t = new ByteSubArrayType();
-		t.len = len;
-
-		final byte[] data = t.data;
-		for (int i = 0; i < len; i++) {
-			data[i] = (byte) rnd.nextInt(256);
-		}
-
-		return t;
-	}
-
-	@Override
-	public int length() {
-		return len + 4;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.len);
-		out.write(this.data, 0, this.len);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.len = in.readInt();
-		in.readFully(this.data, 0, this.len);
-	}
-
-	@Override
-	public int hashCode() {
-		final byte[] copy = new byte[this.len];
-		System.arraycopy(this.data, 0, copy, 0, this.len);
-		return Arrays.hashCode(copy);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ByteSubArrayType) {
-			ByteSubArrayType other = (ByteSubArrayType) obj;
-			if (this.len == other.len) {
-				for (int i = 0; i < this.len; i++) {
-					if (this.data[i] != other.data[i]) {
-						return false;
-					}
-				}
-				return true;
-			} else {
-				return false;
-			}
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteType.java
deleted file mode 100644
index df775d5..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ByteType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class ByteType implements SerializationTestType {
-
-	private byte value;
-
-	public ByteType() {
-		this.value = (byte) 0;
-	}
-
-	private ByteType(byte value) {
-		this.value = value;
-	}
-
-	@Override
-	public ByteType getRandom(Random rnd) {
-		return new ByteType((byte) rnd.nextInt(256));
-	}
-
-	@Override
-	public int length() {
-		return 1;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeByte(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readByte();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ByteType) {
-			ByteType other = (ByteType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/CharType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/CharType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/CharType.java
deleted file mode 100644
index f4ef565..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/CharType.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class CharType implements SerializationTestType {
-
-	private char value;
-
-	public CharType() {
-		this.value = 0;
-	}
-
-	private CharType(char value) {
-		this.value = value;
-	}
-
-	@Override
-	public CharType getRandom(Random rnd) {
-		return new CharType((char) rnd.nextInt(10000));
-	}
-
-	@Override
-	public int length() {
-		return 2;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeChar(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readChar();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof CharType) {
-			CharType other = (CharType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/DoubleType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/DoubleType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/DoubleType.java
deleted file mode 100644
index a78140c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/DoubleType.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class DoubleType implements SerializationTestType {
-
-	private double value;
-
-	public DoubleType() {
-		this.value = 0;
-	}
-
-	private DoubleType(double value) {
-		this.value = value;
-	}
-
-	@Override
-	public DoubleType getRandom(Random rnd) {
-		return new DoubleType(rnd.nextDouble());
-	}
-
-	@Override
-	public int length() {
-		return 8;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeDouble(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readDouble();
-	}
-
-	@Override
-	public int hashCode() {
-		final long l = Double.doubleToLongBits(this.value);
-		return (int) (l ^ l >>> 32);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof DoubleType) {
-			DoubleType other = (DoubleType) obj;
-			return Double.doubleToLongBits(this.value) == Double.doubleToLongBits(other.value);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/FloatType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/FloatType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/FloatType.java
deleted file mode 100644
index 4979225..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/FloatType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class FloatType implements SerializationTestType {
-
-	private float value;
-
-	public FloatType() {
-		this.value = 0;
-	}
-
-	private FloatType(float value) {
-		this.value = value;
-	}
-
-	@Override
-	public FloatType getRandom(Random rnd) {
-		return new FloatType(rnd.nextFloat());
-	}
-
-	@Override
-	public int length() {
-		return 4;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeFloat(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readFloat();
-	}
-
-	@Override
-	public int hashCode() {
-		return Float.floatToIntBits(this.value);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof FloatType) {
-			FloatType other = (FloatType) obj;
-			return Float.floatToIntBits(this.value) == Float.floatToIntBits(other.value);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/IntType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/IntType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/IntType.java
deleted file mode 100644
index 9fbccef..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/IntType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class IntType implements SerializationTestType {
-
-	private int value;
-
-	public IntType() {
-		this.value = 0;
-	}
-
-	public IntType(int value) {
-		this.value = value;
-	}
-
-	@Override
-	public IntType getRandom(Random rnd) {
-		return new IntType(rnd.nextInt());
-	}
-
-	@Override
-	public int length() {
-		return 4;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readInt();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof IntType) {
-			IntType other = (IntType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LongType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LongType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LongType.java
deleted file mode 100644
index 73635f3..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LongType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class LongType implements SerializationTestType {
-
-	private long value;
-
-	public LongType() {
-		this.value = 0;
-	}
-
-	private LongType(long value) {
-		this.value = value;
-	}
-
-	@Override
-	public LongType getRandom(Random rnd) {
-		return new LongType(rnd.nextLong());
-	}
-
-	@Override
-	public int length() {
-		return 8;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeLong(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readLong();
-	}
-
-	@Override
-	public int hashCode() {
-		return (int) (this.value ^ this.value >>> 32);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof LongType) {
-			LongType other = (LongType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/SerializationTestType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/SerializationTestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/SerializationTestType.java
deleted file mode 100644
index 17bcc13..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/SerializationTestType.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.util.Random;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-public interface SerializationTestType extends IOReadableWritable {
-
-	public SerializationTestType getRandom(Random rnd);
-
-	public int length();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/SerializationTestTypeFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/SerializationTestTypeFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/SerializationTestTypeFactory.java
deleted file mode 100644
index 0af6947..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/SerializationTestTypeFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-public enum SerializationTestTypeFactory {
-	BOOLEAN(new BooleanType()),
-	BYTE_ARRAY(new ByteArrayType()),
-	BYTE_SUB_ARRAY(new ByteSubArrayType()),
-	BYTE(new ByteType()),
-	CHAR(new CharType()),
-	DOUBLE(new DoubleType()),
-	FLOAT(new FloatType()),
-	INT(new IntType()),
-	LONG(new LongType()),
-	SHORT(new ShortType()),
-	UNSIGNED_BYTE(new UnsignedByteType()),
-	UNSIGNED_SHORT(new UnsignedShortType()),
-	STRING(new AsciiStringType());
-
-	private final SerializationTestType factory;
-
-	SerializationTestTypeFactory(SerializationTestType type) {
-		this.factory = type;
-	}
-
-	public SerializationTestType factory() {
-		return this.factory;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ShortType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ShortType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ShortType.java
deleted file mode 100644
index b7ffc6d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/ShortType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class ShortType implements SerializationTestType {
-
-	private short value;
-
-	public ShortType() {
-		this.value = (short) 0;
-	}
-
-	private ShortType(short value) {
-		this.value = value;
-	}
-
-	@Override
-	public ShortType getRandom(Random rnd) {
-		return new ShortType((short) rnd.nextInt(65536));
-	}
-
-	@Override
-	public int length() {
-		return 2;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeShort(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readShort();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ShortType) {
-			ShortType other = (ShortType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/UnsignedByteType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/UnsignedByteType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/UnsignedByteType.java
deleted file mode 100644
index 01cc059..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/UnsignedByteType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class UnsignedByteType implements SerializationTestType {
-
-	private int value;
-
-	public UnsignedByteType() {
-		this.value = 0;
-	}
-
-	private UnsignedByteType(int value) {
-		this.value = value;
-	}
-
-	@Override
-	public UnsignedByteType getRandom(Random rnd) {
-		return new UnsignedByteType(rnd.nextInt(128) + 128);
-	}
-
-	@Override
-	public int length() {
-		return 1;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeByte(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readUnsignedByte();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof UnsignedByteType) {
-			UnsignedByteType other = (UnsignedByteType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/UnsignedShortType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/UnsignedShortType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/UnsignedShortType.java
deleted file mode 100644
index 9165613..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/UnsignedShortType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class UnsignedShortType implements SerializationTestType {
-
-	private int value;
-
-	public UnsignedShortType() {
-		this.value = 0;
-	}
-
-	private UnsignedShortType(int value) {
-		this.value = value;
-	}
-
-	@Override
-	public UnsignedShortType getRandom(Random rnd) {
-		return new UnsignedShortType(rnd.nextInt(32768) + 32768);
-	}
-
-	@Override
-	public int length() {
-		return 2;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeShort(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readUnsignedShort();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof UnsignedShortType) {
-			UnsignedShortType other = (UnsignedShortType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/Util.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/Util.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/Util.java
deleted file mode 100644
index f73d793..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/Util.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.serialization.types;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Random;
-
-public class Util {
-
-	private static final long SEED = 64871654635745873L;
-
-	private static Random random = new Random(SEED);
-
-	public static SerializationTestType randomRecord(SerializationTestTypeFactory type) {
-		return type.factory().getRandom(Util.random);
-	}
-
-	public static MockRecords randomRecords(final int numElements, final SerializationTestTypeFactory type) {
-
-		return new MockRecords(numElements) {
-			@Override
-			protected SerializationTestType getRecord() {
-				return type.factory().getRandom(Util.random);
-			}
-		};
-	}
-
-	public static MockRecords randomRecords(final int numElements) {
-
-		return new MockRecords(numElements) {
-			@Override
-			protected SerializationTestType getRecord() {
-				// select random test type factory
-				SerializationTestTypeFactory[] types = SerializationTestTypeFactory.values();
-				int i = Util.random.nextInt(types.length);
-
-				return types[i].factory().getRandom(Util.random);
-			}
-		};
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	public abstract static class MockRecords implements Iterable<SerializationTestType> {
-
-		private int numRecords;
-
-		public MockRecords(int numRecords) {
-			this.numRecords = numRecords;
-		}
-
-		@Override
-		public Iterator<SerializationTestType> iterator() {
-			return new Iterator<SerializationTestType>() {
-				@Override
-				public boolean hasNext() {
-					return numRecords > 0;
-				}
-
-				@Override
-				public SerializationTestType next() {
-					if (numRecords > 0) {
-						numRecords--;
-
-						return getRecord();
-					}
-
-					throw new NoSuchElementException();
-				}
-
-				@Override
-				public void remove() {
-					throw new UnsupportedOperationException();
-				}
-			};
-		}
-
-		abstract protected SerializationTestType getRecord();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
index 83c3d5d..e6fb374 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
@@ -19,7 +19,9 @@
 
 package org.apache.flink.runtime.iterative.concurrent;
 
-import static org.junit.Assert.assertEquals;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.junit.Test;
 
 import java.util.Collections;
 import java.util.List;
@@ -30,11 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-import org.junit.Test;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
 
 public class BrokerTest {
 
@@ -48,28 +46,33 @@ public class BrokerTest {
 
 	void mediate(int subtasks) throws InterruptedException, ExecutionException {
 
-		ExecutorService executorService = Executors.newFixedThreadPool(subtasks * 2);
+		final ExecutorService executorService = Executors.newFixedThreadPool(subtasks * 2);
 
-		List<Callable<StringPair>> tasks = Lists.newArrayList();
-		Broker<String> broker = new Broker<String>();
+		try {
+			List<Callable<StringPair>> tasks = Lists.newArrayList();
+			Broker<String> broker = new Broker<String>();
 
-		for (int subtask = 0; subtask < subtasks; subtask++) {
-			tasks.add(new IterationHead(broker, subtask, "value" + subtask));
-			tasks.add(new IterationTail(broker, subtask));
-		}
+			for (int subtask = 0; subtask < subtasks; subtask++) {
+				tasks.add(new IterationHead(broker, subtask, "value" + subtask));
+				tasks.add(new IterationTail(broker, subtask));
+			}
 
-		Collections.shuffle(tasks);
+			Collections.shuffle(tasks);
 
-		int numSuccessfulHandovers = 0;
-		for (Future<StringPair> future : executorService.invokeAll(tasks)) {
-			StringPair stringPair = future.get();
-			if (stringPair != null) {
-				assertEquals("value" + stringPair.getFirst(), stringPair.getSecond());
-				numSuccessfulHandovers++;
+			int numSuccessfulHandovers = 0;
+			for (Future<StringPair> future : executorService.invokeAll(tasks)) {
+				StringPair stringPair = future.get();
+				if (stringPair != null) {
+					assertEquals("value" + stringPair.getFirst(), stringPair.getSecond());
+					numSuccessfulHandovers++;
+				}
 			}
-		}
 
-		assertEquals(subtasks, numSuccessfulHandovers);
+			assertEquals(subtasks, numSuccessfulHandovers);
+		}
+		finally {
+			executorService.shutdownNow();
+		}
 	}
 
 	class IterationHead implements Callable<StringPair> {

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
index 5724f36..a6c0d72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
@@ -24,8 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Random;
 
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
+import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.junit.Test;
@@ -46,7 +45,7 @@ public class SuperstepBarrierTest {
 		}
 	}
 
-	private void sync(AbstractTaskEvent event) throws InterruptedException {
+	private void sync(TaskEvent event) throws InterruptedException {
 
 		TerminationSignaled terminationSignaled = new TerminationSignaled();
 
@@ -104,11 +103,11 @@ public class SuperstepBarrierTest {
 
 		private final SuperstepBarrier barrier;
 
-		private final AbstractTaskEvent event;
+		private final TaskEvent event;
 
 		private final Random random;
 
-		IterationSync(SuperstepBarrier barrier, AbstractTaskEvent event) {
+		IterationSync(SuperstepBarrier barrier, TaskEvent event) {
 			this.barrier = barrier;
 			this.event = event;
 			random = new Random();
@@ -119,7 +118,7 @@ public class SuperstepBarrierTest {
 			try {
 				Thread.sleep(random.nextInt(10));
 
-				barrier.eventOccurred(event);
+				barrier.onEvent(event);
 
 			} catch (Exception e) {
 				throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 0ff27b1..aac6948 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.junit.Test;
 
 public class JobGraphTest {
@@ -46,7 +45,7 @@ public class JobGraphTest {
 				AbstractJobVertex source2 = new AbstractJobVertex("source2");
 				AbstractJobVertex target = new AbstractJobVertex("target");
 				target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
-				target.connectNewDataSetAsInput(source2, DistributionPattern.BIPARTITE);
+				target.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL);
 				
 				jg.addVertex(source1);
 				jg.addVertex(source2);

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index 1537e91..4d23b36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -64,7 +64,7 @@ public class JobTaskVertexTest {
 		AbstractJobVertex target1= new AbstractJobVertex("target1");
 		AbstractJobVertex target2 = new AbstractJobVertex("target2");
 		target1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
-		target2.connectDataSetAsInput(source.getProducedDataSets().get(0), DistributionPattern.BIPARTITE);
+		target2.connectDataSetAsInput(source.getProducedDataSets().get(0), DistributionPattern.ALL_TO_ALL);
 		
 		assertTrue(source.isInputVertex());
 		assertFalse(source.isOutputVertex());

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index cb657a0..9ce0d50 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -16,45 +16,52 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
 import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.reader.MockIteratorBufferReader;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
 import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Task.class, BufferWriter.class})
 public class DataSinkTaskTest extends TaskTestBase
 {
 	private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
-	
+
 	private static final int MEMORY_MANAGER_SIZE = 3 * 1024 * 1024;
 
 	private static final int NETWORK_BUFFER_SIZE = 1024;
-	
+
 	private final String tempTestPath = constructTestPath(DataSinkTaskTest.class, "dst_test");
-	
+
 	@After
 	public void cleanUp() {
 		File tempTestFile = new File(this.tempTestPath);
@@ -62,20 +69,20 @@ public class DataSinkTaskTest extends TaskTestBase
 			tempTestFile.delete();
 		}
 	}
-	
+
 	@Test
 	public void testDataSinkTask() {
 
 		int keyCnt = 100;
 		int valCnt = 20;
-		
+
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
-		
+
 		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
 
 		super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
-		
+
 		try {
 			testTask.invoke();
 		} catch (Exception e) {
@@ -84,37 +91,37 @@ public class DataSinkTaskTest extends TaskTestBase
 		}
 
 		File tempTestFile = new File(this.tempTestPath);
-		
+
 		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
-		
+
 		FileReader fr = null;
 		BufferedReader br = null;
 		try {
 			fr = new FileReader(tempTestFile);
 			br = new BufferedReader(fr);
-			
+
 			HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
-			
+
 			while(br.ready()) {
 				String line = br.readLine();
-				
+
 				Integer key = Integer.parseInt(line.substring(0,line.indexOf("_")));
 				Integer val = Integer.parseInt(line.substring(line.indexOf("_")+1,line.length()));
-				
+
 				if(!keyValueCountMap.containsKey(key)) {
 					keyValueCountMap.put(key,new HashSet<Integer>());
 				}
 				keyValueCountMap.get(key).add(val);
 			}
-			
+
 			Assert.assertTrue("Invalid key count in out file. Expected: "+keyCnt+" Actual: "+keyValueCountMap.keySet().size(),
 				keyValueCountMap.keySet().size() == keyCnt);
-			
+
 			for(Integer key : keyValueCountMap.keySet()) {
 				Assert.assertTrue("Invalid value count for key: "+key+". Expected: "+valCnt+" Actual: "+keyValueCountMap.get(key).size(),
 					keyValueCountMap.get(key).size() == valCnt);
 			}
-			
+
 		} catch (FileNotFoundException e) {
 			Assert.fail("Out file got lost...");
 		} catch (IOException ioe) {
@@ -128,24 +135,32 @@ public class DataSinkTaskTest extends TaskTestBase
 			}
 		}
 	}
-	
+
 	@Test
 	public void testUnionDataSinkTask() {
 
 		int keyCnt = 100;
 		int valCnt = 20;
-		
+
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
-		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0);
-		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0);
-		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt*2, 0, false), 0);
-		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt*3, 0, false), 0);
-		
+
+		MockIteratorBufferReader<?>[] readers = new MockIteratorBufferReader[4];
+		readers[0] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0, false);
+		readers[1] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0, false);
+		readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false);
+		readers[3] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 3, 0, false), 0, false);
+
 		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
 
 		super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
-		
+
 		try {
+			// For the union reader to work, we need to start notifications *after* the union reader
+			// has been initialized.
+			for (MockIteratorBufferReader<?> reader : readers) {
+				reader.read();
+			}
+
 			testTask.invoke();
 		} catch (Exception e) {
 			LOG.debug("Exception while invoking the test task.", e);
@@ -153,37 +168,37 @@ public class DataSinkTaskTest extends TaskTestBase
 		}
 
 		File tempTestFile = new File(this.tempTestPath);
-		
+
 		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
-		
+
 		FileReader fr = null;
 		BufferedReader br = null;
 		try {
 			fr = new FileReader(tempTestFile);
 			br = new BufferedReader(fr);
-			
+
 			HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
-			
+
 			while(br.ready()) {
 				String line = br.readLine();
-				
+
 				Integer key = Integer.parseInt(line.substring(0,line.indexOf("_")));
 				Integer val = Integer.parseInt(line.substring(line.indexOf("_")+1,line.length()));
-				
+
 				if(!keyValueCountMap.containsKey(key)) {
 					keyValueCountMap.put(key,new HashSet<Integer>());
 				}
 				keyValueCountMap.get(key).add(val);
 			}
-			
+
 			Assert.assertTrue("Invalid key count in out file. Expected: "+keyCnt+" Actual: "+keyValueCountMap.keySet().size(),
 				keyValueCountMap.keySet().size() == keyCnt * 4);
-			
+
 			for(Integer key : keyValueCountMap.keySet()) {
 				Assert.assertTrue("Invalid value count for key: "+key+". Expected: "+valCnt+" Actual: "+keyValueCountMap.get(key).size(),
 					keyValueCountMap.get(key).size() == valCnt);
 			}
-			
+
 		} catch (FileNotFoundException e) {
 			Assert.fail("Out file got lost...");
 		} catch (IOException ioe) {
@@ -197,7 +212,7 @@ public class DataSinkTaskTest extends TaskTestBase
 			}
 		}
 	}
-	
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testSortingDataSinkTask() {
@@ -205,12 +220,13 @@ public class DataSinkTaskTest extends TaskTestBase
 		int keyCnt = 100;
 		int valCnt = 20;
 		double memoryFraction = 1.0;
-		
+
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
+
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
-		
+
 		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
-		
+
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
@@ -221,33 +237,33 @@ public class DataSinkTaskTest extends TaskTestBase
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
 		super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
-		
+
 		try {
 			testTask.invoke();
 		} catch (Exception e) {
 			LOG.debug("Exception while invoking the test task.", e);
 			Assert.fail("Invoke method caused exception.");
 		}
-		
+
 		File tempTestFile = new File(this.tempTestPath);
-		
+
 		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
-		
+
 		FileReader fr = null;
 		BufferedReader br = null;
 		try {
 			fr = new FileReader(tempTestFile);
 			br = new BufferedReader(fr);
-			
+
 			Set<Integer> keys = new HashSet<Integer>();
-			
+
 			int curVal = -1;
 			while(br.ready()) {
 				String line = br.readLine();
-				
+
 				Integer key = Integer.parseInt(line.substring(0,line.indexOf("_")));
 				Integer val = Integer.parseInt(line.substring(line.indexOf("_")+1,line.length()));
-				
+
 				// check that values are in correct order
 				Assert.assertTrue("Values not in ascending order", val >= curVal);
 				// next value hit
@@ -261,10 +277,10 @@ public class DataSinkTaskTest extends TaskTestBase
 					// update current value
 					curVal = val;
 				}
-				
+
 				Assert.assertTrue("Duplicate key for value", keys.add(key));
 			}
-			
+
 		} catch (FileNotFoundException e) {
 			Assert.fail("Out file got lost...");
 		} catch (IOException ioe) {
@@ -278,13 +294,13 @@ public class DataSinkTaskTest extends TaskTestBase
 			}
 		}
 	}
-	
+
 	@Test
 	public void testFailingDataSinkTask() {
 
 		int keyCnt = 100;
 		int valCnt = 20;
-		
+
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 
@@ -293,7 +309,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		super.getTaskConfig().setStubParameters(stubParams);
 
 		super.registerFileOutputTask(testTask, MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString());
-		
+
 		boolean stubFailed = false;
 
 		try {
@@ -302,13 +318,13 @@ public class DataSinkTaskTest extends TaskTestBase
 			stubFailed = true;
 		}
 		Assert.assertTrue("Function exception was not forwarded.", stubFailed);
-		
+
 		// assert that temp file was created
 		File tempTestFile = new File(this.tempTestPath);
 		Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
-		
+
 	}
-	
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testFailingSortingDataSinkTask() {
@@ -316,25 +332,25 @@ public class DataSinkTaskTest extends TaskTestBase
 		int keyCnt = 100;
 		int valCnt = 20;;
 		double memoryFraction = 1.0;
-		
+
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 
 		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
-		
+
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
-				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})),
+				new RecordComparatorFactory(new int[]{1}, ((Class<? extends Key<?>>[]) new Class[]{IntValue.class})),
 				0);
 		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
-		
+
 		super.registerFileOutputTask(testTask, MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString());
-		
+
 		boolean stubFailed = false;
 
 		try {
@@ -343,25 +359,25 @@ public class DataSinkTaskTest extends TaskTestBase
 			stubFailed = true;
 		}
 		Assert.assertTrue("Function exception was not forwarded.", stubFailed);
-		
+
 		// assert that temp file was created
 		File tempTestFile = new File(this.tempTestPath);
 		Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
-		
+
 	}
-	
+
 	@Test
 	public void testCancelDataSinkTask() {
-		
+
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
-		
+
 		final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
-		
+
 		super.registerFileOutputTask(testTask, MockOutputFormat.class,  new File(tempTestPath).toURI().toString());
-		
+
 		Thread taskRunner = new Thread() {
 			@Override
 			public void run() {
@@ -374,45 +390,45 @@ public class DataSinkTaskTest extends TaskTestBase
 			}
 		};
 		taskRunner.start();
-		
+
 		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, testTask);
 		tct.start();
-		
+
 		try {
 			tct.join();
-			taskRunner.join();		
+			taskRunner.join();
 		} catch(InterruptedException ie) {
 			Assert.fail("Joining threads failed");
 		}
-		
+
 		// assert that temp file was created
 		File tempTestFile = new File(this.tempTestPath);
 		Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
 	}
-	
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testCancelSortingDataSinkTask() {
 		double memoryFraction = 1.0;
-		
+
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
-		
+
 		final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
-		
+
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
-				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})), 
+				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})),
 				0);
 		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
-		
+
 		super.registerFileOutputTask(testTask, MockOutputFormat.class,  new File(tempTestPath).toURI().toString());
-		
+
 		Thread taskRunner = new Thread() {
 			@Override
 			public void run() {
@@ -425,40 +441,40 @@ public class DataSinkTaskTest extends TaskTestBase
 			}
 		};
 		taskRunner.start();
-		
+
 		TaskCancelThread tct = new TaskCancelThread(2, taskRunner, testTask);
 		tct.start();
-		
+
 		try {
 			tct.join();
 			taskRunner.join();
 		} catch(InterruptedException ie) {
 			Assert.fail("Joining threads failed");
 		}
-				
+
 	}
-	
+
 	public static class MockOutputFormat extends DelimitedOutputFormat {
 		private static final long serialVersionUID = 1L;
-		
+
 		final StringBuilder bld = new StringBuilder();
-		
+
 		@Override
 		public void configure(Configuration parameters) {
 			super.configure(parameters);
 		}
-		
+
 		@Override
 		public int serializeRecord(Record rec, byte[] target) throws Exception
 		{
 			IntValue key = rec.getField(0, IntValue.class);
 			IntValue value = rec.getField(1, IntValue.class);
-		
+
 			this.bld.setLength(0);
 			this.bld.append(key.getValue());
 			this.bld.append('_');
 			this.bld.append(value.getValue());
-			
+
 			byte[] bytes = this.bld.toString().getBytes();
 			if (bytes.length <= target.length) {
 				System.arraycopy(bytes, 0, target, 0, bytes.length);
@@ -467,19 +483,19 @@ public class DataSinkTaskTest extends TaskTestBase
 			// else
 			return -bytes.length;
 		}
-		
+
 	}
-	
+
 	public static class MockFailingOutputFormat extends MockOutputFormat {
 		private static final long serialVersionUID = 1L;
 
 		int cnt = 0;
-		
+
 		@Override
 		public void configure(Configuration parameters) {
 			super.configure(parameters);
 		}
-		
+
 		@Override
 		public int serializeRecord(Record rec, byte[] target) throws Exception
 		{

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index aac7bc9..90bb944 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.junit.Assert;
 
 import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -40,7 +42,12 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Task.class, BufferWriter.class})
 public class DataSourceTaskTest extends TaskTestBase {
 
 	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 0e029a8..e22789c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.RegularPactTask;
@@ -36,13 +37,18 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
-
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Task.class, BufferWriter.class})
 @SuppressWarnings("deprecation")
 public class ChainTaskTest extends TaskTestBase {
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
index 10d3534..fa72bbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
@@ -19,9 +19,6 @@
 
 package org.apache.flink.runtime.operators.resettable;
 
-import java.util.ArrayList;
-
-import org.junit.Assert;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -29,16 +26,18 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.MutableObjectIteratorWrapper;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
+
 public class SpillingResettableMutableObjectIteratorTest {
 	
 	private static final int NUM_TESTRECORDS = 50000;


Mime
View raw message