From: Aljoscha Krettek
Date: Thu, 21 Apr 2016 12:54:33 +0000
Subject: Re: Count windows missing last elements?
To: user@flink.apache.org

People have wondered about that a few times, yes.
My opinion is that a stream is potentially infinite and processing only
stops for anomalous reasons: the job crashes, or the job is stopped so it
can be redeployed later. In those cases you would not want to flush out
your data but keep it, and restart from the same state when the job is
restarted.

You can implement the behavior by writing a custom Trigger that behaves
like the count trigger but also fires when receiving a Long.MAX_VALUE
watermark. A watermark of Long.MAX_VALUE signifies that a source has
stopped processing for natural reasons.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin wrote:
> Thanks,
>
> I wonder whether it would be good to have such functionality built in. At
> least when the incoming stream is finished, flush the remaining elements.
>
> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek wrote:
>
>> Hi,
>> yes, you can achieve this by writing a custom Trigger that can trigger
>> both on the count or after a long-enough timeout. It would be a
>> combination of CountTrigger and EventTimeTrigger (or
>> ProcessingTimeTrigger), so you could look to those to get started.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin wrote:
>>
>>> I have a pretty big but finite stream and I need to be able to window
>>> it by number of elements.
>>> In this case, from my observations, Flink can 'skip' the last chunk of
>>> data if it has fewer elements than the window size:
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>
>>>     @Override
>>>     public void run(SourceContext<Long> ctx) throws Exception {
>>>         LongStream.range(0, 35).forEach(ctx::collect);
>>>     }
>>>
>>>     @Override
>>>     public void cancel() {
>>>     }
>>> });
>>>
>>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>     @Override
>>>     public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>         System.out.println(Joiner.on(',').join(values));
>>>     }
>>> }).print();
>>>
>>> env.execute("yoyoyo");
>>>
>>> Output:
>>> 0,1,2,3,4,5,6,7,8,9
>>> 10,11,12,13,14,15,16,17,18,19
>>> 20,21,22,23,24,25,26,27,28,29
>>>
>>> I.e. elements from 30 to 34 are not being processed.
>>>
>>> Does it make sense to have a count OR timeout window which will emit a
>>> window when the number of elements reaches a threshold OR a collecting
>>> timeout occurs?
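[Editor's note] The "fire at N elements, but also flush the remainder when the end-of-stream watermark (Long.MAX_VALUE) arrives" behavior Aljoscha describes boils down to a small piece of decision logic. The sketch below models that logic in plain Java with hypothetical names (CountOrEndOfStream, onElement, onWatermark are illustrative, not Flink API); a real implementation would instead extend org.apache.flink.streaming.api.windowing.triggers.Trigger and keep its count in partitioned state, as Flink's own CountTrigger does.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of a "count OR end-of-stream" firing rule.
// Hypothetical names; not the Flink Trigger API.
class CountOrEndOfStream {
    private final int maxCount;
    private final List<Long> buffer = new ArrayList<>();
    private final List<List<Long>> fired = new ArrayList<>();

    CountOrEndOfStream(int maxCount) { this.maxCount = maxCount; }

    void onElement(long value) {
        buffer.add(value);
        if (buffer.size() >= maxCount) fire();  // full window: fire now
    }

    // A Long.MAX_VALUE watermark means the source has finished for
    // natural reasons: flush whatever is left, even a partial window.
    void onWatermark(long watermark) {
        if (watermark == Long.MAX_VALUE && !buffer.isEmpty()) fire();
    }

    private void fire() {
        fired.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    List<List<Long>> firedWindows() { return fired; }
}

public class Main {
    public static void main(String[] args) {
        // Same scenario as the example in the thread: 35 elements,
        // windows of 10, then the end-of-stream watermark.
        CountOrEndOfStream trigger = new CountOrEndOfStream(10);
        for (long i = 0; i < 35; i++) trigger.onElement(i);
        trigger.onWatermark(Long.MAX_VALUE);

        System.out.println(trigger.firedWindows().size());   // 4
        System.out.println(trigger.firedWindows().get(3));   // [30, 31, 32, 33, 34]
    }
}
```

The count-or-timeout variant from earlier in the thread has the same shape: instead of reacting to the Long.MAX_VALUE watermark, the trigger would register a processing-time (or event-time) timer on each element and fire when it expires, which in Flink maps to the onProcessingTime/onEventTime callbacks of a custom Trigger.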