From: Yukun Guo
To: user@flink.apache.org
Date: Tue, 5 Jul 2016 10:35:16 +0800
Subject: Re: Tumbling time window cannot group events properly

The output is the timestamp of each event, printed as a string. (For
convenience, the payload of each event is exactly its timestamp.) As soon
as the fold over a time window finishes, the code prints "N events in this
window", where N is the event count, to mark the end of that window.

The 10s windows should be [19:10:40, 19:10:49], [19:10:50, 19:10:59], ...,
but in the example quoted below, the events at 19:10:50, which belong to
the [19:10:50, 19:10:59] window, were mistakenly put in the
[19:10:40, 19:10:49] one.

On 4 July 2016 at 21:41, Aljoscha Krettek wrote:

> Could you please elaborate a bit on what exactly the output means and how
> you derive that events are leaking into the previous window?
>
> On Mon, 4 Jul 2016 at 13:20 Yukun Guo wrote:
>
>> Thanks for the information. Strangely enough, after I set the time
>> characteristic to EventTime, the events are leaking into the previous
>> window:
>>
>> ...
>> Mon, 04 Jul 2016 19:10:49 CST
>> Mon, 04 Jul 2016 19:10:50 CST # ?
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> 100 events in this window
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:51 CST
>> Mon, 04 Jul 2016 19:10:51 CST
>>
>>
>> On 4 July 2016 at 16:15, Aljoscha Krettek wrote:
>>
>>> Hi,
>>> I think it should be as simple as setting event time as the stream time
>>> characteristic:
>>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>
>>> The problem is that .timeWindow(Time.seconds(10)) will use processing
>>> time if you don't specify a time characteristic. You can enforce using an
>>> event-time window using this:
>>>
>>> stream.window(TumblingEventTimeWindows.of(Time.seconds(10)))
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>> On Mon, 4 Jul 2016 at 06:00 Yukun Guo wrote:
>>>
>>>> Hi,
>>>>
>>>> I wrote a program which constructs a WindowedStream to compute periodic
>>>> data statistics every 10 seconds. However, I found that events have not
>>>> been strictly grouped into windows of 10s duration, i.e., some events are
>>>> leaking into the adjacent window.
>>>>
>>>> The output is like this:
>>>>
>>>> Mon, 04 Jul 2016 11:11:50 CST  # 1
>>>> Mon, 04 Jul 2016 11:11:50 CST  # 2
>>>> # removed for brevity
>>>> Mon, 04 Jul 2016 11:11:59 CST  # 99
>>>> 99 events in this window
>>>> Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong window
>>>> Mon, 04 Jul 2016 11:12:00 CST
>>>>
>>>> Here is the code:
>>>>
>>>> import org.apache.commons.lang3.time.FastDateFormat;
>>>> import org.apache.flink.api.common.functions.FoldFunction;
>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>> import org.apache.flink.api.java.functions.KeySelector;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>
>>>> public class TimeWindow {
>>>>
>>>>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>>>>         private final long DELAY = 500;
>>>>         private long currentWatermark;
>>>>
>>>>         @Override
>>>>         public Watermark getCurrentWatermark() {
>>>>             return new Watermark(currentWatermark);
>>>>         }
>>>>
>>>>         @Override
>>>>         public long extractTimestamp(Long event, long l) {
>>>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>>>             return event;
>>>>         }
>>>>     }
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>         final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>>>
>>>>         DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
>>>>             private volatile boolean isRunning = true;
>>>>
>>>>             @Override
>>>>             public void run(SourceContext<Long> sourceContext) throws Exception {
>>>>                 while (isRunning) {
>>>>                     sourceContext.collect(System.currentTimeMillis());
>>>>                     Thread.sleep(200);
>>>>                 }
>>>>
>>>>                 sourceContext.close();
>>>>             }
>>>>
>>>>             @Override
>>>>             public void cancel() {
>>>>                 isRunning = false;
>>>>             }
>>>>         });
>>>>
>>>>         stream
>>>>             .assignTimestampsAndWatermarks(new TimestampAssigner())
>>>>             .keyBy(new KeySelector<Long, Integer>() {
>>>>                 @Override
>>>>                 public Integer getKey(Long x) throws Exception {
>>>>                     return 0;
>>>>                 }
>>>>             })
>>>>             .timeWindow(Time.seconds(10))
>>>>             .fold(0, new FoldFunction<Long, Integer>() {
>>>>                 @Override
>>>>                 public Integer fold(Integer count, Long x) throws Exception {
>>>>                     System.out.println(formatter.format(x));
>>>>                     return count + 1;
>>>>                 }
>>>>             })
>>>>             .map(new MapFunction<Integer, Void>() {
>>>>                 @Override
>>>>                 public Void map(Integer count) throws Exception {
>>>>                     System.out.println(count + " events in this window");
>>>>                     return null;
>>>>                 }
>>>>             });
>>>>
>>>>         env.execute();
>>>>     }
>>>> }
>>>>
>>>>
>>>> It doesn't always happen, but if you run the program long enough it can
>>>> be observed for sure.
>>>> Adjusting the DELAY value of watermark generation does not change the
>>>> behavior.
>>>>
>>>
>>
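As a side note on the window boundaries discussed in this thread: the following is a minimal sketch, not Flink code, of how a timestamp could be mapped to a 10-second tumbling window. It assumes that windows are aligned to the Unix epoch and are half-open, i.e. [start, start + size), and that "CST" in the printed output means UTC+8 (consistent with the +0800 Date header). The class name TumblingWindowBounds and the methods windowStart and windowEnd are hypothetical names made up for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class TumblingWindowBounds {

    // Inclusive start of the epoch-aligned window that contains timestampMs.
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Exclusive end of the same window.
    public static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        final long sizeMs = 10_000L;
        // Assumption: "CST" in the quoted output is UTC+8.
        final ZoneOffset cst = ZoneOffset.ofHours(8);
        final DateTimeFormatter fmt = DateTimeFormatter.ofPattern("HH:mm:ss");

        // The two event times around the disputed window boundary.
        long t49 = ZonedDateTime.of(2016, 7, 4, 19, 10, 49, 0, cst)
                .toInstant().toEpochMilli();
        long t50 = t49 + 1_000L;

        for (long t : new long[] {t49, t50}) {
            String event = fmt.format(Instant.ofEpochMilli(t).atOffset(cst));
            String start = fmt.format(
                    Instant.ofEpochMilli(windowStart(t, sizeMs)).atOffset(cst));
            String end = fmt.format(
                    Instant.ofEpochMilli(windowEnd(t, sizeMs)).atOffset(cst));
            // Print the event time and the half-open window it maps to.
            System.out.println(event + " -> [" + start + ", " + end + ")");
        }
    }
}
```

Under those assumptions, an event at 19:10:49 maps to the window starting at 19:10:40 and an event at 19:10:50 maps to the window starting at 19:10:50, which is the grouping expected in this thread.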