flink-user mailing list archives

From Balaji Rajagopalan <balaji.rajagopa...@olacabs.com>
Subject Re: does reduce function has a bug
Date Fri, 25 Mar 2016 07:05:45 GMT
Never mind, Till. I figured out a way: instead of doing the aggregation in
reduce, I moved that logic into the apply function of the window.
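Moving the summation into the window function means each key emits one total per window instead of a running series of partial sums. Below is a minimal plain-Scala sketch of that idea with no Flink dependency. It assumes event-time tumbling windows of `sizeMs` milliseconds and hypothetical `(key, value, timestampMs)` events. It buckets each event by key and window start, then sums once per bucket, which is roughly what a window `apply` that aggregates its whole `Iterable` would emit. The names `Event`, `WindowedSumSketch`, and `windowedSums` are illustrative only.

```scala
// Hypothetical event shape: key, value, and event-time timestamp in milliseconds
final case class Event(key: String, value: Int, timestampMs: Long)

object WindowedSumSketch {
  // Start of the tumbling window that contains ts (assumes non-negative timestamps)
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Emits one (key, windowStart, sum) per key per window: the aggregation runs
  // once over all of a window's elements, not once per incoming element.
  def windowedSums(events: Seq[Event], sizeMs: Long): Seq[(String, Long, Int)] =
    events
      .groupBy(e => (e.key, windowStart(e.timestampMs, sizeMs)))
      .map { case ((k, start), es) => (k, start, es.map(_.value).sum) }
      .toSeq
      .sortBy { case (k, start, _) => (start, k) }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event("k1", 1, 1000L), Event("k1", 1, 2000L), Event("k2", 1, 61000L))
    windowedSums(events, 60000L).foreach(println)
  }
}
```

In Flink itself the bucketing is done by the window assigner (for example `TumblingEventTimeWindows`), and the summing would live in the function passed to `apply`.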

On Thu, Mar 24, 2016 at 11:33 PM, Balaji Rajagopalan <
balaji.rajagopalan@olacabs.com> wrote:

> Till,
>
>   Thanks for your reply; maybe I should have given more details. The
> stream (a DataStream[(String, Int)]) is already windowed. I have all the
> data that I need in my data stream; all I'm trying to do is build something
> like a HashMap[String, Int] from the (String, Int) tuples. If reduce is not
> the best solution, can you please suggest another way to do the same?
>
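> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
>
> val source: DataStream[(String, Int)] = someSource
> val stream = source.keyBy(_._1)
>   .window(TumblingEventTimeWindows.of(Time.minutes(xmin)))
>   .apply { (key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
>     mywindowfunc(key, window, input, out)
>   }
> val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
> keyedStream.print()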
>
> Balaji
>
>
> On Thu, Mar 24, 2016 at 11:21 PM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> Hi Balaji,
>>
>> the output you see is the correct output, since you're computing a
>> continuous reduce of the incoming data. Since you haven't defined a time
>> frame for your reduce computation, you would either have to wait for all
>> eternity to output the final result, or output each new reduce result as
>> soon as it is generated (which is, of course, partial). Since the first
>> option is not very practical, Flink emits the partial reduce results.
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 24, 2016 at 6:21 PM, Balaji Rajagopalan <
>> balaji.rajagopalan@olacabs.com> wrote:
>>
>>> I have a keyed input stream on a DataStream[(String, Int)] and wrote a
>>> reduce on the keyed stream. The reduce is a simple one that sums up the
>>> integer values of the same key.
>>>
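>>> import org.apache.flink.api.common.functions.ReduceFunction
>>> import org.apache.flink.streaming.api.scala._
>>>
>>> val stream: DataStream[(String, Int)] = ???  // source of (key, count) tuples
>>> val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
>>> keyedStream.print()
>>>
>>> // Sums the counts of two records that share the same key
>>> class MyReduceFunction extends ReduceFunction[(String, Int)] {
>>>   override def reduce(in: (String, Int), in1: (String, Int)): (String, Int) =
>>>     (in._1, in._2 + in1._2)
>>> }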
>>>
>>> Here is my sample input stream.
>>> ("k1",1)
>>> ("k1",1)
>>> ("k2",1)
>>>
>>> I was expecting the output of the above program to return
>>> ("k1",2)
>>> ("k2",1)
>>>
>>> whereas I got this:
>>> ("k1",1)
>>> ("k1",2)
>>> ("k2",1)
>>>
>>> Isn't this incorrect output?
>>>
>>> Balaji
>>>
>>
>>
>
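Till's explanation can be checked with a plain-Scala simulation of a keyed reduce without a window. This has no Flink dependency and models only the semantics, not Flink's implementation; `RollingReduceSketch` and `rollingReduce` are hypothetical names. It keeps the latest reduced value per key and emits the updated value for every incoming record. So the sample input from the original question yields one output per input, including the partial `("k1",1)`.

```scala
import scala.collection.mutable

object RollingReduceSketch {
  // Emits the running result after every element, like a keyed reduce without a window.
  def rollingReduce[K, V](input: Seq[(K, V)])(combine: (V, V) => V): Seq[(K, V)] = {
    val state = mutable.Map.empty[K, V] // latest reduced value per key
    input.map { case (k, v) =>
      val updated = state.get(k).map(prev => combine(prev, v)).getOrElse(v)
      state(k) = updated
      (k, updated) // partial result emitted downstream
    }
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(("k1", 1), ("k1", 1), ("k2", 1))
    rollingReduce(sample)(_ + _).foreach(println)
    // prints (k1,1), (k1,2), (k2,1) on separate lines
  }
}
```

The last value emitted for each key matches the result Balaji expected. Bounding the computation with a window is what lets only that final value per key be emitted.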
