flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] incubator-flink git commit: [FLINK-1222] Tasks send close acknowledgements early.
Date Fri, 07 Nov 2014 10:12:41 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master a959dd503 -> e58049711


[FLINK-1222] Tasks send close acknowledgements early.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ef406916
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ef406916
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ef406916

Branch: refs/heads/master
Commit: ef406916dbeabaef79b4ffd38fe5916cdd34bd2f
Parents: a959dd5
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Nov 6 15:14:12 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Nov 6 15:14:12 2014 +0100

----------------------------------------------------------------------
 .../io/network/channels/InputChannel.java       | 34 +++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ef406916/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
index 1d14172..80181be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
@@ -81,7 +81,9 @@ public class InputChannel<T extends IOReadableWritable> extends Channel
implemen
 	 */
 	private long amountOfDataTransmitted;
 
-	private volatile boolean brokerAggreedToCloseChannel;
+	private volatile boolean weClosedChannel;
+	
+	private volatile boolean senderClosedChannel;
 
 	// -------------------------------------------------------------------------------------------
 
@@ -158,7 +160,12 @@ public class InputChannel<T extends IOReadableWritable> extends
Channel implemen
 
 				AbstractEvent evt = boe.getEvent();
 				if (evt.getClass() == ChannelCloseEvent.class) {
-					this.brokerAggreedToCloseChannel = true;
+					this.senderClosedChannel = true;
+					try {
+						close();
+					} catch (InterruptedException e) {
+						throw new IOException(e);
+					}
 					return InputChannelResult.END_OF_STREAM;
 				}
 				else if (evt.getClass() == EndOfSuperstepEvent.class) {
@@ -207,11 +214,16 @@ public class InputChannel<T extends IOReadableWritable> extends
Channel implemen
 		if (this.ioException != null) {
 			throw new IOException("An error occurred in the channel: " + this.ioException.getMessage(),
this.ioException);
 		} else {
-			return this.brokerAggreedToCloseChannel;
+			return this.weClosedChannel && this.senderClosedChannel;
 		}
 	}
 
 	public void close() throws IOException, InterruptedException {
+		
+		if (weClosedChannel) {
+			return;
+		}
+		weClosedChannel = true;
 
 		this.deserializer.clear();
 		if (this.dataBuffer != null) {
@@ -220,13 +232,13 @@ public class InputChannel<T extends IOReadableWritable> extends
Channel implemen
 		}
 
 		// This code fragment makes sure the isClosed method works in case the channel input has
not been fully consumed
-		while (!this.brokerAggreedToCloseChannel)
+		while (!this.senderClosedChannel)
 		{
 			BufferOrEvent next = getNextBufferOrEvent();
 			if (next != null) {
 				if (next.isEvent()) {
 					if (next.getEvent() instanceof ChannelCloseEvent) {
-						this.brokerAggreedToCloseChannel = true;
+						this.senderClosedChannel = true;
 					}
 				} else {
 					releasedConsumedReadBuffer(next.getBuffer());
@@ -266,8 +278,14 @@ public class InputChannel<T extends IOReadableWritable> extends
Channel implemen
 
 	@Override
 	public void releaseAllResources() {
-		this.brokerAggreedToCloseChannel = true;
+		this.senderClosedChannel = true;
 		this.deserializer.clear();
+		
+		Buffer buf = this.dataBuffer;
+		if (buf != null) {
+			buf.recycleBuffer();
+			dataBuffer = null;
+		}
 
 		// The buffers are recycled by the input channel wrapper
 	}
@@ -310,8 +328,8 @@ public class InputChannel<T extends IOReadableWritable> extends
Channel implemen
 				// notify that something (an exception) is available
 				notifyGateThatInputIsAvailable();
 
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Input channel " + this.toString() + " expected envelope " + expectedSequenceNumber
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Input channel " + this.toString() + " expected envelope " + expectedSequenceNumber
 							+ " but received " + sequenceNumber);
 				}
 


Mime
View raw message