flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/4] flink git commit: [FLINK-1761] [runtime] Fix sequence number mismatch on empty buffer drop.
Date Sun, 22 Mar 2015 00:43:50 GMT
[FLINK-1761] [runtime] Fix sequence number mismatch on empty buffer drop.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/925481fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/925481fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/925481fb

Branch: refs/heads/master
Commit: 925481fb1c88f3c45b289cdf5ef203190492031a
Parents: 380ef87
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Mar 20 19:31:56 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat Mar 21 22:04:30 2015 +0100

----------------------------------------------------------------------
 .../api/java/typeutils/runtime/PojoSerializer.java   |  3 ++-
 .../runtime/io/network/api/writer/RecordWriter.java  | 15 ++++++++++-----
 .../network/netty/PartitionRequestClientHandler.java |  1 +
 .../partition/consumer/RemoteInputChannel.java       | 14 ++++++++++++++
 4 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/925481fb/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 8b95296..b81ab67 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -490,7 +490,8 @@ public final class PojoSerializer<T> extends TypeSerializer<T>
{
 					}
 				}
 			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Error during POJO copy, this should not happen since we check
the fields" + "before.");
+				throw new RuntimeException(
+						"Error during POJO copy, this should not happen since we check the fields before.");
 			}
 		} else {
 			if (subclassSerializer != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/925481fb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 7f84786..85bf841 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -100,17 +100,22 @@ public class RecordWriter<T extends IOReadableWritable> {
 			RecordSerializer<T> serializer = serializers[targetChannel];
 
 			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer == null) {
-					writer.writeEvent(event, targetChannel);
-				}
-				else {
+
+				if (serializer.hasData()) {
+					Buffer buffer = serializer.getCurrentBuffer();
+					if (buffer == null) {
+						throw new IllegalStateException("Serializer has data but no buffer.");
+					}
+
 					writer.writeBuffer(buffer, targetChannel);
 					writer.writeEvent(event, targetChannel);
 
 					buffer = writer.getBufferProvider().requestBufferBlocking();
 					serializer.setNextBuffer(buffer);
 				}
+				else {
+					writer.writeEvent(event, targetChannel);
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/925481fb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 12ed140..382f385 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -194,6 +194,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter
{
 				// Early return for empty buffers. Otherwise Netty's readBytes() throws an
 				// IndexOutOfBoundsException.
 				if (bufferOrEvent.getSize() == 0) {
+					inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber);
 					return true;
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/925481fb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 2ca2ff7..cae7837 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -211,6 +211,20 @@ public class RemoteInputChannel extends InputChannel {
 		}
 	}
 
+	public void onEmptyBuffer(int sequenceNumber) {
+		if (!isReleased.get()) {
+			synchronized (receivedBuffers) {
+				if (expectedSequenceNumber == sequenceNumber) {
+					expectedSequenceNumber++;
+				}
+				else {
+					IOException error = new BufferReorderingException(expectedSequenceNumber, sequenceNumber);
+					ioError.compareAndSet(null, error);
+				}
+			}
+		}
+	}
+
 	public void onError(Throwable error) {
 		if (ioError.compareAndSet(null, error instanceof IOException ? (IOException) error : new
IOException(error))) {
 			notifyAvailableBuffer();


Mime
View raw message