Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BEA951088C for ; Thu, 18 Dec 2014 22:23:10 +0000 (UTC) Received: (qmail 30232 invoked by uid 500); 18 Dec 2014 22:23:10 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 30205 invoked by uid 500); 18 Dec 2014 22:23:10 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 30156 invoked by uid 99); 18 Dec 2014 22:23:10 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Dec 2014 22:23:10 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 18 Dec 2014 22:23:08 +0000 Received: (qmail 30003 invoked by uid 99); 18 Dec 2014 22:22:48 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Dec 2014 22:22:48 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 862C79CA8C7; Thu, 18 Dec 2014 22:22:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbalassi@apache.org To: commits@flink.incubator.apache.org Date: Thu, 18 Dec 2014 22:22:48 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/5] incubator-flink git commit: [streaming] Make windowed data stream aware of time based trigger/eviction in tumbling window situations. X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-flink Updated Branches: refs/heads/master 88e64fc78 -> 227e40fe1 [streaming] Make windowed data stream aware of time based trigger/eviction in tumbling window situations. [streaming] Changed TimeEvictionPolicy to keep timestamps in the buffer instead of data-items Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/227e40fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/227e40fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/227e40fe Branch: refs/heads/master Commit: 227e40fe11d5794a41433fb48efa887ab8bb91d2 Parents: 6884a0f Author: Jonas Traub (powibol) Authored: Thu Dec 18 16:11:16 2014 +0100 Committer: mbalassi Committed: Thu Dec 18 20:07:27 2014 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/WindowedDataStream.java | 13 ++++++++++++- .../api/windowing/policy/TimeEvictionPolicy.java | 11 +++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/227e40fe/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 09b2678..788f28d 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable; import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable; +import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.streaming.api.windowing.helper.WindowingHelper; import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; @@ -490,7 +491,17 @@ public class WindowedDataStream { } } else { if (userEvicters == null) { - evicters.add(new TumblingEvictionPolicy()); + boolean notOnlyTime=false; + for (WindowingHelper helper : triggerHelpers){ + if (helper instanceof Time){ + evicters.add(helper.toEvict()); + } else { + notOnlyTime=true; + } + } + if (notOnlyTime){ + evicters.add(new TumblingEvictionPolicy()); + } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/227e40fe/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java index 99116d0..aca1dee 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java @@ -41,7 +41,7 @@ public class TimeEvictionPolicy implements ActiveEvictionPolicy, private long granularity; private TimeStamp timestamp; - private LinkedList buffer = new LinkedList(); + private LinkedList buffer = new LinkedList(); /** * This eviction policy evicts all elements which are older than a specified @@ -91,12 +91,15 @@ public class TimeEvictionPolicy implements ActiveEvictionPolicy, checkForDeleted(bufferSize); + //remember timestamp + long time=timestamp.getTimestamp(datapoint); + // delete and count expired tuples - long threshold = timestamp.getTimestamp(datapoint) - granularity; + long threshold = time - granularity; int counter = deleteAndCountExpired(threshold); // Add current element to buffer - buffer.add(datapoint); + buffer.add(time); // return result return counter; @@ -114,7 +117,7 @@ public class TimeEvictionPolicy implements ActiveEvictionPolicy, int counter = 0; while (!buffer.isEmpty()) { - if (timestamp.getTimestamp(buffer.getFirst()) <= threshold) { + if (buffer.getFirst() <= threshold) { buffer.removeFirst(); counter++; } else {