flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [1/2] flink git commit: [streaming] Timestamp comparison bugfix
Date Sun, 26 Apr 2015 12:58:49 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9689ca8f3 -> 8791d4f02


[streaming] Timestamp comparison bugfix


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8791d4f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8791d4f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8791d4f0

Branch: refs/heads/master
Commit: 8791d4f020f812593ceca1a047e8b0722c19d879
Parents: b950d5b
Author: Gyula Fora <gyfora@apache.org>
Authored: Sun Apr 26 13:02:59 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Sun Apr 26 14:57:06 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/datastream/WindowedDataStream.java  | 3 ++-
 .../streaming/api/windowing/helper/TimestampWrapper.java    | 9 +++++++--
 .../api/windowing/policy/TimeTriggerPolicyTest.java         | 3 +++
 .../flink/streaming/examples/windowing/StockPrices.java     | 5 +++--
 4 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8791d4f0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index da8611e..66dd4f3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -450,7 +450,8 @@ public class WindowedDataStream<OUT> {
 		// discretized stream, we also pass the type of the windowbuffer
 		DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
 
-		if (!(windowBuffer instanceof PreAggregator)) {
+		if (getEviction() instanceof KeepAllEvictionPolicy
+				&& !(windowBuffer instanceof PreAggregator)) {
 			throw new RuntimeException(
 					"Error in preaggregator logic, parallel time reduce should always be preaggregated");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8791d4f0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
index a14b1c1..c2ec7c2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
@@ -50,8 +50,13 @@ public class TimestampWrapper<T> implements Serializable {
 			try {
 				@SuppressWarnings("unchecked")
 				TimestampWrapper<T> otherTSW = (TimestampWrapper<T>) other;
-				return startTime == otherTSW.startTime
-						&& timestamp.getClass() == otherTSW.timestamp.getClass();
+				if (timestamp instanceof SystemTimestamp
+						&& otherTSW.timestamp instanceof SystemTimestamp) {
+					return true;
+				} else {
+					return startTime == otherTSW.startTime
+							&& timestamp.getClass() == otherTSW.timestamp.getClass();
+				}
 			} catch (ClassCastException e) {
 				return false;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/8791d4f0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
index 2fabbae..5b26854 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.junit.Test;
@@ -111,6 +112,8 @@ public class TimeTriggerPolicyTest {
 
 		assertNotEquals(new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
 				0)), new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
3)));
+
+		assertEquals(SystemTimestamp.getWrapper(), SystemTimestamp.getWrapper());
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8791d4f0/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
index ab29d3c..d745fc5 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Random;
@@ -203,7 +204,7 @@ public class StockPrices {
 	// DATA TYPES
 	// *************************************************************************
 
-	public static class StockPrice {
+	public static class StockPrice implements Serializable {
 
 		private static final long serialVersionUID = 1L;
 		public String symbol;
@@ -226,7 +227,7 @@ public class StockPrices {
 		}
 	}
 
-	public static class Count {
+	public static class Count implements Serializable{
 		
 		private static final long serialVersionUID = 1L;
 		public String symbol;


Mime
View raw message