flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [3/3] flink git commit: [FLINK-7699][core] Define the BufferListener interface to replace EventlListener in BufferProvider
Date Tue, 10 Oct 2017 14:53:51 GMT
[FLINK-7699][core] Define the BufferListener interface to replace EventlListener in BufferProvider

This closes #4735.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8706c6f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8706c6f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8706c6f4

Branch: refs/heads/master
Commit: 8706c6f445fc92159d15f610500d4ae5d2c49757
Parents: d3cbba5
Author: Zhijiang <wangzhijiang999@aliyun.com>
Authored: Wed Sep 27 18:24:15 2017 +0800
Committer: zentol <chesnay@apache.org>
Committed: Tue Oct 10 16:53:20 2017 +0200

----------------------------------------------------------------------
 .../io/network/buffer/BufferListener.java       | 39 +++++++++++++
 .../io/network/buffer/BufferProvider.java       | 21 +++----
 .../io/network/buffer/LocalBufferPool.java      | 17 +++---
 .../netty/PartitionRequestClientHandler.java    | 61 ++++++++++----------
 .../partition/consumer/RemoteInputChannel.java  | 53 ++++++++++++++++-
 .../io/network/buffer/LocalBufferPoolTest.java  | 61 ++++++++++++++------
 .../PartitionRequestClientHandlerTest.java      | 14 ++---
 .../util/TestInfiniteBufferProvider.java        |  4 +-
 .../network/util/TestPooledBufferProvider.java  | 12 ++--
 9 files changed, 196 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
