flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed
Date Tue, 13 Dec 2016 17:17:17 GMT
Hi Yassine,
I managed to reproduce the problem. The cause is that we recently changed
how the timer service is being cleaned up and now the watermark timers are
not firing anymore.

I'll keep you posted and hope to find a solution fast.

Cheers,
Aljoscha
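The diagnosis above depends on how event-time timers behave. With an event-time trigger, the window operator registers a timer at each window's last timestamp, and that timer fires only when an incoming watermark reaches it. The sketch below is a simplified, stdlib-only model of that mechanism, not Flink's internal timer service. The class `EventTimeTimerModel` is hypothetical, and its method names merely echo Flink's terminology. With periodic watermarks, the first timer fires as soon as the watermark passes it. If watermarks stop advancing the timers (the regression described above), every timer instead waits for the final `Long.MAX_VALUE` watermark that arrives at the end of the input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Simplified model of an event-time timer service (not Flink's implementation).
 * Timers are registered at window end timestamps and fire only when the
 * watermark reaches them.
 */
public class EventTimeTimerModel {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Registers a timer, e.g. at a window's last timestamp (end - 1). */
    public void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    /** Advances the watermark (never backwards) and returns the timers that fire, in order. */
    public List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        EventTimeTimerModel service = new EventTimeTimerModel();
        service.registerEventTimeTimer(299_999L);   // last timestamp of a first session
        service.registerEventTimeTimer(1_199_999L); // last timestamp of a later session

        // Periodic watermarks: the first timer fires once the watermark passes it.
        System.out.println(service.advanceWatermark(600_000L));       // [299999]
        // The final watermark at the end of the input flushes everything still pending.
        System.out.println(service.advanceWatermark(Long.MAX_VALUE)); // [1199999]
        System.out.println(service.pendingTimers());                  // 0
    }
}
```

If the first `advanceWatermark` call fired nothing, both results would only appear after the `Long.MAX_VALUE` watermark. That matches the "windows fire only at the end of the stream" symptom in this thread.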

On Sun, 11 Dec 2016 at 22:10 Yassine MARZOUGUI <y.marzougui@mindlytix.com>
wrote:

> Hi Aljoscha,
>
> Please excuse the late response; I was busy all of last week.
> I ran the custom watermark debugger (for 1.1, I changed super.processWatermark(mark)
> to super.output.emitWatermark(mark)). Surprisingly, with 1.2 only one
> watermark is printed, at the end of the stream, with the value WM: Watermark
> @ 9223372036854775807 (Long.MAX_VALUE), whereas with 1.1 watermarks are
> printed periodically. I am using the following revision of 1.2-SNAPSHOT:
> https://github.com/apache/flink/tree/4e336c692b74f218ba09844a46f49534e3a210e9
>
> I uploaded the dataset I'm using as input here:
> https://drive.google.com/file/d/0BzERCAJnxXocNGpMTGMzX09id1U/view?usp=sharing
> (the first column is the timestamp).
>
> You can find the code below. Thank you for your help.
>
> import com.opencsv.CSVParser;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
> import java.util.*;
>
> /**
>  * Created by ymarzougui on 11/1/2016.
>  */
> public class SortedSessionsAssigner {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         // Parse CSV lines into (timestamp, key, value) and assign ascending event-time timestamps.
>         DataStream<Tuple3<Long,String,String>> waterMarked = env.readTextFile("file:///E:\\data\\anonymized.csv")
>                 .flatMap(new RichFlatMapFunction<String, Tuple3<Long,String,String>>() {
>                     public CSVParser csvParser;
>
>                     @Override
>                     public void open(Configuration config) {
>                         csvParser = new CSVParser(',', '"');
>                     }
>
>                     @Override
>                     public void flatMap(String in, Collector<Tuple3<Long,String,String>> clctr) throws Exception {
>                         String[] result = csvParser.parseLine(in);
>                         clctr.collect(Tuple3.of(Long.parseLong(result[0]), result[1], result[2]));
>                     }
>                 })
>                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
>                     @Override
>                     public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
>                         return tuple3.f0;
>                     }
>                 });
>
>         // Per key (field 1), count values (field 2) within 5-minute event-time sessions.
>         DataStream<Tuple2<TreeMap<String, Double>, Long>> sessions = waterMarked
>                 .keyBy(1)
>                 .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>                 .apply(new WindowFunction<Tuple3<Long,String,String>, Tuple2<TreeMap<String, Double>, Long>, Tuple, TimeWindow>() {
>
>                     @Override
>                     public void apply(Tuple tuple, TimeWindow timeWindow,
>                                       Iterable<Tuple3<Long, String, String>> iterable,
>                                       Collector<Tuple2<TreeMap<String, Double>, Long>> collector) throws Exception {
>                         TreeMap<String,Double> treeMap = new TreeMap<String, Double>();
>                         Long session_count = 0L;
>                         for (Tuple3<Long, String, String> tuple3 : iterable) {
>                             treeMap.put(tuple3.f2, treeMap.getOrDefault(tuple3.f2, 0.0) + 1);
>                             session_count += 1;
>                         }
>                         collector.collect(Tuple2.of(treeMap, session_count));
>
>                     }
>                 }).setParallelism(8);
>
>         // Print every element and watermark leaving the timestamp assigner.
>         waterMarked.transform("WatermarkDebugger", waterMarked.getType(),
>                 new WatermarkDebugger<Tuple3<Long, String, String>>());
>
>         //sessions.writeAsCsv("file:///E:\\data\\sessions.csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
>
>         env.execute("Sorted Sessions Assigner");
>
>     }
>
>     public static class WatermarkDebugger<T>
>             extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void processElement(StreamRecord<T> element) throws Exception {
>             System.out.println("ELEMENT: " + element);
>             output.collect(element);
>         }
>
>         @Override
>         public void processWatermark(Watermark mark) throws Exception {
>             // 1.2-SNAPSHOT: the base class forwards the watermark downstream
>             super.processWatermark(mark);
>             // 1.1-SNAPSHOT: forward the watermark explicitly instead
>             //super.output.emitWatermark(mark);
>             System.out.println("WM: " + mark);
>         }
>     }
>
> }
>
> Best,
> Yassine
>
> 2016-12-06 5:57 GMT+01:00 Aljoscha Krettek <aljoscha@apache.org>:
>
> Hi,
> could you please try adding this custom watermark debugger to see what's
> going on with the element timestamps and watermarks:
>
> public static class WatermarkDebugger<T>
>         extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void processElement(StreamRecord<T> element) throws Exception {
>         System.out.println("ELEMENT: " + element);
>         output.collect(element);
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>         super.processWatermark(mark);
>         System.out.println("WM: " + mark);
>     }
> }
>
> You can use it like this:
> input.transform("WatermarkDebugger", input.getType(), new WatermarkDebugger<Tuple2<String, Integer>>());
>
> That should give us something to work with.
>
> Cheers,
> Aljoscha
>
> On Mon, 5 Dec 2016 at 18:54 Robert Metzger <rmetzger@apache.org> wrote:
>
> I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
> best overview of the changes to the window operator between 1.1 and 1.2.
>
> On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <
> y.marzougui@mindlytix.com> wrote:
>
> I forgot to mention: the watermark extractor is the one included in the Flink API.
>
> 2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:
>
> Hi Robert,
>
> Yes, I am using the same code, just switching the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the latest master, compiled at
> the time of the question. Here is the watermark assignment:
>
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
>     @Override
>     public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
>         return tuple3.f0;
>     }
> })
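The `AscendingTimestampExtractor` above is a periodic assigner. Roughly, it remembers the highest timestamp seen so far and, when the framework polls it on the auto-watermark interval, reports a watermark one millisecond below that timestamp. The sketch below models that behavior under those assumptions. `AscendingWatermarkModel` is a hypothetical class, not Flink code, and the "emit only if higher than the last watermark" rule is part of the model. It illustrates why the 1.1 debugger output shows a steady series of watermarks: each poll after new data yields a higher value, while polls with no progress emit nothing.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of an ascending-timestamp periodic watermark assigner
 * (not Flink code): watermark = highest timestamp seen - 1.
 */
public class AscendingWatermarkModel {
    private long currentTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    /** Called per record; a lower (out-of-order) timestamp does not move the watermark back. */
    public long extractTimestamp(long timestamp) {
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
        }
        return timestamp;
    }

    /** Called on each periodic poll; returns a new watermark, or null if there is no progress. */
    public Long onPeriodicEmit() {
        long wm = currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
        if (wm > lastEmitted) {
            lastEmitted = wm;
            return wm;
        }
        return null;
    }

    public static void main(String[] args) {
        AscendingWatermarkModel assigner = new AscendingWatermarkModel();
        List<Long> emitted = new ArrayList<>();
        // Records arriving between three consecutive polls.
        long[][] batches = {{1_000L, 2_000L}, {2_000L}, {400_000L}};
        for (long[] batch : batches) {
            for (long ts : batch) {
                assigner.extractTimestamp(ts);
            }
            Long wm = assigner.onPeriodicEmit();
            if (wm != null) {
                emitted.add(wm);
            }
        }
        System.out.println(emitted); // [1999, 399999]
    }
}
```

The second poll emits nothing because the highest timestamp did not change.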
>
> Best,
> Yassine
>
> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetzger@apache.org>:
>
> Hi Yassine,
> are you sure your watermark extractor is the same in both versions?
> It sounds a bit as if the watermarks for the 1.2 code are not
> generated correctly.
>
> Regards,
> Robert
>
>
> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
> y.marzougui@mindlytix.com> wrote:
>
> Hi all,
>
> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
> boundaries are detected, but with 1.2-SNAPSHOT the state keeps growing
> in memory and the window results are not emitted until the whole stream is
> processed. Is this temporary behaviour due to ongoing development in
> 1.2-SNAPSHOT, or a bug?
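For reference, a session window with a gap closes when no element for that key arrives within the gap, and its result is emitted once the watermark passes the session's end. The sketch below models this for a single key with ascending timestamps. It rests on simplifying assumptions: each element opens a window `[ts, ts + gap)`, windows that overlap or touch are merged, and a session is emitted when the watermark reaches `end - 1`. `SessionWindowModel` is a hypothetical class, not Flink's merging-window implementation. With a watermark derived from the ascending timestamps, the first session is emitted as soon as an element arrives more than five minutes later, which corresponds to the 1.1 behaviour described above. Without watermark progress, both sessions stay buffered until `Long.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified single-key model of event-time session windows (not Flink code).
 * Assumptions: timestamps arrive in ascending order; each element opens a
 * window [ts, ts + gap); overlapping or touching windows are merged; a
 * session is emitted once the watermark reaches its last timestamp (end - 1).
 */
public class SessionWindowModel {
    /** One merged session [start, end) holding count elements. */
    public static final class Session {
        public final long start;
        public long end; // exclusive
        public int count;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
            this.count = 1;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ") x" + count;
        }
    }

    private final long gap;
    private final List<Session> open = new ArrayList<>();

    public SessionWindowModel(long gap) {
        this.gap = gap;
    }

    /** Adds one element, merging it into the latest session if their windows meet. */
    public void add(long ts) {
        Session last = open.isEmpty() ? null : open.get(open.size() - 1);
        if (last != null && ts <= last.end) {
            last.end = Math.max(last.end, ts + gap);
            last.count++;
        } else {
            open.add(new Session(ts, ts + gap));
        }
    }

    /** Removes and returns every session whose last timestamp the watermark has reached. */
    public List<Session> onWatermark(long watermark) {
        List<Session> fired = new ArrayList<>();
        while (!open.isEmpty() && open.get(0).end - 1 <= watermark) {
            fired.add(open.remove(0));
        }
        return fired;
    }

    public static void main(String[] args) {
        SessionWindowModel key = new SessionWindowModel(5 * 60 * 1000L); // 5-minute gap in ms
        key.add(0L);
        key.add(60_000L);    // 1 minute later: same session
        key.add(1_000_000L); // more than 5 minutes later: new session
        // An ascending extractor would now report the watermark 1_000_000 - 1.
        System.out.println(key.onWatermark(999_999L));      // [[0, 360000) x2]
        System.out.println(key.onWatermark(Long.MAX_VALUE)); // [[1000000, 1300000) x1]
    }
}
```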
>
> I am using code similar to the following:
>
> env.setParallelism(1);
>
> DataStream<T> sessions = env
>     .readTextFile()
>     .flatMap()
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>     .keyBy(1)
>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>     .apply().setParallelism(32);
>
> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>
> Best,
> Yassine
>
