flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [hotfix] Properly copy stream record in ReducingWindowBuffer and FoldingWindowBuffer
Date Mon, 29 Feb 2016 15:24:31 GMT
Repository: flink
Updated Branches:
  refs/heads/master f881e7079 -> 405d22236


[hotfix] Properly copy stream record in ReducingWindowBuffer and FoldingWindowBuffer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/405d2223
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/405d2223
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/405d2223

Branch: refs/heads/master
Commit: 405d2223697344e41aa11cc66cadf6b9afcacd89
Parents: f881e70
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Feb 26 21:23:56 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Feb 29 13:38:37 2016 +0100

----------------------------------------------------------------------
 .../runtime/operators/windowing/buffers/FoldingWindowBuffer.java   | 2 +-
 .../runtime/operators/windowing/buffers/ReducingWindowBuffer.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/405d2223/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
index fa44f9d..f6c2319 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
@@ -63,7 +63,7 @@ public class FoldingWindowBuffer<T, ACC> implements WindowBuffer<T, ACC> {
 
 	@Override
 	public void storeElement(StreamRecord<T> element) throws Exception {
-		data.replace(foldFunction.fold(data.getValue(), element.getValue()), element.getTimestamp());
+		data.replace(foldFunction.fold(data.getValue(), element.getValue()));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/405d2223/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
index 1f2b639..d3bf4b4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
@@ -57,7 +57,7 @@ public class ReducingWindowBuffer<T> implements WindowBuffer<T, T> {
 	@Override
 	public void storeElement(StreamRecord<T> element) throws Exception {
 		if (data == null) {
-			data = new StreamRecord<>(element.getValue(), element.getTimestamp());
+			data = element.copy(element.getValue());
 		} else {
 			data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
 		}


Mime
View raw message