flink-user mailing list archives

From Balaji Rajagopalan <balaji.rajagopa...@olacabs.com>
Subject Re: flink streaming - window chaining example
Date Mon, 28 Mar 2016 06:02:09 GMT
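import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Read raw strings from Kafka and map them to event objects.
val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer08[String]("topic_name", new SimpleStringSchema, prop))

val event: DataStream[SomeEventObj] = stream.map(MyMapFunction)

// First window: 10 minutes over all events, firing every minute.
// A windowed stream has no map(); MyWindowFunction1 stands for the window function that builds AggEvents.
val tenMinute: DataStream[AggEvents] = event
  .timeWindowAll(Time.minutes(10))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
  .apply(MyWindowFunction1)

// Second window: 60 minutes, keyed, consuming the output of the first window.
val oneHour = tenMinute
  .keyBy(_.mykey)
  .window(TumblingEventTimeWindows.of(Time.minutes(60)))
  .trigger(MyTriggerFunction)
  // finish with .apply(...) or .reduce(...) to emit the hourly result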


The above is pseudocode and may contain some syntax errors, but it should do
what you are looking for. The one-hour window takes the output of the
ten-minute window as its input, so the two execute one after the other.
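
To make the dependency between the two windows concrete, here is a minimal sketch in plain Scala collections rather than Flink, so it runs without any dependencies. It only models the data flow: the first stage turns event timestamps into per-minute rates for each 10-minute tumbling window, and the second stage sees nothing but those aggregates and groups them into 60-minute windows. All names (WindowChainSketch, RateAgg, HourStats and the helper functions) are hypothetical, and the choice of population standard deviation and a nearest-rank 95th percentile is an assumption made for illustration, not something taken from the thread.

```scala
// Plain-Scala model of two chained tumbling windows (no Flink involved).
// Every name below is hypothetical and exists only for this illustration.
object WindowChainSketch {
  val TenMinutesMs: Long = 10L * 60L * 1000L
  val OneHourMs: Long = 60L * 60L * 1000L

  // Output of the first stage: one record per non-empty 10-minute window.
  final case class RateAgg(windowStart: Long, eventsPerMinute: Double)

  // Output of the second stage: one record per non-empty 60-minute window.
  final case class HourStats(windowStart: Long, mean: Double, stdDev: Double, p95: Double)

  // Start of the tumbling window of the given size containing `ts`
  // (assumes non-negative epoch-millisecond timestamps).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Stage 1: count events per 10-minute window, expressed as events per minute.
  def tenMinuteRates(eventTimestamps: Seq[Long]): Seq[RateAgg] =
    eventTimestamps
      .groupBy(ts => windowStart(ts, TenMinutesMs))
      .toSeq
      .sortBy(_._1)
      .map { case (start, events) => RateAgg(start, events.size / 10.0) }

  // Nearest-rank percentile over an ascending-sorted, non-empty sequence.
  def percentile(sorted: Seq[Double], p: Double): Double = {
    val rank = math.ceil(p / 100.0 * sorted.size).toInt
    sorted(math.max(rank, 1) - 1)
  }

  // Stage 2: consumes only stage-1 output, grouped into 60-minute windows.
  def hourlyStats(rates: Seq[RateAgg]): Seq[HourStats] =
    rates
      .groupBy(r => windowStart(r.windowStart, OneHourMs))
      .toSeq
      .sortBy(_._1)
      .map { case (start, aggs) =>
        val xs = aggs.map(_.eventsPerMinute).sorted
        val mean = xs.sum / xs.size
        // Population variance: divide by n, not n - 1.
        val variance = xs.map(x => (x - mean) * (x - mean)).sum / xs.size
        HourStats(start, mean, math.sqrt(variance), percentile(xs, 95.0))
      }
}
```

Calling WindowChainSketch.hourlyStats(WindowChainSketch.tenMinuteRates(timestamps)) mirrors the chaining in the pseudocode: the hourly stage never touches raw events, which is why it can only run after the ten-minute stage has emitted its results. A threshold check for the business logic would then be a simple filter over the HourStats records.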


On Sun, Mar 27, 2016 at 2:20 PM, Chen Bekor <chen.bekor@gmail.com> wrote:

> hi all!
>
> I'm just starting my way with flink and I have a design question.
>
> I'm trying to aggregate incoming events (source: kafka topic) on a 10min
> tumbling window in order to calculate the incoming events rate (total per
> minute).
>
> I would like to take this window and perform an additional window (60 min)
> in order to calculate percentiles, std deviation and some other statistics
> on that time window. finally I would like to trigger some business logic in
> case the calculation hits a certain threshold.
>
> my main challenge is - how to chain the two windows together.
>
> any help is appreciated (please send scala example code - I'm not using
> java :) for this project)
>