new file mode 100644
index 0000000..05b4156
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+/**
+ * Interface of the availability of buffers. Listeners can opt for a one-time only
+ * notification or to be notified repeatedly.
+ */
+public interface BufferListener {
+
+	/**
+	 * Notification callback if a buffer is recycled and becomes available in buffer pool.
+	 *
+	 * @param buffer buffer that becomes available in buffer pool.
+	 * @return true if the listener wants to be notified next time.
+	 */
+	boolean notifyBufferAvailable(Buffer buffer);
+
+	/**
+	 * Notification callback if the buffer provider is destroyed.
+	 */
+	void notifyBufferDestroyed();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
index c3373fa..9782584 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -18,43 +18,38 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
-import org.apache.flink.runtime.util.event.EventListener;
-
 import java.io.IOException;
 
 /**
  * A buffer provider to request buffers from in a synchronous or asynchronous fashion.
  *
- * <p> The data producing side (result partition writers) request buffers in a synchronous
fashion,
+ * <p>The data producing side (result partition writers) request buffers in a synchronous
fashion,
  * whereas the input side requests asynchronously.
  */
 public interface BufferProvider {
 
 	/**
 	 * Returns a {@link Buffer} instance from the buffer provider, if one is available.
-	 * <p>
-	 * Returns <code>null</code> if no buffer is available or the buffer provider
has been destroyed.
+	 *
+	 * <p>Returns <code>null</code> if no buffer is available or the buffer
provider has been destroyed.
 	 */
 	Buffer requestBuffer() throws IOException;
 
 	/**
 	 * Returns a {@link Buffer} instance from the buffer provider.
-	 * <p>
-	 * If there is no buffer available, the call will block until one becomes available again
or the
+	 *
+	 * <p>If there is no buffer available, the call will block until one becomes available
again or the
 	 * buffer provider has been destroyed.
 	 */
 	Buffer requestBufferBlocking() throws IOException, InterruptedException;
 
 	/**
 	 * Adds a buffer availability listener to the buffer provider.
-	 * <p>
-	 * The operation fails with return value <code>false</code>, when there is a
buffer available or
+	 *
+	 * <p>The operation fails with return value <code>false</code>, when there
is a buffer available or
 	 * the buffer provider has been destroyed.
-	 * <p>
-	 * If the buffer provider gets destroyed while the listener is registered the listener will
be
-	 * notified with a <code>null</code> value.
 	 */
-	boolean addListener(EventListener<Buffer> listener);
+	boolean addBufferListener(BufferListener listener);
 
 	/**
 	 * Returns whether the buffer provider has been destroyed.

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index b485fd1..a66373c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -19,13 +19,11 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.Queue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,7 +62,7 @@ class LocalBufferPool implements BufferPool {
 	 * Buffer availability listeners, which need to be notified when a Buffer becomes available.
 	 * Listeners can only be registered at a time/state where no Buffer instance was available.
 	 */
-	private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();
+	private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();
 
 	/** Maximum number of network buffers to allocate. */
 	private final int maxNumberOfMemorySegments;
@@ -239,7 +237,7 @@ class LocalBufferPool implements BufferPool {
 				returnMemorySegment(segment);
 			}
 			else {
-				EventListener<Buffer> listener = registeredListeners.poll();
+				BufferListener listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
@@ -247,7 +245,10 @@ class LocalBufferPool implements BufferPool {
 				}
 				else {
 					try {
-						listener.onEvent(new Buffer(segment, this));
+						boolean needMoreBuffers = listener.notifyBufferAvailable(new Buffer(segment, this));
+						if (needMoreBuffers) {
+							registeredListeners.add(listener);
+						}
 					}
 					catch (Throwable ignored) {
 						availableMemorySegments.add(segment);
@@ -270,9 +271,9 @@ class LocalBufferPool implements BufferPool {
 					returnMemorySegment(segment);
 				}
 
-				EventListener<Buffer> listener;
+				BufferListener listener;
 				while ((listener = registeredListeners.poll()) != null) {
-					listener.onEvent(null);
+					listener.notifyBufferDestroyed();
 				}
 
 				isDestroyed = true;
@@ -283,7 +284,7 @@ class LocalBufferPool implements BufferPool {
 	}
 
 	@Override
-	public boolean addListener(EventListener<Buffer> listener) {
+	public boolean addBufferListener(BufferListener listener) {
 		synchronized (availableMemorySegments) {
 			if (!availableMemorySegments.isEmpty() || isDestroyed) {
 				return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index e3097ba..566b215 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.io.network.netty.exception.TransportException;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -347,15 +347,15 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter
{
 	/**
 	 * A buffer availability listener, which subscribes/unsubscribes the NIO
 	 * read event.
-	 * <p>
-	 * If no buffer is available, the channel read event will be unsubscribed
+	 *
+	 * <p>If no buffer is available, the channel read event will be unsubscribed
 	 * until one becomes available again.
-	 * <p>
-	 * After a buffer becomes available again, the buffer is handed over by
-	 * the thread calling {@link #onEvent(Buffer)} to the network I/O
+	 *
+	 * <p>After a buffer becomes available again, the buffer is handed over by
+	 * the thread calling {@link #notifyBufferAvailable(Buffer)} to the network I/O
 	 * thread, which then continues the processing of the staged buffer.
 	 */
-	private class BufferListenerTask implements EventListener<Buffer>, Runnable {
+	private class BufferListenerTask implements BufferListener, Runnable {
 
 		private final AtomicReference<Buffer> availableBuffer = new AtomicReference<Buffer>();
 
@@ -365,7 +365,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter
{
 
 			stagedBufferResponse = bufferResponse;
 
-			if (bufferProvider.addListener(this)) {
+			if (bufferProvider.addBufferListener(this)) {
 				if (ctx.channel().config().isAutoRead()) {
 					ctx.channel().config().setAutoRead(false);
 				}
@@ -383,34 +383,33 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter
{
 			return stagedBufferResponse != null;
 		}
 
+		public void notifyBufferDestroyed() {
+			// The buffer pool has been destroyed
+			stagedBufferResponse = null;
+
+			if (stagedMessages.isEmpty()) {
+				ctx.channel().config().setAutoRead(true);
+				ctx.channel().read();
+			}
+			else {
+				ctx.channel().eventLoop().execute(stagedMessagesHandler);
+			}
+		}
+
 		// Called by the recycling thread (not network I/O thread)
 		@Override
-		public void onEvent(Buffer buffer) {
+		public boolean notifyBufferAvailable(Buffer buffer) {
 			boolean success = false;
 
 			try {
-				if (buffer != null) {
-					if (availableBuffer.compareAndSet(null, buffer)) {
-						ctx.channel().eventLoop().execute(this);
+				if (availableBuffer.compareAndSet(null, buffer)) {
+					ctx.channel().eventLoop().execute(this);
 
-						success = true;
-					}
-					else {
-						throw new IllegalStateException("Received a buffer notification, " +
-								" but the previous one has not been handled yet.");
-					}
+					success = true;
 				}
 				else {
-					// The buffer pool has been destroyed
-					stagedBufferResponse = null;
-
-					if (stagedMessages.isEmpty()) {
-						ctx.channel().config().setAutoRead(true);
-						ctx.channel().read();
-					}
-					else {
-						ctx.channel().eventLoop().execute(stagedMessagesHandler);
-					}
+					throw new IllegalStateException("Received a buffer notification, " +
+							" but the previous one has not been handled yet.");
 				}
 			}
 			catch (Throwable t) {
@@ -423,12 +422,14 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter
{
 					}
 				}
 			}
+
+			return false;
 		}
 
 		/**
 		 * Continues the decoding of a staged buffer after a buffer has become available again.
-		 * <p>
-		 * This task is executed by the network I/O thread.
+		 *
+		 * <p>This task is executed by the network I/O thread.
 		 */
 		@Override
 		public void run() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index ee6bfda..4e1eaef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
@@ -46,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * An input channel, which requests a remote partition queue.
  */
-public class RemoteInputChannel extends InputChannel implements BufferRecycler {
+public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener
{
 
 	/** ID to distinguish this channel from other channels sharing the same TCP connection.
*/
 	private final InputChannelID id = new InputChannelID();
@@ -87,6 +88,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler
{
 	/** The number of available buffers that have not been announced to the producer yet. */
 	private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
 
+	/** The number of unsent buffers in the producer's sub partition. */
+	private final AtomicInteger senderBacklog = new AtomicInteger(0);
+
+	/** The tag indicates whether this channel is waiting for additional floating buffers from
the buffer pool. */
+	private final AtomicBoolean isWaitingForFloatingBuffers = new AtomicBoolean(false);
+
 	public RemoteInputChannel(
 		SingleInputGate inputGate,
 		int channelIndex,
@@ -313,6 +320,50 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler
{
 		}
 	}
 
+	/**
+	 * The Buffer pool notifies this channel of an available floating buffer. If the channel
is released or
+	 * currently does not need extra buffers, the buffer should be recycled to the buffer pool.
Otherwise,
+	 * the buffer will be added into the <tt>availableBuffers</tt> queue and the
unannounced credit is
+	 * increased by one.
+	 *
+	 * @param buffer Buffer that becomes available in buffer pool.
+	 * @return True when this channel is waiting for more floating buffers, otherwise false.
+	 */
+	@Override
+	public boolean notifyBufferAvailable(Buffer buffer) {
+		checkState(isWaitingForFloatingBuffers.get(), "This channel should be waiting for floating
buffers.");
+
+		synchronized (availableBuffers) {
+			// Important: the isReleased check should be inside the synchronized block.
+			if (isReleased.get() || availableBuffers.size() >= senderBacklog.get()) {
+				isWaitingForFloatingBuffers.set(false);
+				buffer.recycle();
+
+				return false;
+			}
+
+			availableBuffers.add(buffer);
+
+			if (unannouncedCredit.getAndAdd(1) == 0) {
+				notifyCreditAvailable();
+			}
+
+			if (availableBuffers.size() >= senderBacklog.get()) {
+				isWaitingForFloatingBuffers.set(false);
+				return false;
+			} else {
+				return true;
+			}
+		}
+	}
+
+	@Override
+	public void notifyBufferDestroyed() {
+		if (!isWaitingForFloatingBuffers.compareAndSet(true, false)) {
+			throw new IllegalStateException("This channel should be waiting for floating buffers currently.");
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Network I/O notifications (called by network I/O thread)
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 03f82d8..7a309d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -27,7 +26,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Matchers;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -46,6 +44,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
@@ -210,32 +209,39 @@ public class LocalBufferPoolTest {
 	// ------------------------------------------------------------------------
 
 	@Test
-	public void testPendingRequestWithListenerAfterRecycle() throws Exception {
-		EventListener<Buffer> listener = spy(new EventListener<Buffer>() {
-			@Override
-			public void onEvent(Buffer buffer) {
-				buffer.recycle();
-			}
-		});
+	public void testPendingRequestWithListenersAfterRecycle() throws Exception {
+		BufferListener twoTimesListener = createBufferListener(2);
+		BufferListener oneTimeListener = createBufferListener(1);
 
-		localBufferPool.setNumBuffers(1);
+		localBufferPool.setNumBuffers(2);
 
-		Buffer available = localBufferPool.requestBuffer();
-		Buffer unavailable = localBufferPool.requestBuffer();
+		Buffer available1 = localBufferPool.requestBuffer();
+		Buffer available2 = localBufferPool.requestBuffer();
 
-		assertNull(unavailable);
+		assertNull(localBufferPool.requestBuffer());
 
-		assertTrue(localBufferPool.addListener(listener));
+		assertTrue(localBufferPool.addBufferListener(twoTimesListener));
+		assertTrue(localBufferPool.addBufferListener(oneTimeListener));
 
-		available.recycle();
+		// Recycle the first buffer to notify both of the above listeners once
+		// and the twoTimesListener will be added into the registeredListeners
+		// queue of buffer pool again
+		available1.recycle();
+		
+		verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));
+		verify(twoTimesListener, times(1)).notifyBufferAvailable(any(Buffer.class));
 
-		verify(listener, times(1)).onEvent(Matchers.any(Buffer.class));
+		// Recycle the second buffer to only notify the twoTimesListener
+		available2.recycle();
+
+		verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));
+		verify(twoTimesListener, times(2)).notifyBufferAvailable(any(Buffer.class));
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testCancelPendingRequestsAfterDestroy() throws IOException {
-		EventListener<Buffer> listener = Mockito.mock(EventListener.class);
+		BufferListener listener = Mockito.mock(BufferListener.class);
 
 		localBufferPool.setNumBuffers(1);
 
@@ -244,13 +250,13 @@ public class LocalBufferPoolTest {
 
 		assertNull(unavailable);
 
-		localBufferPool.addListener(listener);
+		localBufferPool.addBufferListener(listener);
 
 		localBufferPool.lazyDestroy();
 
 		available.recycle();
 
-		verify(listener, times(1)).onEvent(null);
+		verify(listener, times(1)).notifyBufferDestroyed();
 	}
 
 	// ------------------------------------------------------------------------
@@ -396,6 +402,23 @@ public class LocalBufferPoolTest {
 		return networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments();
 	}
 
+	private BufferListener createBufferListener(int notificationTimes) {
+		return spy(new BufferListener() {
+			int times = 0;
+
+			@Override
+			public boolean notifyBufferAvailable(Buffer buffer) {
+				times++;
+				buffer.recycle();
+				return times < notificationTimes;
+			}
+
+			@Override
+			public void notifyBufferDestroyed() {
+			}
+		});
+	}
+
 	private static class BufferRequesterTask implements Callable<Boolean> {
 
 		private final BufferProvider bufferProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index f3f6feb..e1e5bd3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
@@ -30,7 +31,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
@@ -74,7 +74,7 @@ public class PartitionRequestClientHandlerTest {
 		final BufferProvider bufferProvider = mock(BufferProvider.class);
 		when(bufferProvider.requestBuffer()).thenReturn(null);
 		when(bufferProvider.isDestroyed()).thenReturn(true);
-		when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false);
+		when(bufferProvider.addBufferListener(any(BufferListener.class))).thenReturn(false);
 
 		final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
 		when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
@@ -179,14 +179,14 @@ public class PartitionRequestClientHandlerTest {
 		PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
 		EmbeddedChannel channel = new EmbeddedChannel(handler);
 
-		final AtomicReference<EventListener<Buffer>> listener = new AtomicReference<>();
+		final AtomicReference<BufferListener> listener = new AtomicReference<>();
 
 		BufferProvider bufferProvider = mock(BufferProvider.class);
-		when(bufferProvider.addListener(any(EventListener.class))).thenAnswer(new Answer<Boolean>()
{
+		when(bufferProvider.addBufferListener(any(BufferListener.class))).thenAnswer(new Answer<Boolean>()
{
 			@Override
 			@SuppressWarnings("unchecked")
 			public Boolean answer(InvocationOnMock invocation) throws Throwable {
-				listener.set((EventListener<Buffer>) invocation.getArguments()[0]);
+				listener.set((BufferListener) invocation.getArguments()[0]);
 				return true;
 			}
 		});
@@ -221,11 +221,11 @@ public class PartitionRequestClientHandlerTest {
 
 		// Notify about buffer => handle 1st msg
 		Buffer availableBuffer = createBuffer(false);
-		listener.get().onEvent(availableBuffer);
+		listener.get().notifyBufferAvailable(availableBuffer);
 
 		// Start processing of staged buffers (in run pending tasks). Make
 		// sure that the buffer provider acts like it's destroyed.
-		when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false);
+		when(bufferProvider.addBufferListener(any(BufferListener.class))).thenReturn(false);
 		when(bufferProvider.isDestroyed()).thenReturn(true);
 
 		// Execute all tasks that are scheduled in the event loop. Further

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
index 976e63d..ad40a54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.util;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -51,7 +51,7 @@ public class TestInfiniteBufferProvider implements BufferProvider {
 	}
 
 	@Override
-	public boolean addListener(EventListener<Buffer> listener) {
+	public boolean addBufferListener(BufferListener listener) {
 		return false;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index d7e9643..c354eeb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.util;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Queues;
 
@@ -91,7 +91,7 @@ public class TestPooledBufferProvider implements BufferProvider {
 	}
 
 	@Override
-	public boolean addListener(EventListener<Buffer> listener) {
+	public boolean addBufferListener(BufferListener listener) {
 		return bufferRecycler.registerListener(listener);
 	}
 
@@ -115,7 +115,7 @@ public class TestPooledBufferProvider implements BufferProvider {
 
 		private final Queue<Buffer> buffers;
 
-		private final ConcurrentLinkedQueue<EventListener<Buffer>> registeredListeners
=
+		private final ConcurrentLinkedQueue<BufferListener> registeredListeners =
 				Queues.newConcurrentLinkedQueue();
 
 		public PooledBufferProviderRecycler(Queue<Buffer> buffers) {
@@ -127,18 +127,18 @@ public class TestPooledBufferProvider implements BufferProvider {
 			synchronized (listenerRegistrationLock) {
 				final Buffer buffer = new Buffer(segment, this);
 
-				EventListener<Buffer> listener = registeredListeners.poll();
+				BufferListener listener = registeredListeners.poll();
 
 				if (listener == null) {
 					buffers.add(buffer);
 				}
 				else {
-					listener.onEvent(buffer);
+					listener.notifyBufferAvailable(buffer);
 				}
 			}
 		}
 
-		boolean registerListener(EventListener<Buffer> listener) {
+		boolean registerListener(BufferListener listener) {
 			synchronized (listenerRegistrationLock) {
 				if (buffers.isEmpty()) {
 					registeredListeners.add(listener);


Mime
View raw message