flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [32/34] git commit: Rename BufferAvailabilityRegistration constants and add initial LocalBufferPoolTest
Date Tue, 10 Jun 2014 19:35:29 GMT
Rename BufferAvailabilityRegistration constants and add initial LocalBufferPoolTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/99730549
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/99730549
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/99730549

Branch: refs/heads/release-0.5.1
Commit: 997305496904f3619734316260bf638139237326
Parents: ef8e2b3
Author: uce <u.celebi@fu-berlin.de>
Authored: Tue Jun 3 14:59:16 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Jun 10 21:26:53 2014 +0200

----------------------------------------------------------------------
 .../network/bufferprovider/BufferProvider.java  |  16 +-
 .../bufferprovider/DiscardBufferPool.java       |   2 +-
 .../network/bufferprovider/LocalBufferPool.java |  12 +-
 .../network/netty/InboundEnvelopeDecoder.java   |   6 +-
 .../pact/runtime/test/util/MockEnvironment.java |   2 +-
 .../bufferprovider/LocalBufferPoolTest.java     | 360 +++++++++++++++++++
 .../netty/InboundEnvelopeDecoderTest.java       |  16 +-
 7 files changed, 381 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/99730549/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
index d82b427..25c8a7e 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
@@ -68,18 +68,8 @@ public interface BufferProvider {
 	BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener
listener);
 
 	public enum BufferAvailabilityRegistration {
-		NOT_REGISTERED_BUFFER_AVAILABLE(false),
-		NOT_REGISTERED_BUFFER_POOL_DESTROYED(false),
-		REGISTERED(true);
-
-		private final boolean isSuccessful;
-
-		private BufferAvailabilityRegistration(boolean isSuccessful) {
-			this.isSuccessful = isSuccessful;
-		}
-
-		public boolean isSuccessful() {
-			return isSuccessful;
-		}
+		SUCCEEDED_REGISTERED(),
+		FAILED_BUFFER_AVAILABLE(),
+		FAILED_BUFFER_POOL_DESTROYED()
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/99730549/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
index 5daa509..9b3d434 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
@@ -41,7 +41,7 @@ public final class DiscardBufferPool implements BufferProvider, BufferRecycler
{
 
 	@Override
 	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener
listener) {
-		return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED;
+		return BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/99730549/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
index 6285726..933df76 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
@@ -187,17 +187,17 @@ public final class LocalBufferPool implements BufferProvider {
 	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener
listener) {
 		synchronized (this.buffers) {
 			if (!this.buffers.isEmpty()) {
-				return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_AVAILABLE;
+				return BufferAvailabilityRegistration.FAILED_BUFFER_AVAILABLE;
 			}
 
 			if (this.isDestroyed) {
-				return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED;
+				return BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED;
 			}
 
 			this.listeners.add(listener);
 		}
 
-		return BufferAvailabilityRegistration.REGISTERED;
+		return BufferAvailabilityRegistration.SUCCEEDED_REGISTERED;
 	}
 
 	/**
@@ -300,10 +300,7 @@ public final class LocalBufferPool implements BufferProvider {
 					this.globalBufferPool.returnBuffer(buffer);
 					this.numRequestedBuffers--;
 
-					return;
-				}
-
-				if (!this.listeners.isEmpty()) {
+				} else if (!this.listeners.isEmpty()) {
 					Buffer availableBuffer = new Buffer(buffer, buffer.size(), this.recycler);
 					try {
 						this.listeners.poll().bufferAvailable(availableBuffer);
@@ -311,6 +308,7 @@ public final class LocalBufferPool implements BufferProvider {
 						this.buffers.add(buffer);
 						this.buffers.notify();
 					}
+
 				} else {
 					this.buffers.add(buffer);
 					this.buffers.notify();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/99730549/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
index 54f4617..1135577 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
@@ -124,7 +124,7 @@ public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter
impleme
 			}
 			else if (decoderState == DecoderState.NO_BUFFER_AVAILABLE) {
 				switch (this.currentBufferProvider.registerBufferAvailabilityListener(this)) {
-					case REGISTERED:
+					case SUCCEEDED_REGISTERED:
 						if (ctx.channel().config().isAutoRead()) {
 							ctx.channel().config().setAutoRead(false);
 
@@ -137,10 +137,10 @@ public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter
impleme
 						this.stagedBuffer.retain();
 						return false;
 
-					case NOT_REGISTERED_BUFFER_AVAILABLE:
+					case FAILED_BUFFER_AVAILABLE:
 						continue;
 
-					case NOT_REGISTERED_BUFFER_POOL_DESTROYED:
+					case FAILED_BUFFER_POOL_DESTROYED:
 						this.bytesToSkip = skipBytes(in, this.currentBufferRequestSize);
 
 						this.currentBufferRequestSize = 0;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/99730549/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
index b715a4e..2585a74 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
@@ -127,7 +127,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 
 	@Override
 	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener
listener) {
-		return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED;
+		return BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/99730549/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
new file mode 100644
index 0000000..c7d8d41
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
@@ -0,0 +1,360 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider.BufferAvailabilityRegistration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class LocalBufferPoolTest {
+
+	private final static int NUM_BUFFERS = 2048;
+
+	private final static int BUFFER_SIZE = 1024;
+
+	private final static GlobalBufferPool GLOBAL_BUFFER_POOL = new GlobalBufferPool(NUM_BUFFERS,
BUFFER_SIZE);
+
+	private final static RecyclingBufferAvailableAnswer RECYCLING_BUFFER_AVAILABLE_ANSWER =
new RecyclingBufferAvailableAnswer();
+
+	@BeforeClass
+	public static void setupGlobalBufferPoolOnce() {
+		Assert.assertEquals("GlobalBufferPool does not have required number of buffers.",
+				NUM_BUFFERS, GLOBAL_BUFFER_POOL.numBuffers());
+		Assert.assertEquals("GlobalBufferPool does not have required number of available buffers.",
+				NUM_BUFFERS, GLOBAL_BUFFER_POOL.numAvailableBuffers());
+	}
+
+	@After
+	public void verifyAllBuffersReturnedToGlobalBufferPool() {
+		Assert.assertEquals("Did not return all buffers to GlobalBufferPool after test.",
+				NUM_BUFFERS, GLOBAL_BUFFER_POOL.numAvailableBuffers());
+	}
+
+	@Test
+	public void testSingleConsumerNonBlockingRequestAndRecycle() throws IOException {
+		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
+
+		Assert.assertEquals(0, bufferPool.numRequestedBuffers());
+
+		// this request-recycle cycle should only take a single buffer out of
+		// the GlobalBufferPool as it is recycled over and over again
+		for (int numRequested = 0; numRequested < NUM_BUFFERS; numRequested++) {
+			Buffer buffer = bufferPool.requestBuffer(BUFFER_SIZE);
+
+			Assert.assertEquals(BUFFER_SIZE, buffer.size());
+
+			Assert.assertEquals("Expected single buffer request in buffer pool.",
+					1, bufferPool.numRequestedBuffers());
+			Assert.assertEquals("Expected no available buffer in buffer pool.",
+					0, bufferPool.numAvailableBuffers());
+
+			buffer.recycleBuffer();
+
+			Assert.assertEquals("Expected single available buffer after recycle.",
+					1, bufferPool.numAvailableBuffers());
+		}
+
+		bufferPool.destroy();
+	}
+
+	@Test
+	public void testSingleConsumerNonBlockingRequestMoreThanAvailable() throws IOException {
+		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
+
+		Assert.assertEquals(0, bufferPool.numRequestedBuffers());
+
+		// request all buffers from the buffer pool
+		Buffer[] requestedBuffers = new Buffer[NUM_BUFFERS];
+		for (int i = 0; i < NUM_BUFFERS; i++) {
+			requestedBuffers[i] = bufferPool.requestBuffer(BUFFER_SIZE);
+		}
+
+		Assert.assertEquals("Expected no available buffer in buffer pool.",
+				0, bufferPool.numAvailableBuffers());
+
+		Assert.assertNull("Expected null return value for buffer request with no available buffer.",
+				bufferPool.requestBuffer(BUFFER_SIZE));
+
+		// recycle all buffers and destroy buffer pool
+		for (Buffer buffer : requestedBuffers) {
+			buffer.recycleBuffer();
+		}
+
+		bufferPool.destroy();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testSingleConsumerNonBlockingRequestTooLarge() throws IOException {
+		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
+
+		// request too large buffer for the pool
+		bufferPool.requestBuffer(BUFFER_SIZE * 2);
+	}
+
+	@Test
+	public void testSingleConsumerNonBlockingRequestSmall() throws IOException {
+		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
+
+		// request smaller buffer and verify size
+		Buffer buffer = bufferPool.requestBuffer(BUFFER_SIZE / 2);
+
+		Assert.assertEquals(BUFFER_SIZE / 2, buffer.size());
+
+		buffer.recycleBuffer();
+
+		bufferPool.destroy();
+	}
+
+	@Test
+	public void testSingleConsumerBlockingRequest() throws Exception {
+		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
+
+		final Buffer[] requestedBuffers = new Buffer[NUM_BUFFERS];
+		for (int i = 0; i < NUM_BUFFERS; i++) {
+			requestedBuffers[i] = bufferPool.requestBuffer(BUFFER_SIZE);
+		}
+
+		final Buffer[] bufferFromBlockingRequest = new Buffer[1];
+
+		// --------------------------------------------------------------------
+		// 1. blocking call: interrupt thread
+		// --------------------------------------------------------------------
+		Assert.assertEquals(NUM_BUFFERS, bufferPool.numRequestedBuffers());
+		Assert.assertEquals(0, bufferPool.numAvailableBuffers());
+
+		Thread blockingBufferRequestThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					bufferFromBlockingRequest[0] = bufferPool.requestBufferBlocking(BUFFER_SIZE);
+					Assert.fail("Unexpected return from blocking buffer request.");
+				} catch (IOException e) {
+					Assert.fail("Unexpected IOException during test.");
+				} catch (InterruptedException e) {
+					// expected interruption
+				}
+			}
+		});
+
+		// start blocking request thread, sleep, interrupt blocking request thread
+		blockingBufferRequestThread.start();
+
+		Thread.sleep(500);
+
+		blockingBufferRequestThread.interrupt();
+
+		Assert.assertNull(bufferFromBlockingRequest[0]);
+		Assert.assertEquals(NUM_BUFFERS, bufferPool.numRequestedBuffers());
+		Assert.assertEquals(0, bufferPool.numAvailableBuffers());
+
+		// --------------------------------------------------------------------
+		// 2. blocking call: recycle buffer in different thread
+		// --------------------------------------------------------------------
+		// recycle the buffer soon
+		new Timer().schedule(new TimerTask() {
+			@Override
+			public void run() {
+				requestedBuffers[0].recycleBuffer();
+			}
+		}, 500);
+
+		//
+		try {
+			Buffer buffer = bufferPool.requestBufferBlocking(BUFFER_SIZE);
+			Assert.assertNotNull(buffer);
+
+			buffer.recycleBuffer();
+		} catch (InterruptedException e) {
+			Assert.fail("Unexpected InterruptedException during test.");
+		}
+
+		// recycle remaining buffers
+		for (int i = 1; i < requestedBuffers.length; i++) {
+			requestedBuffers[i].recycleBuffer();
+		}
+
+		bufferPool.destroy();
+	}
+
+	@Test
+	public void testSingleConsumerRecycleAfterDestroy() throws IOException {
+		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
+
+		Buffer[] requestedBuffers = new Buffer[NUM_BUFFERS];
+		for (int i = 0; i < NUM_BUFFERS; i++) {
+			requestedBuffers[i] = bufferPool.requestBuffer(BUFFER_SIZE);
+		}
+
+		bufferPool.destroy();
+
+		// recycle should return buffers to GlobalBufferPool
+		// => verified in verifyAllBuffersReturned()
+		for (Buffer buffer : requestedBuffers) {
+			buffer.recycleBuffer();
+		}
+	}
+
+	@Test
+	public void testSingleConsumerBufferAvailabilityListenerRegistration() throws Exception
{
+		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
+
+		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+
+		// recycle buffer when listener mock is called back
+		doAnswer(RECYCLING_BUFFER_AVAILABLE_ANSWER).when(listener).bufferAvailable(Matchers.<Buffer>anyObject());
+
+		// request all buffers of the pool
+		Buffer[] requestedBuffers = new Buffer[NUM_BUFFERS];
+		for (int i = 0; i < NUM_BUFFERS; i++) {
+			requestedBuffers[i] = bufferPool.requestBuffer(BUFFER_SIZE);
+		}
+
+		BufferAvailabilityRegistration registration;
+		// --------------------------------------------------------------------
+		// 1. success
+		// --------------------------------------------------------------------
+		registration = bufferPool.registerBufferAvailabilityListener(listener);
+		Assert.assertEquals(BufferAvailabilityRegistration.SUCCEEDED_REGISTERED, registration);
+
+		// verify call to buffer listener after recycle
+		requestedBuffers[0].recycleBuffer();
+		verify(listener, times(1)).bufferAvailable(Matchers.<Buffer>anyObject());
+
+		Assert.assertEquals("Expected single available buffer after recycle call in mock listener.",
+				1, bufferPool.numAvailableBuffers());
+
+		// --------------------------------------------------------------------
+		// 2. failure: buffer is available
+		// --------------------------------------------------------------------
+		registration = bufferPool.registerBufferAvailabilityListener(listener);
+		Assert.assertEquals(BufferAvailabilityRegistration.FAILED_BUFFER_AVAILABLE, registration);
+
+		Buffer buffer = bufferPool.requestBuffer(BUFFER_SIZE);
+		Assert.assertNotNull(buffer);
+
+		buffer.recycleBuffer();
+
+		// --------------------------------------------------------------------
+		// 3. failure: buffer pool destroyed
+		// --------------------------------------------------------------------
+		bufferPool.destroy();
+
+		registration = bufferPool.registerBufferAvailabilityListener(listener);
+		Assert.assertEquals(BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED, registration);
+
+		// recycle remaining buffers
+		for (int i = 1; i < requestedBuffers.length; i++) {
+			requestedBuffers[i].recycleBuffer();
+		}
+	}
+
+	@Test
+	public void testSingleConsumerReturnExcessBuffers() throws Exception {
+		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
+
+		// request all buffers of the pool
+		Buffer[] requestedBuffers = new Buffer[NUM_BUFFERS];
+		for (int i = 0; i < NUM_BUFFERS; i++) {
+			requestedBuffers[i] = bufferPool.requestBuffer(BUFFER_SIZE);
+		}
+
+		Assert.assertEquals(NUM_BUFFERS, bufferPool.numRequestedBuffers());
+		Assert.assertEquals(0, bufferPool.numAvailableBuffers());
+
+		// recycle first half of the buffers
+		// => leave requested number of buffers unchanged
+		// => increase available number of buffers
+		for (int i = 0; i < NUM_BUFFERS / 2; i++) {
+			requestedBuffers[i].recycleBuffer();
+		}
+
+		Assert.assertEquals(NUM_BUFFERS, bufferPool.numRequestedBuffers());
+		Assert.assertEquals(NUM_BUFFERS / 2, bufferPool.numAvailableBuffers());
+
+		// reduce designated number of buffers
+		// => available buffers (1/2th) should be returned immediately
+		// => non-available buffers (1/4th) should be returned later
+		bufferPool.setNumDesignatedBuffers((NUM_BUFFERS / 2) - (NUM_BUFFERS / 4));
+
+		Assert.assertEquals(NUM_BUFFERS / 2, bufferPool.numRequestedBuffers());
+		Assert.assertEquals(0, bufferPool.numAvailableBuffers());
+
+		// recycle second half of the buffers
+		// => previously non-available buffers (1/4th) should be returned immediately
+		// => remaining buffers are the available ones (1/4th)
+		for (int i = NUM_BUFFERS / 2; i < NUM_BUFFERS; i++) {
+			requestedBuffers[i].recycleBuffer();
+		}
+
+		Assert.assertEquals("Expected current number of requested buffers to be equal to the number
of designated buffers.",
+				bufferPool.numDesignatedBuffers(), bufferPool.numRequestedBuffers());
+
+		Assert.assertEquals("Expected current number of requested and available buffers to be equal,
" +
+				"because all requested buffers have been recycled and become available again.",
+				bufferPool.numRequestedBuffers(), bufferPool.numAvailableBuffers());
+
+		// re-request remaining buffers and register buffer availability listener
+		int remaining = bufferPool.numRequestedBuffers();
+		for (int i = 0; i < remaining; i++) {
+			requestedBuffers[i] = bufferPool.requestBuffer(BUFFER_SIZE);
+		}
+
+		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+		doAnswer(RECYCLING_BUFFER_AVAILABLE_ANSWER).when(listener).bufferAvailable(Matchers.<Buffer>anyObject());
+
+		BufferAvailabilityRegistration registration = bufferPool.registerBufferAvailabilityListener(listener);
+		Assert.assertEquals(BufferAvailabilityRegistration.SUCCEEDED_REGISTERED, registration);
+
+		// reduce number of designated buffers and recycle all buffers
+		bufferPool.setNumDesignatedBuffers(bufferPool.numDesignatedBuffers() - 1);
+
+		for (int i = 0; i < remaining; i++) {
+			requestedBuffers[i].recycleBuffer();
+		}
+
+		Assert.assertEquals(remaining - 1, bufferPool.numRequestedBuffers());
+		Assert.assertEquals(remaining - 1, bufferPool.numAvailableBuffers());
+
+		bufferPool.destroy();
+	}
+
+	// --------------------------------------------------------------------
+
+	private static class RecyclingBufferAvailableAnswer implements Answer<Void> {
+
+		@Override
+		public Void answer(InvocationOnMock invocation) throws Throwable {
+			Buffer buffer = (Buffer) invocation.getArguments()[0];
+			buffer.recycleBuffer();
+
+			return null;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/99730549/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
index 40aefd3..f695979 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
@@ -83,7 +83,7 @@ public class InboundEnvelopeDecoderTest {
 		ByteBuf buf = encode(ch, envelopes);
 
 		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
-				.thenReturn(BufferAvailabilityRegistration.REGISTERED);
+				.thenReturn(BufferAvailabilityRegistration.SUCCEEDED_REGISTERED);
 
 		Buffer buffer = allocBuffer(envelopes[2].getBuffer().size());
 
@@ -161,7 +161,7 @@ public class InboundEnvelopeDecoderTest {
 				.thenReturn(null);
 
 		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
-				.thenReturn(BufferAvailabilityRegistration.REGISTERED);
+				.thenReturn(BufferAvailabilityRegistration.SUCCEEDED_REGISTERED);
 
 		// --------------------------------------------------------------------
 
@@ -196,7 +196,7 @@ public class InboundEnvelopeDecoderTest {
 		Envelope[] envelopes = new Envelope[]{nextEnvelope(true), nextEnvelope()};
 
 		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
-				.thenReturn(BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_AVAILABLE);
+				.thenReturn(BufferAvailabilityRegistration.FAILED_BUFFER_AVAILABLE);
 
 		when(this.bufferProvider.requestBuffer(anyInt()))
 				.thenReturn(null)
@@ -223,7 +223,7 @@ public class InboundEnvelopeDecoderTest {
 				.thenReturn(null);
 
 		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
-				.thenReturn(BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED);
+				.thenReturn(BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED);
 
 		// --------------------------------------------------------------------
 
@@ -738,15 +738,15 @@ public class InboundEnvelopeDecoderTest {
 		public BufferAvailabilityRegistration answer(InvocationOnMock invocation) throws Throwable
{
 			if (this.random.nextBoolean()) {
 				this.isRegistered = true;
-				return BufferAvailabilityRegistration.REGISTERED;
+				return BufferAvailabilityRegistration.SUCCEEDED_REGISTERED;
 			}
 			else if (this.random.nextBoolean()) {
 				this.bufferRequestAnswer.forceBufferAvailable();
-				return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_AVAILABLE;
+				return BufferAvailabilityRegistration.FAILED_BUFFER_AVAILABLE;
 			}
 			else {
 				this.numSkipped++;
-				return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED;
+				return BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED;
 			}
 		}
 
@@ -757,7 +757,7 @@ public class InboundEnvelopeDecoderTest {
 
 			for (Envelope env : envelopes) {
 				if (env.getBuffer() != null) {
-					// skip envelope if returned NOT_REGISTERED_BUFFER_POOL_DESTROYED
+					// skip envelope if returned FAILED_BUFFER_POOL_DESTROYED
 					if (!this.random.nextBoolean() && !this.random.nextBoolean() && !this.random.nextBoolean())
{
 						continue;
 					}


Mime
View raw message