From: Yassine MARZOUGUI
Date: Sun, 11 Dec 2016 22:09:52 +0100
Subject: Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing untill the whole stream is processed
To: user@flink.apache.org
Cc: Robert Metzger, Kostas Kloudas
Reply-To: user@flink.apache.org
Mailing-List: contact user-help@flink.apache.org; run by ezmlm
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary=001a114d8ca48a8d220543686868
archived-at: Sun, 11 Dec 2016 21:10:04 -0000

--001a114d8ca48a8d220543686868
Content-Type: text/plain; charset=UTF-8

Hi Aljoscha,

Please excuse the late response; I was busy all of last week.

I used the custom watermark debugger (with 1.1, I changed
super.processWatermark(mark) to super.output.emitWatermark(mark)).
Surprisingly, with 1.2 only one watermark is printed, at the end of the
stream, with the value WM: Watermark @ 9223372036854775807
(Long.MAX_VALUE), whereas with 1.1 watermarks are printed periodically.
I am using the following revision of 1.2-SNAPSHOT:
https://github.com/apache/flink/tree/4e336c692b74f218ba09844a46f49534e3a210e9

I uploaded the dataset I'm using as input here:
https://drive.google.com/file/d/0BzERCAJnxXocNGpMTGMzX09id1U/view?usp=sharing
The first column corresponds to the timestamp.

You can find the code below. Thank you for your help.
import com.opencsv.CSVParser;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import java.util.*;

/**
 * Created by ymarzougui on 11/1/2016.
 */
public class SortedSessionsAssigner {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple3<Long,String,String>> waterMarked = env.readTextFile("file:///E:\\data\\anonymized.csv")
                .flatMap(new RichFlatMapFunction<String, Tuple3<Long,String,String>>() {

                    public CSVParser csvParser;

                    @Override
                    public void open(Configuration config) {
                        csvParser = new CSVParser(',', '"');
                    }

                    @Override
                    public void flatMap(String in, Collector<Tuple3<Long,String,String>> clctr) throws Exception {
                        String[] result = csvParser.parseLine(in);
                        clctr.collect(Tuple3.of(Long.parseLong(result[0]), result[1], result[2]));
                    }
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
                        return tuple3.f0;
                    }
                });

        DataStream<Tuple2<TreeMap<String, Double>, Long>> sessions = waterMarked
                .keyBy(1)
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
                .apply(new WindowFunction<Tuple3<Long,String,String>, Tuple2<TreeMap<String, Double>, Long>, Tuple, TimeWindow>() {

                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple3<Long, String, String>> iterable, Collector<Tuple2<TreeMap<String, Double>, Long>> collector) throws Exception {
                        TreeMap<String, Double> treeMap = new TreeMap<String, Double>();

                        Long session_count = 0L;
                        for (Tuple3<Long, String, String> tuple3 : iterable) {
                            treeMap.put(tuple3.f2, treeMap.getOrDefault(tuple3.f2, 0.0) + 1);
                            session_count += 1;
                        }
                        collector.collect(Tuple2.of(treeMap, session_count));
                    }
                }).setParallelism(8);

        waterMarked.transform("WatermarkDebugger", waterMarked.getType(), new WatermarkDebugger<Tuple3<Long, String, String>>());

        //sessions.writeAsCsv("file:///E:\\data\\sessions.csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

        env.execute("Sorted Sessions Assigner");
    }

    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // 1.2-snapshot
            super.processWatermark(mark);
            // 1.1-snapshot
            //super.output.emitWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }
}

Best,
Yassine

2016-12-06 5:57 GMT+01:00 Aljoscha Krettek <aljoscha@apache.org>:

> Hi,
> could you please try adding this custom watermark debugger to see what's
> going on with the element timestamps and watermarks:
>
> public static class WatermarkDebugger<T>
>         extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void processElement(StreamRecord<T> element) throws Exception {
>         System.out.println("ELEMENT: " + element);
>         output.collect(element);
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>         super.processWatermark(mark);
>         System.out.println("WM: " + mark);
>     }
> }
>
> You can use it like this:
> input.transform("WatermarkDebugger", input.getType(), new WatermarkDebugger<Tuple2<String, Integer>>());
>
> That should give us something to work with.
>
> Cheers,
> Aljoscha
>
> On Mon, 5 Dec 2016 at 18:54 Robert Metzger <rmetzger@apache.org> wrote:
>
> I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
> best overview of the changes to the window operator between 1.1 and 1.2.
>
> On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <y.marzougui@mindlytix.com> wrote:
>
> I forgot to mention: the watermark extractor is the one included in the
> Flink API.
>
> 2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:
>
> Hi Robert,
>
> Yes, I am using the same code, just switching the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
> the time of the question).
> Here is the watermark assignment:
>
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
>     @Override
>     public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
>         return tuple3.f0;
>     }
> })
>
> Best,
> Yassine
>
> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetzger@apache.org>:
>
> Hi Yassine,
> are you sure your watermark extractor is the same between the two
> versions? It sounds a bit like the watermarks for the 1.2 code are not
> generated correctly.
>
> Regards,
> Robert
>
>
> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <y.marzougui@mindlytix.com> wrote:
>
> Hi all,
>
> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
> boundaries are detected, but with 1.2-SNAPSHOT the state keeps growing
> in memory and the window results are not emitted until the whole stream
> is processed. Is this temporary behaviour due to the ongoing development
> in 1.2-SNAPSHOT, or a bug?
>
> I am using code similar to the following:
>
> env.setParallelism(1);
>
> DataStream<T> sessions = env
>     .readTextFile()
>     .flatMap()
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>     .keyBy(1)
>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>     .apply().setParallelism(32);
>
> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>
> Best,
> Yassine

--001a114d8ca48a8d220543686868
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable
--001a114d8ca48a8d220543686868--