flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Simple streaming word count doesn't work (Scala)
Date Mon, 14 Sep 2015 09:39:55 GMT
Hi Francis,
I'm afraid this is a very strange bug that results from the interplay
between pre-aggregation (an optimization that aggregates the elements
of a window as they arrive) and the window size/slide size you use. With
some time values it works, but with others it doesn't.
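
For reference, here is a minimal sketch of what a 10-second window
evaluated every 5 seconds should produce when each window is counted from
scratch, without pre-aggregation. It is plain Scala, not Flink code, and it
does not show how Flink implements windows. `Event`, `countWindow` and
`slidingCounts` are hypothetical names, and the source cycles through the
words deterministically instead of picking them at random.

```scala
// Plain-Scala sketch (no Flink dependency): sliding-window word counts
// recomputed from scratch for each window, i.e. without pre-aggregation.
object SlidingCountSketch {
  // Hypothetical event type: a word and its arrival time in milliseconds.
  final case class Event(word: String, timeMs: Long)

  // Count the words that fall in the window (endMs - sizeMs, endMs].
  def countWindow(events: Seq[Event], endMs: Long, sizeMs: Long): Map[String, Int] =
    events
      .filter(e => e.timeMs > endMs - sizeMs && e.timeMs <= endMs)
      .groupBy(_.word)
      .map { case (word, es) => word -> es.size }

  // Evaluate a window of sizeMs every slideMs, up to and including untilMs.
  def slidingCounts(
      events: Seq[Event],
      sizeMs: Long,
      slideMs: Long,
      untilMs: Long): Seq[(Long, Map[String, Int])] =
    (slideMs to untilMs by slideMs).map(end => end -> countWindow(events, end, sizeMs))

  def main(args: Array[String]): Unit = {
    val words = Vector("amigo", "brazo", "pelo")
    // One word every 300 ms, as in the original source function.
    val events = (1 to 60).map(i => Event(words(i % words.length), i * 300L))
    slidingCounts(events, 10000L, 5000L, 15000L).foreach { case (end, counts) =>
      println(s"window ending at ${end / 1000}s: $counts")
    }
  }
}
```

With one word every 300 ms, every full 10-second window holds roughly 33
elements, so the per-word counts should stay around 11 after the first 10
seconds instead of dropping to 1.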

I'm sorry you ran into this problem. We are currently reworking the
windowing logic, so in the next release (0.10) this should be rock solid.

Cheers,
Aljoscha

On Mon, 14 Sep 2015 at 09:17 Francis Aranda <frascuchon@gmail.com> wrote:

> Testing the Apache Flink streaming API, I found something weird with a
> simple example.
>
> This code counts the words every 5 seconds over a window of 10 seconds.
> For the first 10 seconds the counts look right; after that, every print
> shows a wrong count of one per word. Is there something wrong in my code?
>
>
> Thanks in advance!
>
> def generateWords(ctx: SourceContext[String]) = {
>   val words = List("amigo", "brazo", "pelo")
>   while (true) {
>     Thread.sleep(300)
>     ctx.collect(words(Random.nextInt(words.length)))
>   }
> }
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setParallelism(1)
> val stream = env.addSource(generateWords _)
> val windowedStream = stream.map((_, 1))
>   .window(Time of(10, SECONDS)).every(Time of(5, SECONDS))
>
> val wordCount = windowedStream
>   .groupBy("_1")
>   .sum("_2")
>
> wordCount
>   .getDiscretizedStream()
>   .print()
>
> env.execute("sum randoms")
>
> The output is:
>
> [(pelo,3), (brazo,1), (amigo,2)] // first 5 seconds
> [(pelo,9), (brazo,5), (amigo,9)] // first 10 seconds
> [(brazo,1)] [(amigo,1)]
>
>
>
