flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [01/27] incubator-flink git commit: [streaming] Time trigger preNotify fix
Date Sun, 04 Jan 2015 20:50:51 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 561eaf047 -> f71b0c42f


[streaming] Time trigger preNotify fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/87d699d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/87d699d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/87d699d7

Branch: refs/heads/master
Commit: 87d699d7025f6ceadeb64fe972b7185079b6ec22
Parents: 561eaf0
Author: Gyula Fora <gyfora@apache.org>
Authored: Fri Jan 2 18:33:46 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Jan 2 18:33:46 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/windowing/policy/TimeTriggerPolicy.java    | 6 +++---
 .../api/windowing/policy/TimeTriggerPolicyTest.java          | 8 +++-----
 2 files changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/87d699d7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 3539ad6..57bccf2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -107,9 +107,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 		LinkedList<Object> fakeElements = new LinkedList<Object>();
 		// check if there is more then one window border missed
 		// use > here. In case >= would fit, the regular call will do the job.
-		while (timestamp.getTimestamp(datapoint) > startTime + granularity) {
+		while (timestamp.getTimestamp(datapoint) >= startTime + granularity) {
 			startTime += granularity;
-			fakeElements.add(startTime-1);
+			fakeElements.add(startTime - 1);
 		}
 		return (Object[]) fakeElements.toArray();
 	}
@@ -146,7 +146,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 		// start time is excluded, but end time is included: >=
 		if (System.currentTimeMillis() >= startTime + granularity) {
 			startTime += granularity;
-			callback.sendFakeElement(startTime-1);
+			callback.sendFakeElement(startTime - 1);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/87d699d7/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
index 28d35ec..9c77a55 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -50,8 +50,7 @@ public class TimeTriggerPolicyTest {
 		// test different granularity
 		for (long granularity = 0; granularity < 31; granularity++) {
 			// create policy
-			TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
-					timeStamp);
+			TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp);
 
 			// remember window border
 			// Remark: This might NOT work in case the timeStamp uses
@@ -101,11 +100,10 @@ public class TimeTriggerPolicyTest {
 		};
 
 		// create policy
-		TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
-				timeStamp);
+		TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, timeStamp);
 
 		// expected result
-		Long[][] result = { {}, {}, { 4L, 9L, 14L }, { 24L } };
+		Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };
 
 		// call policy
 		for (int i = 0; i < times.length; i++) {


Mime
View raw message