flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/4] flink git commit: [Distributed runtime] Allow recursive union of buffer readers
Date Mon, 19 Jan 2015 14:13:11 GMT
[Distributed runtime] Allow recursive union of buffer readers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1cc30da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1cc30da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1cc30da

Branch: refs/heads/master
Commit: d1cc30da3e2a796590139da942015620c6035ddd
Parents: fb9f562
Author: Ufuk Celebi <uce@apache.org>
Authored: Fri Jan 16 17:32:04 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Jan 19 14:44:57 2015 +0100

----------------------------------------------------------------------
 .../io/network/api/reader/BufferReader.java     |  8 +-
 .../io/network/api/reader/BufferReaderBase.java |  6 ++
 .../network/api/reader/UnionBufferReader.java   | 86 ++++++++++++++------
 .../api/reader/UnionBufferReaderTest.java       | 27 ++++++
 4 files changed, 98 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index 1df7216..cb1cf5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -89,7 +89,7 @@ public final class BufferReader implements BufferReaderBase {
 
 	private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>();
 
-	private final AtomicReference<EventListener<BufferReader>> readerListener = new AtomicReference<EventListener<BufferReader>>(null);
+	private final AtomicReference<EventListener<BufferReaderBase>> readerListener = new AtomicReference<EventListener<BufferReaderBase>>(null);
 
 	// ------------------------------------------------------------------------
 
@@ -211,7 +211,8 @@ public final class BufferReader implements BufferReaderBase {
 	// Consume
 	// ------------------------------------------------------------------------
 
-	void requestPartitionsOnce() throws IOException {
+	@Override
+	public void requestPartitionsOnce() throws IOException {
 		if (!hasRequestedPartitions) {
 			// Sanity check
 			if (totalNumberOfInputChannels != inputChannels.size()) {
@@ -367,7 +368,8 @@ public final class BufferReader implements BufferReaderBase {
 		}
 	}
 
-	void subscribeToReader(EventListener<BufferReader> listener) {
+	@Override
+	public void subscribeToReader(EventListener<BufferReaderBase> listener) {
 		if (!this.readerListener.compareAndSet(null, listener)) {
 			throw new IllegalStateException(listener + " is already registered as a record availability listener");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
index 04fae71..863ef77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.reader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
 
@@ -83,4 +84,9 @@ public interface BufferReaderBase extends ReaderBase {
 	int getNumberOfInputChannels();
 
 	boolean isTaskEvent();
+
+	void subscribeToReader(EventListener<BufferReaderBase> listener);
+
+	void requestPartitionsOnce() throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
index e1c03cc..75348e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
@@ -40,19 +40,19 @@ import static com.google.common.base.Preconditions.checkState;
  */
 public class UnionBufferReader implements BufferReaderBase {
 
-	private final BufferReader[] readers;
+	private final BufferReaderBase[] readers;
 
-	private final DataAvailabilityListener readerListener = new DataAvailabilityListener();
+	private final DataAvailabilityListener dataAvailabilityListener;
 
 	// Set of readers, which are not closed yet
-	private final Set<BufferReader> remainingReaders;
+	private final Set<BufferReaderBase> remainingReaders;
 
 	// Logical channel index offset for each reader
-	private final Map<BufferReader, Integer> readerToIndexOffsetMap = new HashMap<BufferReader, Integer>();
+	private final Map<BufferReaderBase, Integer> readerToIndexOffsetMap = new HashMap<BufferReaderBase, Integer>();
 
 	private int totalNumInputChannels;
 
-	private BufferReader currentReader;
+	private BufferReaderBase currentReader;
 
 	private int currentReaderChannelIndexOffset;
 
@@ -64,19 +64,21 @@ public class UnionBufferReader implements BufferReaderBase {
 
 	private boolean isTaskEvent;
 
-	public UnionBufferReader(BufferReader... readers) {
+	public UnionBufferReader(BufferReaderBase... readers) {
 		checkNotNull(readers);
 		checkArgument(readers.length >= 2, "Union buffer reader must be initialized with at least two individual buffer readers");
 
 		this.readers = readers;
-		this.remainingReaders = new HashSet<BufferReader>(readers.length + 1, 1.0F);
+		this.remainingReaders = new HashSet<BufferReaderBase>(readers.length + 1, 1.0F);
+
+		this.dataAvailabilityListener = new DataAvailabilityListener(this);
 
 		int currentChannelIndexOffset = 0;
 
 		for (int i = 0; i < readers.length; i++) {
-			BufferReader reader = readers[i];
+			BufferReaderBase reader = readers[i];
 
-			reader.subscribeToReader(readerListener);
+			reader.subscribeToReader(dataAvailabilityListener);
 
 			remainingReaders.add(reader);
 			readerToIndexOffsetMap.put(reader, currentChannelIndexOffset);
@@ -87,20 +89,26 @@ public class UnionBufferReader implements BufferReaderBase {
 	}
 
 	@Override
-	public Buffer getNextBuffer() throws IOException, InterruptedException {
+	public void requestPartitionsOnce() throws IOException {
 		if (!hasRequestedPartitions) {
-			for (BufferReader reader : readers) {
+			for (BufferReaderBase reader : readers) {
 				reader.requestPartitionsOnce();
 			}
 
 			hasRequestedPartitions = true;
 		}
+	}
+
+
+	@Override
+	public Buffer getNextBuffer() throws IOException, InterruptedException {
+		requestPartitionsOnce();
 
 		do {
 			if (currentReader == null) {
 				// Finished when all readers are finished
 				if (isFinished()) {
-					readerListener.clear();
+					dataAvailabilityListener.clear();
 					return null;
 				}
 				// Finished with superstep when all readers finished superstep
@@ -110,7 +118,7 @@ public class UnionBufferReader implements BufferReaderBase {
 				}
 				else {
 					while (true) {
-						currentReader = readerListener.getNextReaderBlocking();
+						currentReader = dataAvailabilityListener.getNextReaderBlocking();
 						currentReaderChannelIndexOffset = readerToIndexOffsetMap.get(currentReader);
 
 						if (isIterative && !remainingReaders.contains(currentReader)) {
@@ -118,7 +126,7 @@ public class UnionBufferReader implements BufferReaderBase {
 							// of superstep event and notified the union reader
 							// about newer data *before* all other readers have
 							// done so, we delay this notifications.
-							readerListener.addReader(currentReader);
+							dataAvailabilityListener.addReader(currentReader);
 						}
 						else {
 							break;
@@ -169,8 +177,13 @@ public class UnionBufferReader implements BufferReaderBase {
 	}
 
 	@Override
+	public void subscribeToReader(EventListener<BufferReaderBase> listener) {
+		dataAvailabilityListener.registerListener(listener);
+	}
+
+	@Override
 	public boolean isFinished() {
-		for (BufferReader reader : readers) {
+		for (BufferReaderBase reader : readers) {
 			if (!reader.isFinished()) {
 				return false;
 			}
@@ -182,7 +195,7 @@ public class UnionBufferReader implements BufferReaderBase {
 	private void resetRemainingReaders() {
 		checkState(isIterative, "Tried to reset remaining reader with non-iterative reader.");
 		checkState(remainingReaders.isEmpty(), "Tried to reset remaining readers, but there are some remaining readers.");
-		for (BufferReader reader : readers) {
+		for (BufferReaderBase reader : readers) {
 			remainingReaders.add(reader);
 		}
 	}
@@ -193,14 +206,14 @@ public class UnionBufferReader implements BufferReaderBase {
 
 	@Override
 	public void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) {
-		for (BufferReader reader : readers) {
+		for (BufferReaderBase reader : readers) {
 			reader.subscribeToTaskEvent(eventListener, eventType);
 		}
 	}
 
 	@Override
 	public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException {
-		for (BufferReader reader : readers) {
+		for (BufferReaderBase reader : readers) {
 			reader.sendTaskEvent(event);
 		}
 	}
@@ -213,21 +226,21 @@ public class UnionBufferReader implements BufferReaderBase {
 	public void setIterativeReader() {
 		isIterative = true;
 
-		for (BufferReader reader : readers) {
+		for (BufferReaderBase reader : readers) {
 			reader.setIterativeReader();
 		}
 	}
 
 	@Override
 	public void startNextSuperstep() {
-		for (BufferReader reader : readers) {
+		for (BufferReaderBase reader : readers) {
 			reader.startNextSuperstep();
 		}
 	}
 
 	@Override
 	public boolean hasReachedEndOfSuperstep() {
-		for (BufferReader reader : readers) {
+		for (BufferReaderBase reader : readers) {
 			if (!reader.hasReachedEndOfSuperstep()) {
 				return false;
 			}
@@ -240,25 +253,46 @@ public class UnionBufferReader implements BufferReaderBase {
 	// Data availability notifications
 	// ------------------------------------------------------------------------
 
-	private static class DataAvailabilityListener implements EventListener<BufferReader> {
+	private static class DataAvailabilityListener implements EventListener<BufferReaderBase> {
+
+		private final UnionBufferReader unionReader;
 
-		private final BlockingQueue<BufferReader> readersWithData = new LinkedBlockingQueue<BufferReader>();
+		private final BlockingQueue<BufferReaderBase> readersWithData = new LinkedBlockingQueue<BufferReaderBase>();
+
+		private volatile EventListener<BufferReaderBase> registeredListener;
+
+		private DataAvailabilityListener(UnionBufferReader unionReader) {
+			this.unionReader = unionReader;
+		}
 
 		@Override
-		public void onEvent(BufferReader reader) {
+		public void onEvent(BufferReaderBase reader) {
 			readersWithData.add(reader);
+
+			if (registeredListener != null) {
+				registeredListener.onEvent(unionReader);
+			}
 		}
 
-		BufferReader getNextReaderBlocking() throws InterruptedException {
+		BufferReaderBase getNextReaderBlocking() throws InterruptedException {
 			return readersWithData.take();
 		}
 
-		void addReader(BufferReader reader) {
+		void addReader(BufferReaderBase reader) {
 			readersWithData.add(reader);
 		}
 
 		void clear() {
 			readersWithData.clear();
 		}
+
+		void registerListener(EventListener<BufferReaderBase> listener) {
+			if (registeredListener == null) {
+				registeredListener = listener;
+			}
+			else {
+				throw new IllegalStateException("Already registered listener.");
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java
index 8871d4e..a105557 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java
@@ -109,4 +109,31 @@ public class UnionBufferReaderTest {
 
 		verifyListenerCalled(listener, 8);
 	}
+
+	@Test
+	public void testGetNextBufferUnionOfUnionReader() throws Exception {
+		final MockBufferReader reader1 = new MockBufferReader();
+		final MockBufferReader reader2 = new MockBufferReader();
+
+		final UnionBufferReader unionReader = new UnionBufferReader(reader1.getMock(), reader2.getMock());
+
+		final MockBufferReader reader3 = new MockBufferReader();
+
+		final UnionBufferReader unionUnionReader = new UnionBufferReader(unionReader, reader3.getMock());
+
+		reader1.readBuffer().readBuffer().readBuffer().readEvent().readEvent().readBuffer().finish();
+
+		reader2.readEvent().readBuffer().readBuffer().readEvent().readBuffer().finish();
+
+		reader3.readBuffer().readBuffer().readEvent().readEvent().finish();
+
+		// Task event listener to be notified...
+		final EventListener<TaskEvent> listener = mock(EventListener.class);
+		unionUnionReader.subscribeToTaskEvent(listener, TestTaskEvent.class);
+
+		// Consume the reader
+		consumeAndVerify(unionUnionReader, 9);
+
+		verifyListenerCalled(listener, 6);
+	}
 }


Mime
View raw message