flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [10/35] flink git commit: [hotfix][runtime] Deduplicate code in PipelinedSubpartition
Date Mon, 19 Feb 2018 14:08:03 GMT
[hotfix][runtime] Deduplicate code in PipelinedSubpartition


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2214a242
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2214a242
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2214a242

Branch: refs/heads/master
Commit: 2214a242f218dfc571f98b64fcde61f1a9f6013a
Parents: 6c3c334
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Thu Jan 18 10:41:18 2018 +0100
Committer: Piotr Nowojski <piotr.nowojski@gmail.com>
Committed: Mon Feb 19 12:21:19 2018 +0100

----------------------------------------------------------------------
 .../partition/PipelinedSubpartition.java        | 41 +++++++-------------
 1 file changed, 14 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2214a242/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 0637cc7..9c6197c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -67,6 +67,16 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	@Override
 	public boolean add(Buffer buffer) throws IOException {
+		return add(buffer, false);
+	}
+
+	@Override
+	public void finish() throws IOException {
+		add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), true);
+		LOG.debug("Finished {}.", this);
+	}
+
+	private boolean add(Buffer buffer, boolean finish) throws IOException {
 		checkNotNull(buffer);
 
 		// view reference accessible outside the lock, but assigned inside the locked scope
@@ -83,41 +93,18 @@ class PipelinedSubpartition extends ResultSubpartition {
 			reader = readView;
 			updateStatistics(buffer);
 			increaseBuffersInBacklog(buffer);
-		}
-
-		// Notify the listener outside of the synchronized block
-		if (reader != null) {
-			reader.notifyBuffersAvailable(1);
-		}
-
-		return true;
-	}
-
-	@Override
-	public void finish() throws IOException {
-		final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-
-		// view reference accessible outside the lock, but assigned inside the locked scope
-		final PipelinedSubpartitionView reader;
 
-		synchronized (buffers) {
-			if (isFinished || isReleased) {
-				return;
+			if (finish) {
+				isFinished = true;
 			}
-
-			buffers.add(buffer);
-			reader = readView;
-			updateStatistics(buffer);
-
-			isFinished = true;
 		}
 
-		LOG.debug("Finished {}.", this);
-
 		// Notify the listener outside of the synchronized block
 		if (reader != null) {
 			reader.notifyBuffersAvailable(1);
 		}
+
+		return true;
 	}
 
 	@Override


Mime
View raw message