flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: How to sort tuples in DataStream
Date Tue, 12 Jan 2016 10:23:03 GMT
Hi!

Since a stream is infinite, you cannot simply sort it (Flink does not
follow the mini-batch model). You can only sort within windows.

I assume you key by word and sum up the counts.  Since you want to get the
most frequent words, you would need to sort across keys, which you can do
in a windowAll() function. Since you want a global sort, this will end up
being a non-parallel step.
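
To make the global sort concrete, here is a minimal sketch of the sorting
logic that could run inside such a windowAll() function. It is plain Scala
with no Flink dependency. The names WindowSort and sortByCountDesc are
hypothetical, and it assumes the window contents are (word, count) pairs,
which the thread does not show.

```scala
object WindowSort {
  // Sorts the per-word counts of ONE window, most frequent word first.
  // Assumption: each element is a (word, count) pair.
  def sortByCountDesc(window: Iterable[(String, Int)]): Seq[(String, Int)] =
    window.toSeq.sortBy(_._2)(Ordering[Int].reverse)

  def main(args: Array[String]): Unit = {
    val window = Seq(("flink", 3), ("stream", 7), ("sort", 1))
    sortByCountDesc(window).foreach(println)
  }
}
```

Because every key's count has to reach the same function, this step sees
the whole window at once, which is why it cannot run in parallel.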

A more efficient variant is to have a bounded (N) max heap in the
windowAll() function that you update with new elements and emit at the end.
A fold() function should allow you to implement that.
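
A rough sketch of the bounded-heap idea, again in plain Scala so that it
runs without Flink. The names TopN, WordCount and topN are hypothetical,
and the (word, count) element type is an assumption. Note that this sketch
orders the heap so that the smallest retained count sits at the head, which
makes it cheap to evict once more than N elements are held. The foldLeft
call stands in for the fold step one would supply to the window.

```scala
import scala.collection.mutable

object TopN {
  type WordCount = (String, Int)

  // Reversed ordering: the element with the SMALLEST count has the highest
  // priority, so dequeue() removes the weakest candidate.
  private val smallestFirst: Ordering[WordCount] =
    Ordering.by[WordCount, Int](_._2).reverse

  // Folds the counts into a heap that never holds more than n elements,
  // then returns the survivors sorted by count, largest first.
  def topN(n: Int, counts: Iterable[WordCount]): Seq[WordCount] = {
    val heap = counts.foldLeft(mutable.PriorityQueue.empty[WordCount](smallestFirst)) {
      (acc, wc) =>
        acc.enqueue(wc)
        if (acc.size > n) acc.dequeue() // evict the current smallest count
        acc
    }
    heap.toSeq.sortBy(_._2)(Ordering[Int].reverse)
  }

  def main(args: Array[String]): Unit = {
    val counts = Seq(("a", 1), ("b", 5), ("c", 3), ("d", 4))
    topN(2, counts).foreach(println)
  }
}
```

Compared with sorting the full window, the heap keeps at most N elements in
memory, and each update costs O(log N) instead of a sort over all keys.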

BTW: It is probably also more efficient to parse the Strings into numbers
once at the beginning of the program.
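
As an illustration of parsing once up front, a minimal sketch in plain
Scala. The Record type, the parse helper and the choice of field indexes
are all hypothetical: the thread only says that lines look like
"field1|field2|field3|...", not which field holds the word or the count.

```scala
object ParseOnce {
  // Assumed record shape, for illustration only.
  final case class Record(word: String, count: Int)

  // Splits the line once and converts the count field to an Int.
  def parse(line: String, wordIndex: Int, countIndex: Int): Record = {
    // split('|') takes a Char, so '|' is treated as a literal separator
    // rather than as a regex alternation (as split("|") would be).
    val fields = line.split('|')
    Record(fields(wordIndex), fields(countIndex).trim.toInt)
  }

  def main(args: Array[String]): Unit = {
    println(parse("flink|7|other", wordIndex = 0, countIndex = 1))
  }
}
```

Such a function would typically be applied in a map() at the start of the
program, so that the reduce step adds Ints instead of re-splitting and
re-parsing Strings on every call.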

Stephan

On Mon, Jan 11, 2016 at 7:41 PM, Saiph Kappa <saiph.kappa@gmail.com> wrote:

> Hi,
>
> I'm trying to do a simple application in Flink Stream to count the top N
> words on a window-basis, but I don't know how to sort the words by their
> frequency in Flink.
>
> In Spark Streaming, I would do something like this:
> «
> val isAscending = true
> stream.reduceByKeyAndWindow(reduceFunc, Seconds(10), Seconds(10)).transform(_.sortByKey(isAscending)).map(_._2)
> »
>
> How can I do it in Flink Stream?
>
> This is what I have so far:
> «
> val reduceFunc = (a: String, b: String) => {
>
>   val aElems = a.split(Separator)
>   val bElems = b.split(Separator)
>   // Index into the split fields, not into the raw strings a and b
>   val aggIndex = params.aggParams.get.head.aggIndex
>   val result = aElems(aggIndex).toInt + bElems(aggIndex).toInt
>   result.toString
> }
>
> stream.keyBy(0).timeWindow(Time.seconds(10), Time.seconds(10)).reduce(reduceFunc)
> »
>
>
> My stream is just a series of strings like "field1|field2|field3|..."
>
> Thanks.
>
