flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: does reduce function has a bug
Date Thu, 24 Mar 2016 17:51:34 GMT
Hi Balaji,

the output you see is the correct output, since you're computing a
continuous reduce over the incoming data. Because you haven't defined a
time frame for the reduce computation, Flink would either have to wait
for all eternity before emitting the final result, or it emits a new
reduce result (which is of course partial) every time one is produced.
Since the first option is not very practical, Flink emits the partial
reduce results.
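If you want a single aggregated result per key, one option is to scope
the reduce to a time window. Here is a minimal sketch, assuming the
Flink Scala streaming API; the 10-second window, the object name and the
bounded element source are illustrative, not taken from your program:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowedSum {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Illustrative bounded source; in practice this would be a real stream.
    val stream: DataStream[(String, Int)] =
      env.fromElements(("k1", 1), ("k1", 1), ("k2", 1))

    stream
      .keyBy(_._1)                      // key by the String field
      .timeWindow(Time.seconds(10))     // scope the reduce to a time window
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()                          // one result per key per window

    env.execute("windowed sum")
  }
}
```

Within each window this emits one summed tuple per key instead of a
running sequence of partial sums.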

Cheers,
Till

On Thu, Mar 24, 2016 at 6:21 PM, Balaji Rajagopalan <
balaji.rajagopalan@olacabs.com> wrote:

> I have a keyed input stream on DataStream[(String, Int)] and wrote a reduce on
> the keyedStream. The reduce is a simple one, summing up the integer values of
> the same key.
>
> val stream: DataStream[(String, Int)] = ...
> val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
> keyedStream.print()
>
> class MyReduceFunction extends ReduceFunction[(String, Int)] {
>   override def reduce(in: (String, Int), in1: (String, Int)): (String, Int) = {
>     (in._1, in._2 + in1._2)
>   }
> }
>
> Here is my sample input stream.
> ("k1",1)
> ("k1",1)
> ("k2",1)
>
> I was expecting the output of the above program to return
> ("k1",2)
> ("k2",1)
>
> where as I got this,
> ("k1",1)
> ("k1",2)
> ("k2",1)
>
> Isn't this an incorrect output?
>
> Balaji
>
