flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [04/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date Mon, 12 Jan 2015 08:16:12 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
deleted file mode 100644
index 7fe2384..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
+++ /dev/null
@@ -1,867 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.netty;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.channel.embedded.EmbeddedChannel;
-
-import org.junit.Assert;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.BufferRecycler;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider.BufferAvailabilityRegistration;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.netty.InboundEnvelopeDecoder;
-import org.apache.flink.runtime.io.network.netty.OutboundEnvelopeEncoder;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class InboundEnvelopeDecoderTest {
-
-	@Mock
-	private BufferProvider bufferProvider;
-
-	@Mock
-	private BufferProviderBroker bufferProviderBroker;
-
-	@Before
-	public void initMocks() throws IOException {
-		MockitoAnnotations.initMocks(this);
-	}
-
-	@Test
-	public void testBufferStaging() throws Exception {
-		final InboundEnvelopeDecoder decoder = new InboundEnvelopeDecoder(this.bufferProviderBroker);
-		final EmbeddedChannel ch = new EmbeddedChannel(
-				new OutboundEnvelopeEncoder(),
-				decoder);
-
-		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
-				.thenReturn(this.bufferProvider);
-
-		// --------------------------------------------------------------------
-
-		Envelope[] envelopes = nextEnvelopes(3, true);
-
-		ByteBuf buf = encode(ch, envelopes);
-
-		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
-				.thenReturn(BufferAvailabilityRegistration.SUCCEEDED_REGISTERED);
-
-		Buffer buffer = allocBuffer(envelopes[2].getBuffer().size());
-
-		when(this.bufferProvider.requestBuffer(anyInt()))
-				.thenReturn(null, null, buffer, null);
-
-		// --------------------------------------------------------------------
-
-		// slices: [0] => full envelope, [1] => half envelope, [2] => remaining half + full envelope
-		ByteBuf[] slices = slice(buf,
-				OutboundEnvelopeEncoder.HEADER_SIZE + envelopes[0].getBuffer().size(),
-				OutboundEnvelopeEncoder.HEADER_SIZE + envelopes[1].getBuffer().size() / 2);
-
-		// 1. no buffer available, incoming slice contains all data
-		int refCount = slices[0].refCnt();
-
-		decodeAndVerify(ch, slices[0]);
-
-		Assert.assertEquals(refCount + 1, slices[0].refCnt());
-		Assert.assertFalse(ch.config().isAutoRead());
-
-		// notify of available buffer (=> bufferAvailable() callback does return a buffer
-		// of the current network buffer size; the decoder needs to adjust its size to the
-		// requested size
-		decoder.bufferAvailable(allocBuffer(envelopes[0].getBuffer().size() * 2));
-		ch.runPendingTasks();
-
-		Assert.assertEquals(refCount - 1, slices[0].refCnt());
-		Assert.assertTrue(ch.config().isAutoRead());
-
-		decodeAndVerify(ch, envelopes[0]);
-
-		// 2. no buffer available, incoming slice does NOT contain all data
-		refCount = slices[1].refCnt();
-
-		decodeAndVerify(ch, slices[1]);
-
-		Assert.assertEquals(refCount + 1, slices[1].refCnt());
-		Assert.assertFalse(ch.config().isAutoRead());
-
-		decoder.bufferAvailable(allocBuffer());
-		ch.runPendingTasks();
-
-		Assert.assertEquals(refCount - 1, slices[1].refCnt());
-		Assert.assertTrue(ch.config().isAutoRead());
-
-		decodeAndVerify(ch);
-
-		// 3. buffer available
-		refCount = slices[2].refCnt();
-
-		decodeAndVerify(ch, slices[2], envelopes[1], envelopes[2]);
-
-		Assert.assertEquals(refCount - 1, slices[2].refCnt());
-		Assert.assertTrue(ch.config().isAutoRead());
-
-		Assert.assertEquals(1, buf.refCnt());
-		buf.release();
-	}
-
-	@Test
-	public void testBufferStagingStagedBufferException() throws Exception {
-		final EmbeddedChannel ch = new EmbeddedChannel(
-				new OutboundEnvelopeEncoder(),
-				new InboundEnvelopeDecoder(this.bufferProviderBroker));
-
-		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
-				.thenReturn(this.bufferProvider);
-
-		// --------------------------------------------------------------------
-
-		ByteBuf buf = encode(ch, nextEnvelope(true));
-
-		when(this.bufferProvider.requestBuffer(anyInt()))
-				.thenReturn(null);
-
-		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
-				.thenReturn(BufferAvailabilityRegistration.SUCCEEDED_REGISTERED);
-
-		// --------------------------------------------------------------------
-
-		int refCount = buf.refCnt();
-
-		decodeAndVerify(ch, buf);
-
-		Assert.assertFalse(ch.config().isAutoRead());
-		Assert.assertEquals(refCount + 1, buf.refCnt());
-
-		try {
-			decodeAndVerify(ch, buf);
-			Assert.fail("Expected IllegalStateException not thrown");
-		} catch (IllegalStateException e) {
-			// expected exception
-		}
-
-		buf.release();
-	}
-
-	@Test
-	public void testBufferAvailabilityRegistrationBufferAvailable() throws Exception {
-		final EmbeddedChannel ch = new EmbeddedChannel(
-				new OutboundEnvelopeEncoder(),
-				new InboundEnvelopeDecoder(this.bufferProviderBroker));
-
-		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
-				.thenReturn(this.bufferProvider);
-
-		// --------------------------------------------------------------------
-
-		Envelope[] envelopes = new Envelope[]{nextEnvelope(true), nextEnvelope()};
-
-		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
-				.thenReturn(BufferAvailabilityRegistration.FAILED_BUFFER_AVAILABLE);
-
-		when(this.bufferProvider.requestBuffer(anyInt()))
-				.thenReturn(null)
-				.thenReturn(allocBuffer(envelopes[0].getBuffer().size()));
-
-		// --------------------------------------------------------------------
-
-		ByteBuf buf = encode(ch, envelopes);
-
-		decodeAndVerify(ch, buf, envelopes);
-		Assert.assertEquals(0, buf.refCnt());
-	}
-
-	@Test
-	public void testBufferAvailabilityRegistrationBufferPoolDestroyedSkipBytes() throws Exception {
-		final EmbeddedChannel ch = new EmbeddedChannel(
-				new OutboundEnvelopeEncoder(),
-				new InboundEnvelopeDecoder(this.bufferProviderBroker));
-
-		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
-				.thenReturn(this.bufferProvider);
-
-		when(this.bufferProvider.requestBuffer(anyInt()))
-				.thenReturn(null);
-
-		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
-				.thenReturn(BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED);
-
-		// --------------------------------------------------------------------
-
-		Envelope[] envelopes = new Envelope[]{nextEnvelope(true), nextEnvelope(), nextEnvelope()};
-		Envelope[] expectedEnvelopes = new Envelope[]{envelopes[1], envelopes[2]};
-
-		ByteBuf buf = encode(ch, envelopes);
-
-		int bufferSize = envelopes[0].getBuffer().size();
-
-		// --------------------------------------------------------------------
-		// 1) skip in current buffer only
-		// --------------------------------------------------------------------
-		{
-			// skip last bytes in current buffer
-			ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize);
-
-			int refCount = slices[0].refCnt();
-			decodeAndVerify(ch, slices[0]);
-			Assert.assertEquals(refCount - 1, slices[0].refCnt());
-
-			refCount = slices[1].refCnt();
-			decodeAndVerify(ch, slices[1], expectedEnvelopes);
-			Assert.assertEquals(refCount - 1, slices[1].refCnt());
-		}
-
-		{
-			// skip bytes in current buffer, leave last 16 bytes from next envelope
-			ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize + 16);
-
-			int refCount = slices[0].refCnt();
-			decodeAndVerify(ch, slices[0]);
-			Assert.assertEquals(refCount - 1, slices[0].refCnt());
-
-			refCount = slices[1].refCnt();
-			decodeAndVerify(ch, slices[1], expectedEnvelopes);
-			Assert.assertEquals(refCount - 1, slices[1].refCnt());
-		}
-
-		{
-			// skip bytes in current buffer, then continue with full envelope from same buffer
-			ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize + OutboundEnvelopeEncoder.HEADER_SIZE);
-
-			int refCount = slices[0].refCnt();
-			decodeAndVerify(ch, slices[0], expectedEnvelopes[0]);
-			Assert.assertEquals(refCount - 1, slices[0].refCnt());
-
-			refCount = slices[1].refCnt();
-			decodeAndVerify(ch, slices[1], expectedEnvelopes[1]);
-			Assert.assertEquals(refCount - 1, slices[1].refCnt());
-		}
-
-		// --------------------------------------------------------------------
-		// 2) skip in current and next buffer
-		// --------------------------------------------------------------------
-
-		{
-			// skip bytes in current buffer, then continue to skip last 32 bytes in next buffer
-			ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize - 32);
-
-			int refCount = slices[0].refCnt();
-			decodeAndVerify(ch, slices[0]);
-			Assert.assertEquals(refCount - 1, slices[0].refCnt());
-
-			refCount = slices[1].refCnt();
-			decodeAndVerify(ch, slices[1], expectedEnvelopes);
-			Assert.assertEquals(refCount - 1, slices[1].refCnt());
-		}
-
-		{
-			// skip bytes in current buffer, then continue to skip in next two buffers
-			ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize - 32, 16);
-
-			int refCount = slices[0].refCnt();
-			decodeAndVerify(ch, slices[0]);
-			Assert.assertEquals(refCount - 1, slices[0].refCnt());
-
-			refCount = slices[1].refCnt();
-			decodeAndVerify(ch, slices[1]);
-			Assert.assertEquals(refCount - 1, slices[1].refCnt());
-
-			refCount = slices[2].refCnt();
-			decodeAndVerify(ch, slices[2], expectedEnvelopes);
-			Assert.assertEquals(refCount - 1, slices[2].refCnt());
-		}
-
-		// ref count should be 1, because slices shared the ref count
-		Assert.assertEquals(1, buf.refCnt());
-	}
-
-	@Test
-	public void testEncodeDecode() throws Exception {
-		final EmbeddedChannel ch = new EmbeddedChannel(
-				new OutboundEnvelopeEncoder(), new InboundEnvelopeDecoder(this.bufferProviderBroker));
-
-		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
-				.thenReturn(this.bufferProvider);
-
-		when(this.bufferProvider.requestBuffer(anyInt())).thenAnswer(new Answer<Object>() {
-			@Override
-			public Object answer(InvocationOnMock invocation) throws Throwable {
-				// fulfill the buffer request
-				return allocBuffer((Integer) invocation.getArguments()[0]);
-			}
-		});
-
-		// --------------------------------------------------------------------
-
-		Envelope[] envelopes = new Envelope[]{
-				nextEnvelope(0),
-				nextEnvelope(2),
-				nextEnvelope(32768),
-				nextEnvelope(3782, new TestEvent1(34872527)),
-				nextEnvelope(88, new TestEvent1(8749653), new TestEvent1(365345)),
-				nextEnvelope(0, new TestEvent2(34563456), new TestEvent1(598432), new TestEvent2(976293845)),
-				nextEnvelope(23)
-		};
-
-		ByteBuf buf = encode(ch, envelopes);
-
-		// 1. complete ByteBuf as input
-		int refCount = buf.retain().refCnt();
-
-		decodeAndVerify(ch, buf, envelopes);
-		Assert.assertEquals(refCount - 1, buf.refCnt());
-
-		// 2. random slices
-		buf.readerIndex(0);
-		ByteBuf[] slices = randomSlices(buf);
-
-		ch.writeInbound((Object[]) slices);
-
-		for (ByteBuf slice : slices) {
-			Assert.assertEquals(1, slice.refCnt());
-		}
-
-		decodeAndVerify(ch, envelopes);
-
-		buf.release();
-	}
-
-	@Test
-	public void testEncodeDecodeRandomEnvelopes() throws Exception {
-		final InboundEnvelopeDecoder decoder = new InboundEnvelopeDecoder(this.bufferProviderBroker);
-		final EmbeddedChannel ch = new EmbeddedChannel(
-				new OutboundEnvelopeEncoder(), decoder);
-
-		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
-				.thenReturn(this.bufferProvider);
-
-		when(this.bufferProvider.requestBuffer(anyInt())).thenAnswer(new Answer<Object>() {
-			@Override
-			public Object answer(InvocationOnMock invocation) throws Throwable {
-				// fulfill the buffer request with the requested size
-				return allocBuffer((Integer) invocation.getArguments()[0]);
-			}
-		});
-
-		Random randomAnswerSource = new Random(RANDOM_SEED);
-
-		RandomBufferRequestAnswer randomBufferRequestAnswer = new RandomBufferRequestAnswer(randomAnswerSource);
-
-		RandomBufferAvailabilityRegistrationAnswer randomBufferAvailabilityRegistrationAnswer =
-				new RandomBufferAvailabilityRegistrationAnswer(randomAnswerSource, randomBufferRequestAnswer);
-
-		when(this.bufferProvider.requestBuffer(anyInt())).thenAnswer(randomBufferRequestAnswer);
-
-		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
-				.thenAnswer(randomBufferAvailabilityRegistrationAnswer);
-
-		// --------------------------------------------------------------------
-
-		Envelope[] envelopes = nextRandomEnvelopes(1024);
-
-		ByteBuf buf = encode(ch, envelopes);
-
-		ByteBuf[] slices = randomSlices(buf);
-
-		for (ByteBuf slice : slices) {
-			int refCount = slice.refCnt();
-			ch.writeInbound(slice);
-
-			// registered BufferAvailabilityListener => call bufferAvailable(buffer)
-			while (randomBufferAvailabilityRegistrationAnswer.isRegistered()) {
-				randomBufferAvailabilityRegistrationAnswer.unregister();
-
-				Assert.assertFalse(ch.config().isAutoRead());
-				Assert.assertEquals(refCount + 1, slice.refCnt());
-
-				// return a buffer of max size => decoder needs to limit buffer size
-				decoder.bufferAvailable(allocBuffer(MAX_BUFFER_SIZE));
-				ch.runPendingTasks();
-			}
-
-			Assert.assertEquals(refCount - 1, slice.refCnt());
-			Assert.assertTrue(ch.config().isAutoRead());
-		}
-
-		Envelope[] expected = randomBufferAvailabilityRegistrationAnswer.removeSkippedEnvelopes(envelopes);
-
-		decodeAndVerify(ch, expected);
-
-		Assert.assertEquals(1, buf.refCnt());
-
-		buf.release();
-	}
-
-	// ========================================================================
-	// helpers
-	// ========================================================================
-
-	private final static long RANDOM_SEED = 520346508276087l;
-
-	private final static Random random = new Random(RANDOM_SEED);
-
-	private final static int[] BUFFER_SIZES = new int[]{8192, 16384, 32768};
-
-	private final static int MAX_BUFFER_SIZE = BUFFER_SIZES[2];
-
-	private final static int MAX_NUM_EVENTS = 5;
-
-	private final static int MAX_SLICE_SIZE = MAX_BUFFER_SIZE / 3;
-
-	private final static int MIN_SLICE_SIZE = 1;
-
-	private final static BufferRecycler RECYCLER = mock(BufferRecycler.class);
-
-	// ------------------------------------------------------------------------
-	// envelopes
-	// ------------------------------------------------------------------------
-
-	private static Buffer allocBuffer() {
-		return allocBuffer(MAX_BUFFER_SIZE);
-	}
-
-	private static Buffer allocBuffer(int bufferSize) {
-		return spy(new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, RECYCLER));
-	}
-
-	private Envelope nextEnvelope() {
-		return nextEnvelope(false, false);
-	}
-
-	private Envelope nextEnvelope(boolean withBuffer) {
-		return nextEnvelope(withBuffer, false);
-	}
-
-	private Envelope nextEnvelope(int bufferSize, AbstractEvent... events) {
-		Envelope env = new Envelope(random.nextInt(), new JobID(), new ChannelID());
-		if (bufferSize > 0) {
-			byte[] data = new byte[bufferSize];
-			random.nextBytes(data);
-
-			env.setBuffer(spy(new Buffer(new MemorySegment(data), bufferSize, RECYCLER)));
-		}
-
-		if (events != null && events.length > 0) {
-			env.serializeEventList(Arrays.asList(events));
-		}
-
-		return env;
-	}
-
-	private Envelope nextEnvelope(boolean withBuffer, boolean withEvents) {
-		int bufferSize = 0;
-		AbstractEvent[] events = null;
-
-		if (withBuffer) {
-			bufferSize = BUFFER_SIZES[random.nextInt(BUFFER_SIZES.length)];
-		}
-
-		if (withEvents) {
-			events = new AbstractEvent[random.nextInt(MAX_NUM_EVENTS) + 1];
-
-			for (int i = 0; i < events.length; i++) {
-				events[i] = (random.nextBoolean()
-						? new TestEvent1(random.nextLong())
-						: new TestEvent2(random.nextLong()));
-			}
-		}
-
-		return nextEnvelope(bufferSize, events);
-	}
-
-	private Envelope[] nextEnvelopes(int numEnvelopes, boolean withBuffer) {
-		Envelope[] envelopes = new Envelope[numEnvelopes];
-		for (int i = 0; i < numEnvelopes; i++) {
-			envelopes[i] = nextEnvelope(withBuffer, false);
-		}
-		return envelopes;
-	}
-
-	private Envelope[] nextRandomEnvelopes(int numEnvelopes) {
-		Envelope[] envelopes = new Envelope[numEnvelopes];
-		for (int i = 0; i < numEnvelopes; i++) {
-			envelopes[i] = nextEnvelope(random.nextBoolean(), random.nextBoolean());
-		}
-		return envelopes;
-	}
-
-	// ------------------------------------------------------------------------
-	// channel encode/decode
-	// ------------------------------------------------------------------------
-
-	private static ByteBuf encode(EmbeddedChannel ch, Envelope... envelopes) {
-		for (Envelope env : envelopes) {
-			ch.writeOutbound(env);
-
-			if (env.getBuffer() != null) {
-				verify(env.getBuffer(), times(1)).recycleBuffer();
-			}
-		}
-
-		CompositeByteBuf encodedEnvelopes = new CompositeByteBuf(ByteBufAllocator.DEFAULT, false, envelopes.length);
-
-		ByteBuf buf;
-		while ((buf = (ByteBuf) ch.readOutbound()) != null) {
-			encodedEnvelopes.addComponent(buf);
-		}
-
-		return encodedEnvelopes.writerIndex(encodedEnvelopes.capacity());
-	}
-
-	private static void decodeAndVerify(EmbeddedChannel ch, ByteBuf buf, Envelope... expectedEnvelopes) {
-		ch.writeInbound(buf);
-
-		decodeAndVerify(ch, expectedEnvelopes);
-	}
-
-	private static void decodeAndVerify(EmbeddedChannel ch, Envelope... expectedEnvelopes) {
-		if (expectedEnvelopes == null) {
-			Assert.assertNull(ch.readInbound());
-		}
-		else {
-			for (Envelope expected : expectedEnvelopes) {
-				Envelope actual = (Envelope) ch.readInbound();
-
-				if (actual == null) {
-					Assert.fail("No inbound envelope available, but expected one");
-				}
-
-				assertEqualEnvelopes(expected, actual);
-			}
-		}
-	}
-
-	private static void assertEqualEnvelopes(Envelope expected, Envelope actual) {
-		Assert.assertTrue(expected.getSequenceNumber() == actual.getSequenceNumber() &&
-				expected.getJobID().equals(actual.getJobID()) &&
-				expected.getSource().equals(actual.getSource()));
-
-		if (expected.getBuffer() == null) {
-			Assert.assertNull(actual.getBuffer());
-		}
-		else {
-			Assert.assertNotNull(actual.getBuffer());
-
-			ByteBuffer expectedByteBuffer = expected.getBuffer().getMemorySegment().wrap(0, expected.getBuffer().size());
-			ByteBuffer actualByteBuffer = actual.getBuffer().getMemorySegment().wrap(0, actual.getBuffer().size());
-
-			Assert.assertEquals(0, expectedByteBuffer.compareTo(actualByteBuffer));
-		}
-
-		if (expected.getEventsSerialized() == null) {
-			Assert.assertNull(actual.getEventsSerialized());
-		}
-		else {
-			Assert.assertNotNull(actual.getEventsSerialized());
-
-			// this is needed, because the encoding of the byte buffer
-			// alters the state of the buffer
-			expected.getEventsSerialized().clear();
-
-			List<? extends AbstractEvent> expectedEvents = expected.deserializeEvents();
-			List<? extends AbstractEvent> actualEvents = actual.deserializeEvents();
-
-			Assert.assertEquals(expectedEvents.size(), actualEvents.size());
-
-			for (int i = 0; i < expectedEvents.size(); i++) {
-				AbstractEvent expectedEvent = expectedEvents.get(i);
-				AbstractEvent actualEvent = actualEvents.get(i);
-
-				Assert.assertEquals(expectedEvent.getClass(), actualEvent.getClass());
-				Assert.assertEquals(expectedEvent, actualEvent);
-			}
-		}
-	}
-
-	private static ByteBuf[] randomSlices(ByteBuf buf) {
-		List<Integer> sliceSizes = new LinkedList<Integer>();
-
-		if (buf.readableBytes() < MIN_SLICE_SIZE) {
-			throw new IllegalStateException("Buffer to slice is smaller than required minimum slice size");
-		}
-
-		int available = buf.readableBytes() - MIN_SLICE_SIZE;
-
-		while (available > 0) {
-			int size = Math.min(available, Math.max(MIN_SLICE_SIZE, random.nextInt(MAX_SLICE_SIZE) + 1));
-			available -= size;
-			sliceSizes.add(size);
-		}
-
-		int[] slices = new int[sliceSizes.size()];
-		for (int i = 0; i < sliceSizes.size(); i++) {
-			slices[i] = sliceSizes.get(i);
-		}
-
-		return slice(buf, slices);
-	}
-
-	/**
-	 * Returns slices with the specified sizes of the given buffer.
-	 * <p/>
-	 * When given n indexes, n+1 slices will be returned:
-	 * <ul>
-	 * <li>0 - sliceSizes[0]</li>
-	 * <li>sliceSizes[0] - sliceSizes[1]</li>
-	 * <li>...</li>
-	 * <li>sliceSizes[n-1] - buf.capacity()</li>
-	 * </ul>
-	 *
-	 * @return slices with the specified sizes of the given buffer
-	 */
-	private static ByteBuf[] slice(ByteBuf buf, int... sliceSizes) {
-		if (sliceSizes.length == 0) {
-			throw new IllegalStateException("Need to provide at least one slice size");
-		}
-
-		int numSlices = sliceSizes.length;
-		// transform slice sizes to buffer indexes
-		for (int i = 1; i < numSlices; i++) {
-			sliceSizes[i] += sliceSizes[i - 1];
-		}
-
-		for (int i = 0; i < sliceSizes.length - 1; i++) {
-			if (sliceSizes[i] >= sliceSizes[i + 1] || sliceSizes[i] <= 0 || sliceSizes[i] >= buf.capacity()) {
-				throw new IllegalStateException(
-						String.format("Slice size %s are off for %s", Arrays.toString(sliceSizes), buf));
-			}
-		}
-
-		ByteBuf[] slices = new ByteBuf[numSlices + 1];
-
-		// slice at slice indexes
-		slices[0] = buf.slice(0, sliceSizes[0]).retain();
-		for (int i = 1; i < numSlices; i++) {
-			slices[i] = buf.slice(sliceSizes[i - 1], sliceSizes[i] - sliceSizes[i - 1]).retain();
-		}
-		slices[numSlices] = buf.slice(sliceSizes[numSlices - 1], buf.capacity() - sliceSizes[numSlices - 1]).retain();
-
-		return slices;
-	}
-
-	// ------------------------------------------------------------------------
-	// mocking
-	// ------------------------------------------------------------------------
-
-	private static JobID anyJobId() {
-		return Matchers.anyObject();
-	}
-
-	private static ChannelID anyChannelId() {
-		return Matchers.anyObject();
-	}
-
-	// these following two Answer classes are quite ugly, but they allow to implement a randomized
-	// test of encoding and decoding envelopes
-	private static class RandomBufferRequestAnswer implements Answer<Buffer> {
-
-		private final Random random;
-
-		private boolean forced;
-
-		private RandomBufferRequestAnswer(Random random) {
-			this.random = random;
-		}
-
-		@Override
-		public Buffer answer(InvocationOnMock invocation) throws Throwable {
-			if (this.forced) {
-				Buffer toReturn = allocBuffer((Integer) invocation.getArguments()[0]);
-				this.forced = false;
-
-				return toReturn;
-			}
-
-			return this.random.nextBoolean() ? allocBuffer((Integer) invocation.getArguments()[0]) : null;
-		}
-
-		public void forceBufferAvailable() {
-			this.forced = true;
-		}
-	}
-
-	private static class RandomBufferAvailabilityRegistrationAnswer implements Answer<BufferAvailabilityRegistration> {
-
-		private final Random random;
-
-		private final RandomBufferRequestAnswer bufferRequestAnswer;
-
-		private boolean isRegistered = false;
-
-		private int numSkipped;
-
-		private RandomBufferAvailabilityRegistrationAnswer(Random random, RandomBufferRequestAnswer bufferRequestAnswer) {
-			this.random = random;
-			this.bufferRequestAnswer = bufferRequestAnswer;
-		}
-
-		@Override
-		public BufferAvailabilityRegistration answer(InvocationOnMock invocation) throws Throwable {
-			if (this.random.nextBoolean()) {
-				this.isRegistered = true;
-				return BufferAvailabilityRegistration.SUCCEEDED_REGISTERED;
-			}
-			else if (this.random.nextBoolean()) {
-				this.bufferRequestAnswer.forceBufferAvailable();
-				return BufferAvailabilityRegistration.FAILED_BUFFER_AVAILABLE;
-			}
-			else {
-				this.numSkipped++;
-				return BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED;
-			}
-		}
-
-		public Envelope[] removeSkippedEnvelopes(Envelope[] envelopes) {
-			this.random.setSeed(RANDOM_SEED);
-			Envelope[] envelopesWithoutSkipped = new Envelope[envelopes.length - this.numSkipped];
-			int numEnvelopes = 0;
-
-			for (Envelope env : envelopes) {
-				if (env.getBuffer() != null) {
-					// skip envelope if returned FAILED_BUFFER_POOL_DESTROYED
-					if (!this.random.nextBoolean() && !this.random.nextBoolean() && !this.random.nextBoolean()) {
-						continue;
-					}
-				}
-
-				envelopesWithoutSkipped[numEnvelopes++] = env;
-			}
-
-			return envelopesWithoutSkipped;
-		}
-
-		public boolean isRegistered() {
-			return this.isRegistered;
-		}
-
-		public void unregister() {
-			this.isRegistered = false;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static final class TestEvent1 extends AbstractEvent {
-
-		private long id;
-
-		public TestEvent1() {
-		}
-
-		public TestEvent1(long id) {
-			this.id = id;
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			out.writeLong(id);
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-			id = in.readLong();
-		}
-
-
-		@Override
-		public boolean equals(Object obj) {
-			return obj.getClass() == TestEvent1.class && ((TestEvent1) obj).id == this.id;
-		}
-
-		@Override
-		public int hashCode() {
-			return ((int) id) ^ ((int) (id >>> 32));
-		}
-
-		@Override
-		public String toString() {
-			return "TestEvent1 (" + id + ")";
-		}
-	}
-
-	public static final class TestEvent2 extends AbstractEvent {
-
-		private long id;
-
-		public TestEvent2() {
-		}
-
-		public TestEvent2(long id) {
-			this.id = id;
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			out.writeLong(id);
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-			id = in.readLong();
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			return obj.getClass() == TestEvent2.class && ((TestEvent2) obj).id == this.id;
-		}
-
-		@Override
-		public int hashCode() {
-			return ((int) id) ^ ((int) (id >>> 32));
-		}
-
-		@Override
-		public String toString() {
-			return "TestEvent2 (" + id + ")";
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
deleted file mode 100644
index c3e728b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.netty;
-
-import org.junit.Assert;
-
-import org.apache.flink.runtime.io.network.ChannelManager;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-
-/**
- * Stress test for {@link NettyConnectionManager}: several sender threads enqueue envelopes with ascending
- * sequence numbers, and a mocked {@link ChannelManager} checks that they are dispatched in order per
- * source channel.
- *
- * <p>NOTE(review): the test method carries no {@code @Test} annotation; within this class it is only
- * reached through {@code main}.
- */
-public class NettyConnectionManagerTest {
-
-	// Fixed seed for the shared random instance below.
-	private final static long RANDOM_SEED = 520346508276087l;
-
-	// Used to pick the receiver for each sender thread.
-	private final static Random random = new Random(RANDOM_SEED);
-
-	// The sender manager is created with BIND_PORT, the receiver manager with BIND_PORT + 1.
-	private final static int BIND_PORT = 20000;
-
-	private final static int BUFFER_SIZE = 32 * 1024;
-
-	/**
-	 * Runs the scenario once per configuration row and prints the wall-clock runtime of each run.
-	 *
-	 * <p>Each row is {sub tasks, envelopes per sub task, channels, in threads, out threads}, in the order
-	 * used by the log message and by the call below.
-	 */
-	public void testEnqueueRaceAndDeadlockFreeMultipleChannels() throws Exception {
-		Integer[][] configs = new Integer[][]{
-				{64, 4096, 1, 1, 1},
-				{128, 2048, 1, 1, 1},
-				{256, 1024, 1, 1, 1},
-				{512, 512, 1, 1, 1},
-				{64, 4096, 4, 1, 1},
-				{128, 2048, 4, 1, 1},
-				{256, 1024, 4, 1, 1},
-				{512, 512, 4, 1, 1},
-				{64, 4096, 4, 2, 2},
-				{128, 2048, 4, 2, 2},
-				{256, 1024, 4, 2, 2},
-				{512, 512, 4, 2, 2}
-		};
-
-		for (Integer[] params : configs) {
-			System.out.println(String.format("Running %s with config: %d sub tasks, %d envelopes to send per subtasks, "
-					+ "%d num channels, %d num in threads, %d num out threads.",
-					"testEnqueueRaceAndDeadlockFreeMultipleChannels", params[0], params[1], params[2], params[3], params[4]));
-
-			long start = System.currentTimeMillis();
-			doTestEnqueueRaceAndDeadlockFreeMultipleChannels(params[0], params[1], params[2], params[3], params[4]);
-			long end = System.currentTimeMillis();
-
-			System.out.println(String.format("Runtime: %d ms.", (end - start)));
-		}
-	}
-
-	/**
-	 * Starts a sender and a receiver connection manager, lets {@code numSubtasks} threads enqueue
-	 * {@code numToSendPerSubtask} envelopes each, and blocks until the mocked channel manager has seen the
-	 * last sequence number of every sub task.
-	 */
-	private void doTestEnqueueRaceAndDeadlockFreeMultipleChannels(
-			int numSubtasks, final int numToSendPerSubtask, int numChannels, int numInThreads, int numOutThreads)
-			throws Exception {
-
-		final InetAddress localhost = InetAddress.getLocalHost();
-		// Counted down once per sub task by VerifyEnvelopes when its final envelope is dispatched.
-		final CountDownLatch latch = new CountDownLatch(numSubtasks);
-
-		// --------------------------------------------------------------------
-		// setup
-		// --------------------------------------------------------------------
-		// One mocked channel manager serves both connection managers; every envelope handed to
-		// dispatchFromNetwork is checked by VerifyEnvelopes.
-		ChannelManager channelManager = mock(ChannelManager.class);
-		doAnswer(new VerifyEnvelopes(latch, numToSendPerSubtask))
-				.when(channelManager).dispatchFromNetwork(Matchers.<Envelope>anyObject());
-
-		// NOTE(review): the meaning of the trailing -1, -1 constructor arguments is not visible here.
-		final NettyConnectionManager senderConnManager = new NettyConnectionManager(localhost, BIND_PORT, BUFFER_SIZE,
-				numInThreads, numOutThreads, -1, -1);
-		senderConnManager.start(channelManager);
-
-		NettyConnectionManager receiverConnManager = new NettyConnectionManager(localhost, BIND_PORT + 1, BUFFER_SIZE,
-				numInThreads, numOutThreads, -1, -1);
-		receiverConnManager.start(channelManager);
-
-		// --------------------------------------------------------------------
-		// start sender threads
-		// --------------------------------------------------------------------
-		// All receivers share the address of the receiver manager and differ only in their index.
-		RemoteReceiver[] receivers = new RemoteReceiver[numChannels];
-
-		for (int i = 0; i < numChannels; i++) {
-			receivers[i] = new RemoteReceiver(new InetSocketAddress(localhost, BIND_PORT + 1), i);
-		}
-
-		for (int i = 0; i < numSubtasks; i++) {
-			// Each sub task is bound to one randomly chosen receiver and gets its own job and channel ID.
-			final RemoteReceiver receiver = receivers[random.nextInt(numChannels)];
-
-			final AtomicInteger seqNum = new AtomicInteger(0);
-			final JobID jobId = new JobID();
-			final ChannelID channelId = new ChannelID();
-
-			new Thread(new Runnable() {
-				@Override
-				public void run() {
-					// enqueue envelopes with ascending seq numbers
-					while (seqNum.get() < numToSendPerSubtask) {
-						try {
-							int sequenceNumber = seqNum.getAndIncrement();
-
-							Envelope env = new Envelope(sequenceNumber, jobId, channelId);
-							// The third argument is true only for the first envelope of this sub task.
-							senderConnManager.enqueue(env, receiver, sequenceNumber == 0);
-						} catch (IOException e) {
-							// NOTE(review): the IOException is not attached as the cause, and this exception
-							// only ends the sender thread; latch.await() below has no timeout.
-							throw new RuntimeException("Unexpected exception while enqueuing envelope.");
-						}
-					}
-				}
-			}).start();
-		}
-
-		// Wait until every sub task's last envelope has been dispatched.
-		latch.await();
-
-		senderConnManager.shutdown();
-		receiverConnManager.shutdown();
-	}
-
-	/**
-	 * Verifies correct ordering of received envelopes (per envelope source channel ID).
-	 */
-	private class VerifyEnvelopes implements Answer<Void> {
-
-		// Last sequence number seen per source channel.
-		private final ConcurrentMap<ChannelID, Integer> received = new ConcurrentHashMap<ChannelID, Integer>();
-
-		private final CountDownLatch latch;
-
-		private final int numExpectedEnvelopesPerSubtask;
-
-		private VerifyEnvelopes(CountDownLatch latch, int numExpectedEnvelopesPerSubtask) {
-			this.latch = latch;
-			this.numExpectedEnvelopesPerSubtask = numExpectedEnvelopesPerSubtask;
-		}
-
-		@Override
-		public Void answer(InvocationOnMock invocation) throws Throwable {
-			Envelope env = (Envelope) invocation.getArguments()[0];
-
-			ChannelID channelId = env.getSource();
-			int seqNum = env.getSequenceNumber();
-
-			if (seqNum == 0) {
-				// Sequence number 0 must be the first envelope of its channel: no entry may exist yet.
-				Integer previousSeqNum = this.received.putIfAbsent(channelId, seqNum);
-
-				String msg = String.format("Received envelope from %s before, but current seq num is 0", channelId);
-				Assert.assertNull(msg, previousSeqNum);
-			}
-			else {
-				// The entry advances only if the stored value is exactly the predecessor.
-				boolean isExpectedPreviousSeqNum = this.received.replace(channelId, seqNum - 1, seqNum);
-
-				String msg = String.format("Received seq num %d from %s, but previous was not %d.",
-						seqNum, channelId, seqNum - 1);
-				Assert.assertTrue(msg, isExpectedPreviousSeqNum);
-			}
-
-			// count down the latch if all envelopes received for this source
-			if (seqNum == numExpectedEnvelopesPerSubtask - 1) {
-				this.latch.countDown();
-			}
-
-			return null;
-		}
-	}
-
-	// Runs the single scenario of this class and prints a completion marker.
-	private void runAllTests() throws Exception {
-		testEnqueueRaceAndDeadlockFreeMultipleChannels();
-
-		System.out.println("Done.");
-	}
-
-	// Manual entry point for running the stress test outside of a test runner.
-	public static void main(String[] args) throws Exception {
-		new NettyConnectionManagerTest().runAllTests();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
new file mode 100644
index 0000000..991af13
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.IntegerTaskEvent;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+/**
+ * Verifies that each {@link NettyMessage} type survives an encode/decode round trip.
+ *
+ * <p>Messages are pushed through an {@link EmbeddedChannel} whose pipeline holds the message encoder for
+ * the outbound direction and the frame length decoder plus message decoder for the inbound direction.
+ */
+public class NettyMessageSerializationTest {
+
+	/** Size in bytes of the buffer used for the buffer response round trip. */
+	private static final int BUFFER_SIZE = 1024;
+
+	private final EmbeddedChannel channel = new EmbeddedChannel(
+			new NettyMessage.NettyMessageEncoder(), // outbound messages
+			NettyMessage.NettyMessageEncoder.createFrameLengthDecoder(), // inbound messages
+			new NettyMessage.NettyMessageDecoder()); // inbound messages
+
+	private final Random random = new Random();
+
+	/**
+	 * Runs the round trip for every message type in turn: buffer response, error responses (without
+	 * message, with message, fatal), partition request and task event request.
+	 */
+	@Test
+	public void testEncodeDecode() {
+		verifyBufferResponse();
+
+		verifyErrorResponseWithReceiver(new IllegalStateException());
+		verifyErrorResponseWithReceiver(new IllegalStateException("Illegal illegal illegal"));
+		verifyFatalErrorResponse(new IllegalStateException("Illegal illegal illegal"));
+
+		verifyPartitionRequest();
+		verifyTaskEventRequest();
+	}
+
+	/**
+	 * Sends a buffer filled with a known int pattern and checks that the buffer is recycled exactly once,
+	 * that the decoded Netty buffer carries the same bytes, and that it can be released afterwards.
+	 */
+	private void verifyBufferResponse() {
+		Buffer buffer = spy(new Buffer(new MemorySegment(new byte[BUFFER_SIZE]), mock(BufferRecycler.class)));
+		ByteBuffer nioBuffer = buffer.getNioBuffer();
+
+		// Each int slot holds its own byte offset, so the payload is easy to re-check after decoding.
+		for (int i = 0; i < BUFFER_SIZE; i += 4) {
+			nioBuffer.putInt(i);
+		}
+
+		NettyMessage.BufferResponse expected = new NettyMessage.BufferResponse(buffer, random.nextInt(), new InputChannelID());
+		NettyMessage.BufferResponse actual = encodeAndDecode(expected);
+
+		// Verify recycle has been called on buffer instance
+		verify(buffer, times(1)).recycle();
+
+		final ByteBuf retainedSlice = actual.getNettyBuffer();
+
+		// Ensure not recycled and same size as original buffer
+		assertEquals(1, retainedSlice.refCnt());
+		assertEquals(BUFFER_SIZE, retainedSlice.readableBytes());
+
+		nioBuffer = retainedSlice.nioBuffer();
+		for (int i = 0; i < BUFFER_SIZE; i += 4) {
+			assertEquals(i, nioBuffer.getInt());
+		}
+
+		// Release the retained slice
+		actual.releaseBuffer();
+		assertEquals(0, retainedSlice.refCnt());
+
+		assertEquals(expected.sequenceNumber, actual.sequenceNumber);
+		assertEquals(expected.receiverId, actual.receiverId);
+	}
+
+	/**
+	 * Round-trips an error response addressed to a fresh receiver and checks error class, error message
+	 * and receiver ID.
+	 */
+	private void verifyErrorResponseWithReceiver(IllegalStateException expectedError) {
+		InputChannelID receiverId = new InputChannelID();
+
+		NettyMessage.ErrorResponse expected = new NettyMessage.ErrorResponse(expectedError, receiverId);
+		NettyMessage.ErrorResponse actual = encodeAndDecode(expected);
+
+		assertEquals(expected.error.getClass(), actual.error.getClass());
+		assertEquals(expected.error.getMessage(), actual.error.getMessage());
+		assertEquals(receiverId, actual.receiverId);
+	}
+
+	/**
+	 * Round-trips an error response created without a receiver and checks that it decodes with a null
+	 * receiver ID and is reported as fatal.
+	 */
+	private void verifyFatalErrorResponse(IllegalStateException expectedError) {
+		NettyMessage.ErrorResponse expected = new NettyMessage.ErrorResponse(expectedError);
+		NettyMessage.ErrorResponse actual = encodeAndDecode(expected);
+
+		assertEquals(expected.error.getClass(), actual.error.getClass());
+		assertEquals(expected.error.getMessage(), actual.error.getMessage());
+		assertNull(actual.receiverId);
+		assertTrue(actual.isFatalError());
+	}
+
+	/** Round-trips a partition request and compares all four of its fields. */
+	private void verifyPartitionRequest() {
+		NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ExecutionAttemptID(), new IntermediateResultPartitionID(), random.nextInt(), new InputChannelID());
+		NettyMessage.PartitionRequest actual = encodeAndDecode(expected);
+
+		assertEquals(expected.producerExecutionId, actual.producerExecutionId);
+		assertEquals(expected.partitionId, actual.partitionId);
+		assertEquals(expected.queueIndex, actual.queueIndex);
+		assertEquals(expected.receiverId, actual.receiverId);
+	}
+
+	/** Round-trips a task event request and compares all four of its fields. */
+	private void verifyTaskEventRequest() {
+		NettyMessage.TaskEventRequest expected = new NettyMessage.TaskEventRequest(new IntegerTaskEvent(random.nextInt()), new ExecutionAttemptID(), new IntermediateResultPartitionID(), new InputChannelID());
+		NettyMessage.TaskEventRequest actual = encodeAndDecode(expected);
+
+		assertEquals(expected.executionId, actual.executionId);
+		assertEquals(expected.event, actual.event);
+		assertEquals(expected.partitionId, actual.partitionId);
+		assertEquals(expected.receiverId, actual.receiverId);
+	}
+
+	/**
+	 * Writes {@code msg} outbound, feeds the encoded bytes back inbound and returns the decoded message.
+	 *
+	 * @param msg message to encode
+	 * @return the message read from the inbound side of the channel
+	 */
+	// The cast is safe as long as the decoder yields the message type that was just encoded; a mismatch
+	// shows up as a ClassCastException in the calling test.
+	@SuppressWarnings("unchecked")
+	private <T extends NettyMessage> T encodeAndDecode(T msg) {
+		channel.writeOutbound(msg);
+		ByteBuf encoded = (ByteBuf) channel.readOutbound();
+
+		channel.writeInbound(encoded);
+
+		return (T) channel.readInbound();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java
deleted file mode 100644
index 09c285b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.netty;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.embedded.EmbeddedChannel;
-
-import org.junit.Assert;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.Random;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Checks that {@link OutboundEnvelopeEncoder} produces messages of the expected size and that the buffer
- * attached to an envelope is recycled once per encoded envelope.
- */
-public class OutboundEnvelopeEncoderTest {
-
-	// Fixed seed for the random instance below.
-	private final long RANDOM_SEED = 520346508276087l;
-
-	private final Random random = new Random(RANDOM_SEED);
-
-	// Number of envelopes pushed through the encoder.
-	private static final int NUM_RANDOM_ENVELOPES = 512;
-
-	// Upper bounds (inclusive) for the randomly chosen event and buffer sizes.
-	private static final int MAX_EVENTS_SIZE = 1024;
-
-	private static final int MAX_BUFFER_SIZE = 32768;
-
-	/**
-	 * Encodes envelopes that randomly carry serialized events and/or a buffer and asserts that each encoded
-	 * message has {@code HEADER_SIZE} plus the event and buffer sizes as its readable byte count.
-	 */
-	@Test
-	public void testEncodedSizeAndBufferRecycling() {
-		// One events buffer, one memory segment and one mocked Buffer are reused for all envelopes.
-		final ByteBuffer events = ByteBuffer.allocate(MAX_EVENTS_SIZE);
-		final MemorySegment segment = new MemorySegment(new byte[MAX_BUFFER_SIZE]);
-
-		final Buffer buffer = mock(Buffer.class);
-		when(buffer.getMemorySegment()).thenReturn(segment);
-
-		final EmbeddedChannel channel = new EmbeddedChannel(new OutboundEnvelopeEncoder());
-
-		// Counts the envelopes that carried the buffer; compared against recycle calls at the end.
-		int numBuffers = 0;
-		for (int i = 0; i < NUM_RANDOM_ENVELOPES; i++) {
-			Envelope env = new Envelope(i, new JobID(), new ChannelID());
-			int expectedEncodedMsgSize = OutboundEnvelopeEncoder.HEADER_SIZE;
-
-			if (random.nextBoolean()) {
-				int eventsSize = random.nextInt(MAX_EVENTS_SIZE + 1);
-				expectedEncodedMsgSize += eventsSize;
-
-				// Reset the shared events buffer and expose exactly eventsSize bytes.
-				events.clear();
-				events.limit(eventsSize);
-
-				env.setEventsSerialized(events);
-			}
-
-			if (random.nextBoolean()) {
-				numBuffers++;
-
-				// Re-stub the mocked buffer so that it reports the size chosen for this envelope.
-				int bufferSize = random.nextInt(MAX_BUFFER_SIZE + 1);
-				when(buffer.size()).thenReturn(bufferSize);
-				env.setBuffer(buffer);
-
-				expectedEncodedMsgSize += bufferSize;
-			}
-
-			Assert.assertTrue(channel.writeOutbound(env));
-
-			// --------------------------------------------------------------------
-			// verify encoded ByteBuf size
-			// --------------------------------------------------------------------
-			ByteBuf encodedMsg = (ByteBuf) channel.readOutbound();
-			Assert.assertEquals(expectedEncodedMsgSize, encodedMsg.readableBytes());
-
-			encodedMsg.release();
-		}
-
-		// --------------------------------------------------------------------
-		// verify buffers are recycled
-		// --------------------------------------------------------------------
-		verify(buffer, times(numBuffers)).recycleBuffer();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockConsumer.java
new file mode 100644
index 0000000..79494aa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockConsumer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test helper that drains an {@link IntermediateResultPartitionQueueIterator} and checks that the data
+ * buffers it returns contain one continuous sequence of ascending integers.
+ *
+ * <p>{@link #call()} returns {@code true} once the iterator reports that it is consumed, and
+ * {@code false} if anything is thrown on the way; the first failure is available via {@link #getError()}.
+ */
+public class MockConsumer implements Callable<Boolean> {
+
+	/** Pause per loop iteration when the consumer is configured to be slow. */
+	private static final int SLEEP_TIME_MS = 20;
+
+	private final IntermediateResultPartitionQueueIterator iterator;
+
+	private final boolean slowConsumer;
+
+	/** First failure observed by {@link #call()}, or {@code null} if none occurred. */
+	private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+	/**
+	 * @param iterator source of the buffers to consume
+	 * @param slowConsumer if {@code true}, sleep {@code SLEEP_TIME_MS} after every request for a buffer
+	 */
+	public MockConsumer(IntermediateResultPartitionQueueIterator iterator, boolean slowConsumer) {
+		this.iterator = iterator;
+		this.slowConsumer = slowConsumer;
+	}
+
+	/**
+	 * Consumes buffers until the iterator is consumed.
+	 *
+	 * @return {@code true} on normal completion, {@code false} if a failure was recorded
+	 */
+	@Override
+	public Boolean call() throws Exception {
+		MockNotificationListener listener = new MockNotificationListener();
+
+		// Next integer expected in the data; carried over from one buffer to the next.
+		int currentNumber = 0;
+
+		try {
+			while (true) {
+				Buffer buffer = iterator.getNextBuffer();
+
+				if (slowConsumer) {
+					Thread.sleep(SLEEP_TIME_MS);
+				}
+
+				if (buffer == null) {
+					// Nothing available right now: wait for a notification if the subscription was
+					// accepted, stop if the iterator is consumed, otherwise ask again.
+					// NOTE(review): a notification delivered between subscribe() and
+					// waitForNotification() looks like it could be missed, because the listener samples
+					// its counter only when waiting starts -- confirm against the queue implementation.
+					if (iterator.subscribe(listener)) {
+						listener.waitForNotification();
+					}
+					else if (iterator.isConsumed()) {
+						break;
+					}
+				}
+				else {
+					try {
+						// Only data buffers are verified; anything else is just recycled.
+						if (buffer.isBuffer()) {
+							currentNumber = verifyBufferFilledWithAscendingNumbers(buffer, currentNumber);
+						}
+					}
+					finally {
+						buffer.recycle();
+					}
+				}
+			}
+		}
+		catch (Throwable t) {
+			// Catching Throwable also catches InterruptedException; restore the interrupt flag so that
+			// the thread running this task still sees the interruption.
+			if (t instanceof InterruptedException) {
+				Thread.currentThread().interrupt();
+			}
+
+			error.compareAndSet(null, t);
+			return false;
+		}
+
+		return true;
+	}
+
+	/** Returns the first failure recorded by {@link #call()}, or {@code null} if there was none. */
+	public Throwable getError() {
+		return error.get();
+	}
+
+	/**
+	 * Checks that the ints stored at byte offsets 4, 8, 12, ... of the buffer's memory segment ascend by
+	 * one, starting at {@code currentNumber}. Offsets 0-3 are not inspected, which matches the layout
+	 * written by {@code MockProducer.fillBufferWithAscendingNumbers}.
+	 *
+	 * @param buffer buffer to verify
+	 * @param currentNumber first number expected in this buffer
+	 * @return the number expected at the start of the next buffer
+	 * @throws IllegalStateException if a stored int differs from the expected number
+	 */
+	private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber) {
+		MemorySegment segment = buffer.getMemorySegment();
+
+		// Only read complete 4-byte slots, so a segment size that is not a multiple of four cannot
+		// cause a read past the end of the segment.
+		for (int i = 4; i + 4 <= segment.size(); i += 4) {
+			if (segment.getInt(i) != currentNumber++) {
+				throw new IllegalStateException("Read unexpected number from buffer.");
+			}
+		}
+
+		return currentNumber;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockNotificationListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockNotificationListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockNotificationListener.java
new file mode 100644
index 0000000..928ac51
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockNotificationListener.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link NotificationListener} for tests that counts notifications and lets a thread block until the
+ * count changes.
+ *
+ * <p>The counter object doubles as the monitor used for {@code wait}/{@code notifyAll}.
+ */
+public class MockNotificationListener implements NotificationListener {
+
+	/** Number of notifications received so far; also the monitor that waiting threads block on. */
+	final AtomicInteger numNotifications = new AtomicInteger();
+
+	/** Increments the counter and wakes up every thread blocked in one of the wait methods. */
+	@Override
+	public void onNotification() {
+		synchronized (numNotifications) {
+			numNotifications.incrementAndGet();
+
+			numNotifications.notifyAll();
+		}
+	}
+
+	/**
+	 * Blocks until the counter differs from the value it had when this method was entered.
+	 *
+	 * <p>A notification that arrived before this call is not seen. Callers that register the listener
+	 * first and wait afterwards should sample {@link #getNumberOfNotifications()} before registering and
+	 * use {@link #waitForNotification(int)} instead.
+	 *
+	 * @throws InterruptedException if the waiting thread is interrupted
+	 */
+	public void waitForNotification() throws InterruptedException {
+		waitForNotification(numNotifications.get());
+	}
+
+	/**
+	 * Blocks until the counter differs from {@code current}; returns immediately if it already does.
+	 *
+	 * @param current counter value taken as the baseline, typically sampled before subscribing
+	 * @throws InterruptedException if the waiting thread is interrupted
+	 */
+	public void waitForNotification(int current) throws InterruptedException {
+		synchronized (numNotifications) {
+			// Re-check in a loop: wait() may return without the counter having changed.
+			while (current == numNotifications.get()) {
+				numNotifications.wait();
+			}
+		}
+	}
+
+	/** Returns the number of notifications received so far. */
+	public int getNumberOfNotifications() {
+		return numNotifications.get();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockProducer.java
new file mode 100644
index 0000000..1cfe75d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockProducer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test helper that requests buffers from a {@link BufferPool}, fills them with one continuous sequence of
+ * ascending integers and adds them to an {@link IntermediateResultPartitionQueue}.
+ *
+ * <p>{@link #call()} returns {@code true} after finishing or discarding the queue, and {@code false} if
+ * anything is thrown on the way; the first failure is available via {@link #getError()}.
+ */
+public class MockProducer implements Callable<Boolean> {
+
+	/** Pause after each added buffer when the producer is configured to be slow. */
+	private static final int SLEEP_TIME_MS = 20;
+
+	private final IntermediateResultPartitionQueue queue;
+
+	private final BufferPool bufferPool;
+
+	private final int numBuffersToProduce;
+
+	private final boolean slowProducer;
+
+	/** Number of buffers after which the queue is discarded; starts at {@code Integer.MAX_VALUE}. */
+	private final AtomicInteger discardAfter = new AtomicInteger(Integer.MAX_VALUE);
+
+	/** First failure observed by {@link #call()}, or {@code null} if none occurred. */
+	private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+	/**
+	 * @param queue queue the filled buffers are added to
+	 * @param bufferPool pool the buffers are requested from
+	 * @param numBuffersToProduce number of buffers to add before finishing the queue
+	 * @param slowProducer if {@code true}, sleep {@code SLEEP_TIME_MS} after every added buffer
+	 */
+	public MockProducer(IntermediateResultPartitionQueue queue, BufferPool bufferPool, int numBuffersToProduce, boolean slowProducer) {
+		this.queue = queue;
+		this.bufferPool = bufferPool;
+		this.numBuffersToProduce = numBuffersToProduce;
+		this.slowProducer = slowProducer;
+	}
+
+	/**
+	 * Produces buffers until {@code numBuffersToProduce} is reached and then finishes the queue, or
+	 * discards the queue as soon as the discard threshold is reached.
+	 *
+	 * @return {@code true} on normal completion or discard, {@code false} if a failure was recorded
+	 */
+	@Override
+	public Boolean call() throws Exception {
+		try {
+			// Next integer to write; carried over from one buffer to the next.
+			int currentNumber = 0;
+
+			for (int i = 0; i < numBuffersToProduce; i++) {
+				// The threshold is re-read on every iteration, so it can be changed while producing.
+				// On discard the method returns without calling finish().
+				if (i >= discardAfter.get()) {
+					queue.discard();
+					return true;
+				}
+
+				Buffer buffer = bufferPool.requestBufferBlocking();
+
+				currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+
+				queue.add(buffer);
+
+				if (slowProducer) {
+					Thread.sleep(SLEEP_TIME_MS);
+				}
+			}
+
+			queue.finish();
+		}
+		catch (Throwable t) {
+			// Catching Throwable also catches InterruptedException; restore the interrupt flag so that
+			// the thread running this task still sees the interruption.
+			if (t instanceof InterruptedException) {
+				Thread.currentThread().interrupt();
+			}
+
+			error.compareAndSet(null, t);
+			return false;
+		}
+
+		return true;
+	}
+
+	/** Makes {@link #call()} discard the queue at its next loop iteration. */
+	void discard() {
+		discardAfter.set(0);
+	}
+
+	/**
+	 * Makes {@link #call()} discard the queue once {@code numBuffers} buffers have been added.
+	 *
+	 * @param numBuffers number of buffers to add before discarding
+	 */
+	public void discardAfter(int numBuffers) {
+		discardAfter.set(numBuffers);
+	}
+
+	/** Returns the first failure recorded by {@link #call()}, or {@code null} if there was none. */
+	public Throwable getError() {
+		return error.get();
+	}
+
+	/**
+	 * Writes ascending integers, starting at {@code currentNumber}, to byte offsets 4, 8, 12, ... of the
+	 * buffer's memory segment. Offsets 0-3 are left untouched, which matches the layout checked by
+	 * {@code MockConsumer}.
+	 *
+	 * @param buffer buffer to fill
+	 * @param currentNumber first number to write
+	 * @return the number to start with in the next buffer
+	 */
+	public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
+		MemorySegment segment = buffer.getMemorySegment();
+
+		// Only write complete 4-byte slots, so a segment size that is not a multiple of four cannot
+		// cause a write past the end of the segment.
+		for (int i = 4; i + 4 <= segment.size(); i += 4) {
+			segment.putInt(i, currentNumber++);
+		}
+
+		return currentNumber;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueueTest.java
new file mode 100644
index 0000000..cff03ac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueueTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.queue;
+
+import com.google.common.base.Optional;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.MockConsumer;
+import org.apache.flink.runtime.io.network.partition.MockNotificationListener;
+import org.apache.flink.runtime.io.network.partition.MockProducer;
+import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator.AlreadySubscribedException;
+import org.apache.flink.runtime.util.event.NotificationListener;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class PipelinedPartitionQueueTest {
+
+	private static final int NUM_BUFFERS = 1024;
+
+	private static final int BUFFER_SIZE = 32 * 1024;
+
+	private static final NetworkBufferPool networkBuffers = new NetworkBufferPool(NUM_BUFFERS, BUFFER_SIZE); // shared; each concurrent test creates its own BufferPool from it
+
+	private PipelinedPartitionQueue queue;
+
+	@Before
+	public void setup() {
+		this.queue = new PipelinedPartitionQueue();
+	}
+
+	@Test(expected = IllegalQueueIteratorRequestException.class)
+	public void testExceptionWhenMultipleConsumers() throws IOException {
+		queue.getQueueIterator(Optional.<BufferProvider>absent());
+
+		// This queue is only consumable once, so this should throw an Exception
+		queue.getQueueIterator(Optional.<BufferProvider>absent());
+	}
+
+	@Test(expected = AlreadySubscribedException.class)
+	public void testExceptionWhenMultipleSubscribers() throws IOException {
+		IntermediateResultPartitionQueueIterator iterator = queue.getQueueIterator(Optional.<BufferProvider>absent());
+
+		NotificationListener listener = mock(NotificationListener.class);
+
+		// First subscribe should be fine
+		assertTrue(iterator.subscribe(listener));
+
+		// This should throw an already subscribed exception
+		iterator.subscribe(listener);
+	}
+
+	@Test
+	public void testProduceConsume() throws Exception {
+		Buffer boe = mock(Buffer.class);
+
+		MockNotificationListener listener = new MockNotificationListener();
+
+		IntermediateResultPartitionQueueIterator iterator = queue.getQueueIterator(Optional.<BufferProvider>absent());
+
+		// Empty queue => should return null
+		assertNull(iterator.getNextBuffer());
+
+		// But iterator should not be consumed yet...
+		assertFalse(iterator.isConsumed());
+
+		// Subscribe for notifications
+		assertTrue(iterator.subscribe(listener));
+
+		assertEquals(0, listener.getNumberOfNotifications());
+
+		// Add data to the queue...
+		queue.add(boe);
+
+		// ...should result in a notification
+		assertEquals(1, listener.getNumberOfNotifications());
+
+		// ...and one available result
+		assertNotNull(iterator.getNextBuffer());
+		assertNull(iterator.getNextBuffer());
+		assertFalse(iterator.isConsumed());
+
+		// Add data to the queue...
+		queue.add(boe);
+		// ...subscribing must be rejected while data is available
+		assertFalse(iterator.subscribe(listener));
+
+		assertEquals(1, listener.getNumberOfNotifications());
+	}
+
+	@Test
+	public void testDiscardingProduceWhileSubscribedConsumer() throws IOException {
+		IntermediateResultPartitionQueueIterator iterator = queue.getQueueIterator(Optional.<BufferProvider>absent());
+
+		NotificationListener listener = mock(NotificationListener.class);
+
+		assertTrue(iterator.subscribe(listener));
+
+		queue.discard();
+
+		verify(listener, times(1)).onNotification();
+
+		assertTrue(iterator.isConsumed());
+	}
+
+	@Test
+	public void testConcurrentProduceConsume() throws Exception {
+		doTestConcurrentProduceConsume(false, false);
+	}
+
+	@Test
+	public void testConcurrentSlowProduceConsume() throws Exception {
+		doTestConcurrentProduceConsume(true, false); // slow producer, fast consumer
+	}
+
+	@Test
+	public void testConcurrentProduceSlowConsume() throws Exception {
+		doTestConcurrentProduceConsume(false, true); // fast producer, slow consumer (was a copy of the slow-producer case)
+	}
+
+	@Test
+	public void testConcurrentDiscardingProduceConsume() throws Exception {
+		doTestConcurrentProduceConsume(false, false, true);
+	}
+
+	@Test
+	public void testConcurrentDiscardingSlowProduceConsume() throws Exception {
+		doTestConcurrentProduceConsume(true, false, true);
+	}
+
+	@Test
+	public void testConcurrentDiscardingProduceSlowConsume() throws Exception {
+		doTestConcurrentProduceConsume(false, true, true);
+	}
+
+	private void doTestConcurrentProduceConsume(boolean slowProducer, boolean slowConsumer) throws Exception {
+		doTestConcurrentProduceConsume(slowProducer, slowConsumer, false);
+	}
+
+	private void doTestConcurrentProduceConsume(boolean slowProducer, boolean slowConsumer, boolean discardProduce) throws Exception {
+		// Runs a MockProducer and a MockConsumer concurrently on the queue; both must report success within 5 seconds each.
+		final int bufferPoolSize = 8;
+
+		final int numBuffersToProduce = 64;
+
+		BufferPool producerBufferPool = networkBuffers.createBufferPool(bufferPoolSize, true);
+
+		MockProducer producer = new MockProducer(queue, producerBufferPool, numBuffersToProduce, slowProducer);
+
+		if (discardProduce) {
+			producer.discardAfter(new Random().nextInt(numBuffersToProduce)); // random threshold in [0, numBuffersToProduce)
+		}
+
+		MockConsumer consumer = new MockConsumer(queue.getQueueIterator(Optional.<BufferProvider>absent()), slowConsumer);
+
+		ExecutorService executorService = Executors.newCachedThreadPool();
+
+		try {
+			Future<Boolean> producerSuccess = executorService.submit(producer);
+			Future<Boolean> consumerSuccess = executorService.submit(consumer);
+
+			boolean success = false;
+			try {
+				success = producerSuccess.get(5, TimeUnit.SECONDS);
+				success &= consumerSuccess.get(5, TimeUnit.SECONDS);
+			}
+			catch (Throwable t) {
+				t.printStackTrace();
+
+				if (producer.getError() != null) {
+					System.err.println("Producer error:");
+					producer.getError().printStackTrace();
+				}
+
+				if (consumer.getError() != null) {
+					System.err.println("Consumer error:");
+					consumer.getError().printStackTrace();
+				}
+
+				fail("Unexpected failure during test: " + t.getMessage() + ". Producer error: " + producer.getError() + ", consumer error: " + consumer.getError());
+			}
+
+			producerBufferPool.destroy();
+
+			assertTrue(success);
+		} finally {
+			executorService.shutdownNow();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/DataInputOutputSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/DataInputOutputSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/DataInputOutputSerializerTest.java
deleted file mode 100644
index de294e2..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/DataInputOutputSerializerTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization;
-
-import org.junit.Assert;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.serialization.DataInputDeserializer;
-import org.apache.flink.runtime.io.network.serialization.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.serialization.types.SerializationTestType;
-import org.apache.flink.runtime.io.network.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.runtime.io.network.serialization.types.Util;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-
-public class DataInputOutputSerializerTest {
-
-	@Test
-	public void testWrapAsByteBuffer() {
-		SerializationTestType randomInt = Util.randomRecord(SerializationTestTypeFactory.INT);
-
-		DataOutputSerializer serializer = new DataOutputSerializer(randomInt.length());
-		MemorySegment segment = new MemorySegment(new byte[randomInt.length()]);
-
-		try {
-			// empty buffer, read buffer should be empty
-			ByteBuffer wrapper = serializer.wrapAsByteBuffer();
-
-			Assert.assertEquals(0, wrapper.position());
-			Assert.assertEquals(0, wrapper.limit());
-
-			// write to data output, read buffer should still be empty
-			randomInt.write(serializer);
-
-			Assert.assertEquals(0, wrapper.position());
-			Assert.assertEquals(0, wrapper.limit());
-
-			// get updated read buffer, read buffer should contain written data
-			wrapper = serializer.wrapAsByteBuffer();
-
-			Assert.assertEquals(0, wrapper.position());
-			Assert.assertEquals(randomInt.length(), wrapper.limit());
-
-			// clear data output, read buffer should still contain written data
-			serializer.clear();
-
-			Assert.assertEquals(0, wrapper.position());
-			Assert.assertEquals(randomInt.length(), wrapper.limit());
-
-			// get updated read buffer, should be empty
-			wrapper = serializer.wrapAsByteBuffer();
-
-			Assert.assertEquals(0, wrapper.position());
-			Assert.assertEquals(0, wrapper.limit());
-
-			// write to data output and read back to memory
-			randomInt.write(serializer);
-			wrapper = serializer.wrapAsByteBuffer();
-
-			segment.put(0, wrapper, randomInt.length());
-
-			Assert.assertEquals(randomInt.length(), wrapper.position());
-			Assert.assertEquals(randomInt.length(), wrapper.limit());
-		} catch (IOException e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	@Test
-	public void testRandomValuesWriteRead() {
-		final int numElements = 100000;
-		final ArrayDeque<SerializationTestType> reference = new ArrayDeque<SerializationTestType>();
-
-		DataOutputSerializer serializer = new DataOutputSerializer(1);
-
-		for (SerializationTestType value : Util.randomRecords(numElements)) {
-			reference.add(value);
-
-			try {
-				value.write(serializer);
-			} catch (IOException e) {
-				e.printStackTrace();
-				Assert.fail("Test encountered an unexpected exception.");
-			}
-		}
-
-		DataInputDeserializer deserializer = new DataInputDeserializer(serializer.wrapAsByteBuffer());
-
-		for (SerializationTestType expected : reference) {
-			try {
-				SerializationTestType actual = expected.getClass().newInstance();
-				actual.read(deserializer);
-
-				Assert.assertEquals(expected, actual);
-			} catch (Exception e) {
-				e.printStackTrace();
-				Assert.fail("Test encountered an unexpected exception.");
-			}
-		}
-
-		reference.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/PagedViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/PagedViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/PagedViewsTest.java
deleted file mode 100644
index c9ca202..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/PagedViewsTest.java
+++ /dev/null
@@ -1,423 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.serialization;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.serialization.types.SerializationTestType;
-import org.apache.flink.runtime.io.network.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.runtime.io.network.serialization.types.Util;
-import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
-import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
-import org.junit.Test;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.*;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class PagedViewsTest {
-
-	@Test
-	public void testSequenceOfIntegersWithAlignedBuffers() {
-		try {
-			final int NUM_INTS = 1000000;
-
-			testSequenceOfTypes(Util.randomRecords(NUM_INTS, SerializationTestTypeFactory.INT), 2048);
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	@Test
-	public void testSequenceOfIntegersWithUnalignedBuffers() {
-		try {
-			final int NUM_INTS = 1000000;
-
-			testSequenceOfTypes(Util.randomRecords(NUM_INTS, SerializationTestTypeFactory.INT), 2047);
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	@Test
-	public void testRandomTypes() {
-		try {
-			final int NUM_TYPES = 100000;
-
-			// test with an odd buffer size to force many unaligned cases
-			testSequenceOfTypes(Util.randomRecords(NUM_TYPES), 57);
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail("Test encountered an unexpected exception.");
-		}
-	}
-
-	@Test
-	public void testReadFully() {
-		int bufferSize = 100;
-		byte[] expected = new byte[bufferSize];
-		new Random().nextBytes(expected);
-
-		TestOutputView outputView = new TestOutputView(bufferSize);
-
-		try {
-			outputView.write(expected);
-		}catch(Exception e){
-			e.printStackTrace();
-			fail("Unexpected exception: Could not write to TestOutputView.");
-		}
-
-		outputView.close();
-
-		TestInputView inputView = new TestInputView(outputView.segments);
-		byte[] buffer = new byte[bufferSize];
-
-		try {
-			inputView.readFully(buffer);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail("Unexpected exception: Could not read TestInputView.");
-		}
-
-		assertEquals(inputView.getCurrentPositionInSegment(), bufferSize);
-		assertArrayEquals(expected, buffer);
-	}
-
-	@Test
-	public void testReadFullyAcrossSegments() {
-		int bufferSize = 100;
-		int segmentSize = 30;
-		byte[] expected = new byte[bufferSize];
-		new Random().nextBytes(expected);
-
-		TestOutputView outputView = new TestOutputView(segmentSize);
-
-		try {
-			outputView.write(expected);
-		}catch(Exception e){
-			e.printStackTrace();
-			fail("Unexpected exception: Could not write to TestOutputView.");
-		}
-
-		outputView.close();
-
-		TestInputView inputView = new TestInputView(outputView.segments);
-		byte[] buffer = new byte[bufferSize];
-
-		try {
-			inputView.readFully(buffer);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail("Unexpected exception: Could not read TestInputView.");
-		}
-
-		assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % segmentSize);
-		assertArrayEquals(expected, buffer);
-	}
-
-	@Test
-	public void testReadAcrossSegments() {
-		int bufferSize = 100;
-		int bytes2Write = 75;
-		int segmentSize = 30;
-		byte[] expected = new byte[bytes2Write];
-		new Random().nextBytes(expected);
-
-		TestOutputView outputView = new TestOutputView(segmentSize);
-
-		try {
-			outputView.write(expected);
-		}catch(Exception e){
-			e.printStackTrace();
-			fail("Unexpected exception: Could not write to TestOutputView.");
-		}
-
-		outputView.close();
-
-		TestInputView inputView = new TestInputView(outputView.segments);
-		byte[] buffer = new byte[bufferSize];
-		int bytesRead = 0;
-
-		try {
-			bytesRead = inputView.read(buffer);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail("Unexpected exception: Could not read TestInputView.");
-		}
-
-		assertEquals(bytes2Write, bytesRead);
-		assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % segmentSize);
-
-		byte[] tempBuffer = new byte[bytesRead];
-		System.arraycopy(buffer,0,tempBuffer,0,bytesRead);
-		assertArrayEquals(expected, tempBuffer);
-	}
-
-	@Test
-	public void testEmptyingInputView() {
-		int bufferSize = 100;
-		int bytes2Write = 75;
-		int segmentSize = 30;
-		byte[] expected = new byte[bytes2Write];
-		new Random().nextBytes(expected);
-
-		TestOutputView outputView = new TestOutputView(segmentSize);
-
-		try {
-			outputView.write(expected);
-		}catch(Exception e){
-			e.printStackTrace();
-			fail("Unexpected exception: Could not write to TestOutputView.");
-		}
-
-		outputView.close();
-
-		TestInputView inputView = new TestInputView(outputView.segments);
-		byte[] buffer = new byte[bufferSize];
-		int bytesRead = 0;
-
-		try {
-			bytesRead = inputView.read(buffer);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail("Unexpected exception: Could not read TestInputView.");
-		}
-
-		assertEquals(bytes2Write, bytesRead);
-
-		byte[] tempBuffer = new byte[bytesRead];
-		System.arraycopy(buffer,0,tempBuffer,0,bytesRead);
-		assertArrayEquals(expected, tempBuffer);
-
-		try{
-			bytesRead = inputView.read(buffer);
-		}catch(IOException e){
-			e.printStackTrace();
-			fail("Unexpected exception: Input view should be empty and thus return -1.");
-		}
-
-		assertEquals(-1, bytesRead);
-		assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % segmentSize);
-	}
-
-	@Test
-	public void testReadFullyWithNotEnoughData() {
-		int bufferSize = 100;
-		int bytes2Write = 99;
-		int segmentSize = 30;
-		byte[] expected = new byte[bytes2Write];
-		new Random().nextBytes(expected);
-
-		TestOutputView outputView = new TestOutputView(segmentSize);
-
-		try {
-			outputView.write(expected);
-		}catch(Exception e){
-			e.printStackTrace();
-			fail("Unexpected exception: Could not write to TestOutputView.");
-		}
-
-		outputView.close();
-
-		TestInputView inputView = new TestInputView(outputView.segments);
-		byte[] buffer = new byte[bufferSize];
-		boolean eofException = false;
-
-		try {
-			inputView.readFully(buffer);
-		}catch(EOFException e){
-			//Expected exception
-			eofException = true;
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail("Unexpected exception: Could not read TestInputView.");
-		}
-
-		assertTrue("EOFException should have occurred.", eofException);
-
-		int bytesRead = 0;
-
-		try{
-			bytesRead =inputView.read(buffer);
-		}catch(Exception e){
-			e.printStackTrace();
-			fail("Unexpected exception: Could not read TestInputView.");
-		}
-
-		assertEquals(-1, bytesRead);
-	}
-
-	@Test
-	public void testReadFullyWithOffset(){
-		int bufferSize = 100;
-		int segmentSize = 30;
-		byte[] expected = new byte[bufferSize];
-		new Random().nextBytes(expected);
-
-		TestOutputView outputView = new TestOutputView(segmentSize);
-
-		try {
-			outputView.write(expected);
-		}catch(Exception e){
-			e.printStackTrace();
-			fail("Unexpected exception: Could not write to TestOutputView.");
-		}
-
-		outputView.close();
-
-		TestInputView inputView = new TestInputView(outputView.segments);
-		byte[] buffer = new byte[2*bufferSize];
-
-		try {
-			inputView.readFully(buffer, bufferSize, bufferSize);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail("Unexpected exception: Could not read TestInputView.");
-		}
-
-		assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % segmentSize);
-		byte[] tempBuffer = new byte[bufferSize];
-		System.arraycopy(buffer, bufferSize, tempBuffer,0, bufferSize);
-		assertArrayEquals(expected, tempBuffer);
-	}
-
-	@Test
-	public void testReadFullyEmptyView(){
-		int segmentSize = 30;
-		TestOutputView outputView = new TestOutputView(segmentSize);
-		outputView.close();
-
-		TestInputView inputView = new TestInputView(outputView.segments);
-		byte[] buffer = new byte[segmentSize];
-		boolean eofException = false;
-
-		try{
-			inputView.readFully(buffer);
-		}catch(EOFException e){
-			//expected Exception
-			eofException = true;
-		}catch(Exception e){
-			e.printStackTrace();
-			fail("Unexpected exception: Could not read TestInputView.");
-		}
-
-		assertTrue("EOFException expected.", eofException);
-	}
-
-
-	private static void testSequenceOfTypes(Iterable<SerializationTestType> sequence, int segmentSize) throws Exception {
-
-		List<SerializationTestType> elements = new ArrayList<SerializationTestType>(512);
-		TestOutputView outView = new TestOutputView(segmentSize);
-
-		// write
-		for (SerializationTestType type : sequence) {
-			// serialize the record
-			type.write(outView);
-			elements.add(type);
-		}
-		outView.close();
-
-		// check the records
-		TestInputView inView = new TestInputView(outView.segments);
-
-		for (SerializationTestType reference : elements) {
-			SerializationTestType result = reference.getClass().newInstance();
-			result.read(inView);
-			assertEquals(reference, result);
-		}
-	}
-
-	// ============================================================================================
-
-	private static final class SegmentWithPosition {
-
-		private final MemorySegment segment;
-		private final int position;
-
-		public SegmentWithPosition(MemorySegment segment, int position) {
-			this.segment = segment;
-			this.position = position;
-		}
-	}
-
-	private static final class TestOutputView extends AbstractPagedOutputView {
-
-		private final List<SegmentWithPosition> segments = new ArrayList<SegmentWithPosition>();
-
-		private final int segmentSize;
-
-		private TestOutputView(int segmentSize) {
-			super(new MemorySegment(new byte[segmentSize]), segmentSize, 0);
-
-			this.segmentSize = segmentSize;
-		}
-
-		@Override
-		protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException {
-			segments.add(new SegmentWithPosition(current, positionInCurrent));
-			return new MemorySegment(new byte[segmentSize]);
-		}
-
-		public void close() {
-			segments.add(new SegmentWithPosition(getCurrentSegment(), getCurrentPositionInSegment()));
-		}
-	}
-
-	private static final class TestInputView extends AbstractPagedInputView {
-
-		private final List<SegmentWithPosition> segments;
-
-		private int num;
-
-
-		private TestInputView(List<SegmentWithPosition> segments) {
-			super(segments.get(0).segment, segments.get(0).position, 0);
-
-			this.segments = segments;
-			this.num = 0;
-		}
-
-		@Override
-		protected MemorySegment nextSegment(MemorySegment current) throws IOException {
-			num++;
-			if (num < segments.size()) {
-				return segments.get(num).segment;
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		@Override
-		protected int getLimitForSegment(MemorySegment segment) {
-			return segments.get(num).position;
-		}
-	}
-}


Mime
View raw message