flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [runtime] Fix possible NPE during cancelling
Date Tue, 26 May 2015 13:41:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3ef4e68bf -> 924830ffa


[runtime] Fix possible NPE during cancelling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/924830ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/924830ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/924830ff

Branch: refs/heads/master
Commit: 924830ffa01d1fa1ba73acde32aa65d4a8e4dbac
Parents: 3ef4e68
Author: Ufuk Celebi <uce@apache.org>
Authored: Tue May 26 15:38:21 2015 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Tue May 26 15:40:50 2015 +0200

----------------------------------------------------------------------
 .../netty/PartitionRequestClientHandler.java    | 26 ++++++++++++++------
 1 file changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/924830ff/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index f1dba42..5964b49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -45,7 +45,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
@@ -377,15 +376,28 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 			boolean success = false;
 
 			try {
-				checkNotNull(buffer, "Buffer request could not be satisfied.");
+				if (buffer != null) {
+					if (availableBuffer.compareAndSet(null, buffer)) {
+						ctx.channel().eventLoop().execute(this);
 
-				if (availableBuffer.compareAndSet(null, buffer)) {
-					ctx.channel().eventLoop().execute(this);
-
-					success = true;
+						success = true;
+					}
+					else {
+						throw new IllegalStateException("Received a buffer notification, " +
+								" but the previous one has not been handled yet.");
+					}
 				}
 				else {
-					throw new IllegalStateException("Received a buffer notification, but the previous one has not been handled yet.");
+					// The buffer pool has been destroyed
+					stagedBufferResponse = null;
+
+					if (stagedMessages.isEmpty()) {
+						ctx.channel().config().setAutoRead(true);
+						ctx.channel().read();
+					}
+					else {
+						ctx.channel().eventLoop().execute(stagedMessagesHandler);
+					}
 				}
 			}
 			catch (Throwable t) {


Mime
View raw message