flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/5] incubator-flink git commit: [streaming] Make windowed data stream aware of time based trigger/eviction in tumbling window situations.
Date Thu, 18 Dec 2014 22:22:48 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 88e64fc78 -> 227e40fe1


[streaming] Make windowed data stream aware of time based trigger/eviction in tumbling window
situations.

[streaming] Changed TimeEvictionPolicy to keep timestamps in the buffer instead of data-items


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/227e40fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/227e40fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/227e40fe

Branch: refs/heads/master
Commit: 227e40fe11d5794a41433fb48efa887ab8bb91d2
Parents: 6884a0f
Author: Jonas Traub (powibol) <jon@s-traub.com>
Authored: Thu Dec 18 16:11:16 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Thu Dec 18 20:07:27 2014 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/WindowedDataStream.java   | 13 ++++++++++++-
 .../api/windowing/policy/TimeEvictionPolicy.java       | 11 +++++++----
 2 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/227e40fe/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 09b2678..788f28d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
@@ -490,7 +491,17 @@ public class WindowedDataStream<OUT> {
 			}
 		} else {
 			if (userEvicters == null) {
-				evicters.add(new TumblingEvictionPolicy<OUT>());
+				boolean notOnlyTime=false;
+				for (WindowingHelper<OUT> helper : triggerHelpers){
+					if (helper instanceof Time<?>){
+						evicters.add(helper.toEvict());
+					} else {
+						notOnlyTime=true;
+					}
+				}
+				if (notOnlyTime){
+					evicters.add(new TumblingEvictionPolicy<OUT>());
+				}
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/227e40fe/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index 99116d0..aca1dee 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -41,7 +41,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 
 	private long granularity;
 	private TimeStamp<DATA> timestamp;
-	private LinkedList<DATA> buffer = new LinkedList<DATA>();
+	private LinkedList<Long> buffer = new LinkedList<Long>();
 
 	/**
 	 * This eviction policy evicts all elements which are older than a specified
@@ -91,12 +91,15 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 
 		checkForDeleted(bufferSize);
 
+		//remember timestamp
+		long time=timestamp.getTimestamp(datapoint);
+		
 		// delete and count expired tuples
-		long threshold = timestamp.getTimestamp(datapoint) - granularity;
+		long threshold = time - granularity;
 		int counter = deleteAndCountExpired(threshold);
 
 		// Add current element to buffer
-		buffer.add(datapoint);
+		buffer.add(time);
 
 		// return result
 		return counter;
@@ -114,7 +117,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 		int counter = 0;
 		while (!buffer.isEmpty()) {
 
-			if (timestamp.getTimestamp(buffer.getFirst()) <= threshold) {
+			if (buffer.getFirst() <= threshold) {
 				buffer.removeFirst();
 				counter++;
 			} else {


Mime
View raw message