flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Le Xu <sharonx...@gmail.com>
Subject Re: Local combiner on each mapper in Flink
Date Thu, 26 Oct 2017 13:01:16 GMT
Hi Kien:

Is there a similar API for DataStream as well?

Thanks!

Le


> On Oct 26, 2017, at 7:58 AM, Kien Truong <duckientruong@gmail.com> wrote:
> 
> Hi,
> 
> For batch API, you can use GroupReduceFunction, which give you the same benefit as a
MapReduce combiner.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions>Regards,
> Kien
> 
> 
> On 10/26/2017 7:37 PM, Le Xu wrote:
>> Thanks guys! That makes more sense now. 
>> 
>> So does it mean once I start use a window operator, all operations on my WindowedStream
would be global (across all partitions)? In that case, WindowedStream.aggregate (or sum) would
apply to all data after shuffling instead of each partition. 
>> 
>> If I understand this correctly, once I want to perform some sort of counting within
each partition for different words (such as word count), I should really avoid using keyBy
but keep some sort of counting map for each word while also keep track of the current time
stamp, inside each mapper.
>> 
>> Le
>> 
>> 
>> 
>> 
>>> On Oct 26, 2017, at 3:17 AM, Fabian Hueske <fhueske@gmail.com <mailto:fhueske@gmail.com>>
wrote:
>>> 
>>> Hi,
>>> 
>>> in a MapReduce context, combiners are used to reduce the amount of data 1) to
shuffle and fully sort (to group the data by key) and 2) to reduce the impact of skewed data.
>>> 
>>> The question is, why do you need a combiner in your use case.
>>> - To reduce the data to shuffle: You should not use a window operator to preaggregate
because keyBy implies a shuffle. Instead you could implement a ProcessFunction with operator
state. In this solution you need to implement the windowing logic yourself, i.e., group data
in window based on their timestamp. Ensure you don't run out of memory (operator state is
kept on the heap), etc. So this solution needs quite a bit of manual tuning.
>>> - To reduce the impact of skewed data: You can use a window aggregation if you
don't mind the shuffle. However, you should add an additional                   artificial
key attribute to spread out the computation of the same original key to more grouping key.
Note that Flink assigns grouping keys by hash partitioning to workers. This works well for
many distinct keys, but might cause issues in case of low key cardinality. Also note that
the state size grows and effectiveness reduces with an increasing cardinality of the artificial
key.
>>> 
>>> Hope this helps,
>>> Fabian
>>> 
>>> 2017-10-26 3:32 GMT+02:00 Kurt Young <ykt836@gmail.com <mailto:ykt836@gmail.com>>:
>>> Do you mean you want to keep the origin window as well as doing some combine
operations inside window in the same time?
>>> What kind of data do you expect the following operator will receive?
>>> 
>>> Best,
>>> Kurt
>>> 
>>> On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonxu65@gmail.com <mailto:sharonxu65@gmail.com>>
wrote:
>>> Thank Kurt I'm trying out WindowedStream aggregate right now. Just wondering,
is there any way for me to preserve the window after aggregation. More specifically, originally
i have something like:
>>> 
>>> WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream
= dataStream
>>>                 .keyBy(0) //id 
>>>                 .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
>>> 
>>> and then for the reducer I can do:
>>>  
>>> windowStream.apply(...) 
>>> 
>>> and expect the window information is preserved.
>>> 
>>> If I were to do use aggregate on window stream, I would end up with something
like:
>>> 
>>> DataStream<Tuple2<String, Long>> windowStream = dataStream
>>>                 .keyBy(0) //id 
>>>                 .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS)).aggregate
>>> 				(new AggregateFunction<Tuple2<String, Long>, Accumulator, Tuple2<String,
Long>>() {
>>>                     @Override
>>>                     public Accumulator createAccumulator() {
>>>                         return null;
>>>                     }
>>> 
>>>                     @Override
>>>                     public void add(Tuple2<String, Long> stringLong, Accumulator
o) 					{
>>> 
>>>                     }
>>> 
>>>                     @Override
>>>                     public Tuple2<String, Long> getResult(Accumulator o)
{
>>>                         return null;
>>>                     }
>>> 
>>>                     @Override
>>>                     public Accumulator merge(Accumulator o, Accumulator acc1)
{
>>>                         return null;
>>>                     }
>>>                 });
>>> 
>>> Because it looks like aggregate would only transfer WindowedStream to a DataStream.
But for a global aggregation phase (a reducer), should I extract the window again?
>>> 
>>> 
>>> Thanks! I apologize if that sounds like a very intuitive questions.
>>> 
>>> 
>>> Le
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt836@gmail.com <mailto:ykt836@gmail.com>>
wrote:
>>> I think you can use WindowedStream.aggreate
>>> 
>>> Best,
>>> Kurt
>>> 
>>> On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonxu65@gmail.com <mailto:sharonxu65@gmail.com>>
wrote:
>>> Thanks Kurt. Maybe I wasn't clear before, I was wondering if Flink has implementation
of combiner in DataStream (to use after keyBy and windowing).
>>> 
>>> Thanks again!
>>> 
>>> Le
>>> 
>>> On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <ykt836@gmail.com <mailto:ykt836@gmail.com>>
wrote:
>>> Hi,
>>> 
>>> The document you are looking at is pretty old, you can check the newest version
here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html>
>>> 
>>> Regarding to your question, you can use combineGroup 
>>> 
>>> Best,
>>> Kurt
>>> 
>>> On Mon, Oct 23, 2017 at 5:22 AM, Le Xu <sharonxu65@gmail.com <mailto:sharonxu65@gmail.com>>
wrote:
>>> Hello!
>>> 
>>> I'm new to Flink and I'm wondering if there is a explicit local combiner to each
mapper so I can use to perform a local reduce on each mapper? I looked up on https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
<https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html>
but couldn't find anything that matches.
>>> 
>>> 
>>> Thanks!
>>> 
>>> Le
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>> 


Mime
View raw message