flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-4543] [network] Fix potential deadlock in SpilledSubpartitionViewAsyncIO.
Date Tue, 27 Sep 2016 17:47:16 GMT
Repository: flink
Updated Branches:
  refs/heads/master f1b5b35f5 -> 90902914a


[FLINK-4543] [network] Fix potential deadlock in SpilledSubpartitionViewAsyncIO.

The deadlock could occur when the SpilledSubpartitionViewAsyncIO simultaneously tried to
release a buffer in one thread and encountered an error in another thread.

The point of contention was the listener field, which is now an AtomicReference. This removes
the need to acquire the lock when reporting the error.

This closes #2444


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90902914
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90902914
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90902914

Branch: refs/heads/master
Commit: 90902914ac4b11f9554b67ad49e0d697a0d02f93
Parents: b928935
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Aug 31 16:22:34 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Sep 27 19:46:40 2016 +0200

----------------------------------------------------------------------
 .../SpilledSubpartitionViewAsyncIO.java         | 26 ++++++++------------
 .../checkpoint/CheckpointIDCounterTest.java     |  4 +--
 2 files changed, 11 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90902914/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
index daccd28..ca25536 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.util.event.NotificationListener;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -70,7 +71,7 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
 	private final ConcurrentLinkedQueue<Buffer> returnedBuffers = new ConcurrentLinkedQueue<Buffer>();
 
 	/** A data availability listener. */
-	private NotificationListener registeredListener;
+	private final AtomicReference<NotificationListener> registeredListener;
 
 	/** Error, which has occurred in the I/O thread. */
 	private volatile IOException errorInIOThread;
@@ -108,7 +109,8 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView
{
 		this.parent = checkNotNull(parent);
 		this.bufferProvider = checkNotNull(bufferProvider);
 		this.bufferAvailabilityListener = new BufferProviderCallback(this);
-
+		this.registeredListener = new AtomicReference<>();
+		
 		this.asyncFileReader = ioManager.createBufferFileReader(channelId, new IOThreadCallback(this));
 
 		if (initialSeekPosition > 0) {
@@ -154,14 +156,12 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView
{
 				return false;
 			}
 
-			if (registeredListener == null) {
-				registeredListener = listener;
-
+			if (registeredListener.compareAndSet(null, listener)) {
 				return true;
+			} else {
+				throw new IllegalStateException("already registered listener");
 			}
 		}
-
-		throw new IllegalStateException("Already registered listener.");
 	}
 
 	@Override
@@ -279,8 +279,8 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView
{
 
 			returnedBuffers.add(buffer);
 
-			listener = registeredListener;
-			registeredListener = null;
+			// after this, the listener should be null
+			listener = registeredListener.getAndSet(null);
 
 			// If this was the last buffer before we reached EOF, set the corresponding flag to
 			// ensure that further buffers are correctly recycled and eventually no further reads
@@ -303,13 +303,7 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView
{
 			errorInIOThread = error;
 		}
 
-		final NotificationListener listener;
-
-		synchronized (lock) {
-			listener = registeredListener;
-			registeredListener = null;
-		}
-
+		final NotificationListener listener = registeredListener.getAndSet(null);
 		if (listener != null) {
 			listener.onNotification();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/90902914/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
index dc43b47..49b5fe7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
@@ -57,9 +57,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
 
 		@AfterClass
 		public static void tearDown() throws Exception {
-			if (ZooKeeper != null) {
-				ZooKeeper.shutdown();
-			}
+			ZooKeeper.shutdown();
 		}
 
 		@Before


Mime
View raw message