flink-user mailing list archives

From Kanak Biscuitwala <kana...@hotmail.com>
Subject RE: Multiple operations on a WindowedStream
Date Tue, 05 Apr 2016 19:00:15 GMT
This worked when I ran my test code locally, but nothing reaches my sink when I run it
on YARN (previously, when I simply echoed all sums to my sink, it worked).

Here's what my code looks like:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer09<MirrorMessageRequest> consumer = new FlinkKafkaConsumer09<>(
                INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
        env.enableCheckpointing(5000);

        // this (or event time) is required in order to do the double-windowing below
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<String> stream = env
                .addSource(consumer)
                .flatMap(new CountRequests())
                .keyBy(0, 1)
                .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
                .sum(2)
                .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
                .apply(new TopK(20))
                .map(new ToString<List<Tuple3<String, String, Integer>>>());
        stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(),
                properties));
        env.execute(TASK_NAME);
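
As a rough illustration of why the 5-second timeWindowAll can line up with the output of the sliding window, here is a plain-Java sketch with no Flink dependency. It assumes that each result of the first window carries the window's maximum timestamp (window end minus one millisecond) and that tumbling windows start at multiples of their size; the class and method names are hypothetical.

```java
/** Hypothetical helper that mimics the assumed timestamp arithmetic; not Flink API. */
final class WindowAlignmentSketch {

    /** Start of the tumbling window of the given size containing a non-negative timestamp. */
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Assumption: a window result is stamped with the window end minus one millisecond. */
    static long resultTimestamp(long windowEndMs) {
        return windowEndMs - 1;
    }

    public static void main(String[] args) {
        long slideMs = 5_000L;
        // Sliding windows of 1 minute with a 5-second slide end at multiples of 5 seconds.
        for (long windowEnd = 60_000L; windowEnd <= 70_000L; windowEnd += slideMs) {
            long ts = resultTimestamp(windowEnd);
            long start = tumblingWindowStart(ts, slideMs);
            System.out.println("sliding window ending at " + windowEnd
                    + " -> result timestamp " + ts
                    + " -> tumbling window [" + start + ", " + (start + slideMs) + ")");
        }
    }
}
```

Under these assumptions, all per-key sums emitted for one sliding window share a timestamp and therefore fall into the same 5-second tumbling window.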

Note that CountRequests produces Tuple3<String, String, Integer>, TopK is an AllWindowFunction
that produces List<Tuple3<String, String, Integer>>, and ToString is a MapFunction
that is just a wrapper on Object#toString().
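
The selection step inside a function like TopK could look roughly like the following plain-Java sketch. This is not the actual TopK class from the job above: KeyedCount is a hypothetical stand-in for Tuple3<String, String, Integer>, and the Flink AllWindowFunction wrapper is left out so the snippet runs on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for Tuple3<String, String, Integer>; not a Flink type. */
final class KeyedCount {
    final String first;
    final String second;
    final int count;

    KeyedCount(String first, String second, int count) {
        this.first = first;
        this.second = second;
        this.count = count;
    }

    @Override
    public String toString() {
        return "(" + first + "," + second + "," + count + ")";
    }
}

final class TopKSketch {

    /** Returns up to k elements with the highest counts, largest first. */
    static List<KeyedCount> topK(Iterable<KeyedCount> windowContents, int k) {
        if (k <= 0) {
            return new ArrayList<>();
        }
        Comparator<KeyedCount> byCount = Comparator.comparingInt(c -> c.count);
        // Min-heap holding the k largest elements seen so far.
        PriorityQueue<KeyedCount> heap = new PriorityQueue<>(k, byCount);
        for (KeyedCount c : windowContents) {
            if (heap.size() < k) {
                heap.add(c);
            } else if (c.count > heap.peek().count) {
                heap.poll(); // evict the current smallest
                heap.add(c);
            }
        }
        List<KeyedCount> result = new ArrayList<>(heap);
        Collections.sort(result, byCount.reversed());
        return result;
    }

    public static void main(String[] args) {
        List<KeyedCount> sums = Arrays.asList(
                new KeyedCount("a", "x", 3),
                new KeyedCount("b", "y", 7),
                new KeyedCount("c", "z", 5));
        System.out.println(topK(sums, 2));
    }
}
```

The heap keeps memory bounded by k rather than by the number of keys in the window, which is one possible design choice; sorting the full window contents would also work for small windows.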

Anything obvious that I'm doing wrong?
________________________________
> From: aljoscha@apache.org 
> Date: Fri, 1 Apr 2016 09:41:12 +0000 
> Subject: Re: Multiple operations on a WindowedStream 
> To: user@flink.apache.org 
> 
> Hi, 
> if you are using ingestion-time (or event-time) as your stream time 
> characteristic, i.e.: 
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or TimeCharacteristic.EventTime
> 
> you can apply several window transforms one after another and they will
> apply the same "time window" because they work on the element
> timestamps. What you can then do is have a window that does the 
> aggregation and then another one (that has to be global) to select the 
> top elements: 
> 
> result = input
>   .keyBy(<some key>)
>   .timeWindow(Time.minutes(1), Time.seconds(5))
>   .sum(2)
>   // notice how I put a non-sliding window here, because the above
>   // will output a new window every 5 seconds
>   .timeWindowAll(Time.seconds(5))
>   .apply(<my custom window function>)
> 
> I hope this helps. 
> 
> Cheers, 
> Aljoscha 
> 
> On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan
> <balaji.rajagopalan@olacabs.com> wrote:
> I had a similar use case and ended up writing the aggregation logic in the
> apply function; I could not find any better solution.
> 
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala
> <kanak.b@hotmail.com> wrote:
> Hi, 
> 
> I would like to write something that does something like a word count, 
> and then emits only the 10 highest counts for that window. Logically, I 
> would want to do something like: 
> 
> stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS)).sum(2).apply(getTopK(10))
> 
> However, the window context is lost after I do the sum aggregation. Is 
> there a straightforward way to express this logic in Flink 1.0? One way 
> I can think of is to have a complex function in apply() that has state, 
> but I would like to know if there is something a little cleaner than 
> that. 
> 
> Thanks, 
> Kanak 
> 
 		 	   		  