flink-user mailing list archives

From Yassine Marzougui <yassmar...@gmail.com>
Subject Re: how to get rid of duplicate rows group by in DataStream
Date Wed, 24 Aug 2016 19:40:48 GMT
Sorry, I mistyped the code: it should be timeWindow(Time.minutes(10)) instead
of window(Time.minutes(10)).
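
To make the difference concrete, here is a rough plain-Java sketch (no Flink dependency) of the two behaviours discussed in this thread. It is only an illustration under assumptions: the class and method names are hypothetical, the grid ids are taken from Subash's sample output, and a single already-closed window is modelled as a plain list rather than a real time window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of Flink.
public class WindowedCountSketch {

    // Models keyBy(...).reduce(...): the updated count is emitted
    // after every incoming element, so each key appears many times.
    public static List<String> runningCounts(List<String> keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            long updated = counts.merge(key, 1L, Long::sum);
            emitted.add("(" + key + "," + updated + ")");
        }
        return emitted;
    }

    // Models keyBy(...).timeWindow(...).reduce(...) for ONE window:
    // counts are accumulated per key and emitted once when the window closes.
    public static List<String> windowedCounts(List<String> keysInWindow) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : keysInWindow) {
            counts.merge(key, 1L, Long::sum);
        }
        List<String> emitted = new ArrayList<>();
        counts.forEach((key, count) -> emitted.add("(" + key + "," + count + ")"));
        return emitted;
    }

    public static void main(String[] args) {
        // Grid ids in arrival order, assumed to fall into the same window.
        List<String> keys = Arrays.asList(
                "33330", "33330", "00000", "00000", "00000", "33330", "00000");
        System.out.println(runningCounts(keys));
        System.out.println(windowedCounts(keys));
    }
}
```

With the sample ids above, the first line printed contains all seven intermediate counts, while the second contains only (33330,3) and (00000,4). In a real job the window boundary is decided by time, so elements arriving in different 10-minute windows are counted separately.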

On Wed, Aug 24, 2016 at 9:29 PM, Yassine Marzougui <yassmarzou@gmail.com>
wrote:

> Hi subash,
>
> A stream is infinite, hence it has no notion of a "final" count. To get
> distinct counts you need to define a period (= a window [1]) over which
> you count elements and emit a result, by adding a window operator before the
> reduce.
> For example, the following will emit distinct counts every 10 minutes over
> the last 10-minute period:
>
> stream.keyBy(2)
>       .window(Time.minutes(10))
>       .reduce(new GridPointsCount())
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html
>
>
> On Wed, Aug 24, 2016 at 6:14 PM, subash basnet <yasubash@gmail.com> wrote:
>
>> Hello Kostas,
>>
>> Sorry for the late reply. I couldn't understand how to split the elements
>> into windows in a DataStream, as in the code below, to get distinct output
>> elements with their counts after applying keyBy and reduce.
>>
>> DataStream<Tuple2<String, Long>> gridWithDensity =
>>     pointsWithGridCoordinates.map(new AddCountAppender())
>>         .keyBy(2)
>>         .reduce(new GridPointsCount())
>>         .map(new RetrieveGridWithCount());
>> gridWithDensity.print();
>>
>> Current Output:    Required Output:
>> (33330,1)          (33330,3)
>> (33330,2)          (00000,4)
>> (00000,1)
>> (00000,2)
>> (00000,3)
>> (33330,3)
>> (00000,4)
>>
>> public static final class GridPointsCount
>>         implements ReduceFunction<Tuple4<Point, Grid, String, Long>> {
>>     @Override
>>     public Tuple4<Point, Grid, String, Long> reduce(
>>             Tuple4<Point, Grid, String, Long> val1,
>>             Tuple4<Point, Grid, String, Long> val2) {
>>         return new Tuple4<Point, Grid, String, Long>(
>>                 val1.f0, val1.f1, val1.f2, val1.f3 + val2.f3);
>>     }
>> }
>>
>>
>> Regards,
>> Subash Basnet
>>
>> On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas <
>> k.kloudas@data-artisans.com> wrote:
>>
>>> Hi Subash,
>>>
>>> You should also split your elements in windows.
>>> If not, Flink emits an element for each incoming record.
>>> That is why you have:
>>>
>>> (1,1)
>>> (1,2)
>>> (1,3)
>>>
>>> …
>>>
>>> Kostas
>>>
>>> > On Aug 22, 2016, at 5:58 PM, subash basnet <yasubash@gmail.com> wrote:
>>> >
>>> > Hello all,
>>> >
>>> > I grouped the input by its id to count the number of elements in each group.
>>> > DataStream<Tuple2<String, Long>> gridWithCount;
>>> > Printing the above DataStream shows duplicate rows:
>>> > Output:
>>> > (1,1)
>>> > (1,2)
>>> > (2,1)
>>> > (1,3)
>>> > (2,2).......
>>> >
>>> > Whereas I wanted the distinct rows with final count:
>>> > Needed Output:
>>> > (1,3)
>>> > (2,2)..
>>> >
>>> > What could be the way to achieve this?
>>> >
>>> >
>>> > Regards,
>>> > Subash Basnet
>>>
>>>
>>>
>>
>
