flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: question regarding windowed stream
Date Sat, 07 May 2016 07:58:18 GMT
Hi,
yes, the input does indeed play a role. If not elements are incoming then
there will also be no window.

Cheers,
Aljoscha

On Fri, 6 May 2016 at 12:18 Balaji Rajagopalan <
balaji.rajagopalan@olacabs.com> wrote:

> I have a requirement where I want to do aggregation on one data stream
> every 5 minutes, a different data stream every 1 minute. I wrote a example
> code to test this out but the behavior is different from what I expected ,
> I expected the window2 to be called 5 times, and window 1 to called once ,
> but in a 5 minute interval the window 1 is called once and window2 is
> called only once, have I understood the windowed function incorrectly, does
> the input play a role in no of times a window apply is called. I use the nc
> command to write to the socket port 9999 and 9998.
>
>
>
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.scala.function.AllWindowFunction
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow}
>
> import org.apache.flink.util.Collector
> import org.apache.flink.streaming.api.windowing.windows.Window
>
>
>   object WindowWordCount {
>     def main(args: Array[String]) {
>
>       val env = StreamExecutionEnvironment.getExecutionEnvironment
>       env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>       val text = env.socketTextStream("localhost", 9999)
>       val text1 = env.socketTextStream("localhost", 9998)
>       val stream:DataStream[String] = text.flatMap { _.toLowerCase.split("\\W+") filter
{ _.nonEmpty } }
>       val count = stream.windowAll(TumblingEventTimeWindows.of(Time.minutes(5))).apply
{ new MyAllWindowFunction }
>
>
>       count.print
>
>       val counts1 = text1.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty }
}
>         .windowAll(TumblingEventTimeWindows.of(Time.minutes(1))).apply { new MyAllWindowFunction2
}
>
>       counts1.print
>
>       env.execute("Window Stream WordCount")
>     }
>
>     class MyAllWindowFunction extends AllWindowFunction[String,String,TimeWindow]
>     {
>       def apply(window : TimeWindow, input : scala.Iterable[String], out : org.apache.flink.util.Collector[String]):
Unit =
>       {
>         System.out.println("timed window1 is called")
>       }
>     }
>
>     class MyAllWindowFunction2 extends AllWindowFunction[String,String,TimeWindow]
>     {
>       def apply(window : TimeWindow, input : scala.Iterable[String], out : org.apache.flink.util.Collector[String]):
Unit =
>       {
>         System.out.println("timed window2 is called")
>       }
>     }
>   }
>
>
> The output was:
>
> timed window2 is called
> timed window1 is called
>
>

Mime
View raw message