flink-user mailing list archives

From "Paschek, Robert" <robert.pasc...@tu-berlin.de>
Subject RE: Performance issues with GroupBy?
Date Wed, 27 Jul 2016 09:51:00 GMT
Hi Gábor, hi Ufuk, hi Greg,

thank you for your very helpful responses!


> You can try to make your `RichGroupReduceFunction` implement the
> `GroupCombineFunction` interface, so that Flink can do combining
> before the shuffle, which might significantly reduce the network load.
> (How much the combiner helps the performance can greatly depend on how
> large are your groups on average.)

While implementing my reducers I didn't think that combining was applicable, because each
mapper produces each key only once. What I didn't consider is that several mappers run on
the same machine and therefore benefit from pre-combining before the shuffle.
After I implemented the combiner described here
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#combinable-groupreducefunctions
the runtime difference between running these algorithms with and without a combiner decreased.
Thank you for the hint!
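For reference, the combinable reducer pattern looks roughly like this - a simplified word-count-style sketch with made-up class and tuple types, not my actual MR_GPMRS_Reducer:

```java
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// A GroupReduceFunction that also implements GroupCombineFunction, so
// Flink can pre-aggregate each group locally before shuffling it.
public class CombinableWordCount
        extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
        implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> values,
                       Collector<Tuple2<String, Integer>> out) {
        String key = null;
        int sum = 0;
        for (Tuple2<String, Integer> value : values) {
            key = value.f0;
            sum += value.f1;
        }
        out.collect(Tuple2.of(key, sum));
    }

    @Override
    public void combine(Iterable<Tuple2<String, Integer>> values,
                        Collector<Tuple2<String, Integer>> out) {
        // Summing is associative, so the local pre-aggregation can
        // simply reuse the reduce logic.
        reduce(values, out);
    }
}
```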

One curious thing: my reducer receives broadcast variables and accesses them in the open() method.
When the combine() method is called, the broadcast variable does not seem to be set yet: I get an
explicit error message within the open() method. Is this a potential bug? I am using Apache Flink 1.0.3.

To avoid changing my reducers, I'm wondering if I should instead implement a GroupCombineFunction
that is independent of the reducer:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#groupcombine-on-a-grouped-dataset

As far as I understand, this would work similarly?

-          The following transformation - especially the groupBy() - will run first on each
local machine:

     combinedWords = input
         .groupBy(0)
         .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>>() { ... });

-          And then, the following transformation will shuffle the data over the network
within the 2nd groupBy():

     output = combinedWords
         .groupBy(0)
         .reduceGroup(new GroupReduceFunction<...>() { ... });
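If I go that route, the standalone combiner would be its own small class, roughly like this (again a word-count-style sketch with made-up names, following the linked documentation page):

```java
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Standalone combiner: pre-aggregates each local group and emits one
// partial sum per key and partition, so the network only has to move
// the partial sums to the final reducer.
public class CombineWords
        implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    @Override
    public void combine(Iterable<Tuple2<String, Integer>> values,
                        Collector<Tuple2<String, Integer>> out) {
        String key = null;
        int partialSum = 0;
        for (Tuple2<String, Integer> value : values) {
            key = value.f0;
            partialSum += value.f1;
        }
        out.collect(Tuple2.of(key, partialSum));
    }
}
```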



> Alternatively, if you can reformulate your algorithm to use a `reduce`
> instead of a `reduceGroup` that might also improve the performance.
> Also, if you are using a `reduce`, then you can try calling
> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
> hint is a relatively new feature, so you need the current master for
> this.)
I have to iterate through the tuples of a group multiple times, and the final result can only
be emitted after the last tuple is processed, so I don't think I can use a reduce.
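For the archive, the suggested reduce variant would look roughly like this (a word-count-style sketch with made-up names; as said, it does not fit my algorithm, and setCombineHint needs Flink 1.1 / the current master):

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class ReduceWithHint {

    // Pairwise, associative combination of two records with the same key.
    public static class SumCounts implements ReduceFunction<Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                              Tuple2<String, Integer> b) {
            return Tuple2.of(a.f0, a.f1 + b.f1);
        }
    }

    public static DataSet<Tuple2<String, Integer>> sumPerKey(
            DataSet<Tuple2<String, Integer>> words) {
        return words
                .groupBy(0)
                .reduce(new SumCounts())
                // Hash-based combine strategy instead of the sort-based default.
                .setCombineHint(CombineHint.HASH);
    }
}
```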


> This could be further amplified by the blocking intermediate results, which have a very
simplistic implementation writing out many different files, which can lead to a lot of random
I/O.

Thank you for this technical explanation. I will mention it in my evaluation!

> Are you able to simplify your function input / output types? Flink
> aggressively serializes the data stream, and complex types such as
> ArrayList and BitSet will be much slower to process. Are you able to
> reconstruct the lists to be groupings on elements?
For my purpose, simulating Apache Hadoop MapReduce-like behaviour, I would say that
my current implementation fits.
I will think about rewriting the code after the first benchmarks, to potentially reveal advantages
of Apache Flink in comparison to Hadoop MapReduce for the algorithms I implemented.

Thanks again!
Robert


From: Greg Hogan [mailto:code@greghogan.com]
Sent: Tuesday, July 26, 2016 18:57
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Performance issues with GroupBy?

Hi Robert,
Are you able to simplify your function input / output types? Flink aggressively serializes
the data stream, and complex types such as ArrayList and BitSet will be much slower to process.
Are you able to reconstruct the lists to be groupings on elements?
Greg


-----Original Message-----
From: Ufuk Celebi [mailto:uce@apache.org]
Sent: Tuesday, July 26, 2016 11:53
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Performance issues with GroupBy?



+1 to what Gábor said. The hash combine will be part of the upcoming
1.1 release, too.

This could be further amplified by the blocking intermediate results,
which have a very simplistic implementation writing out many different
files, which can lead to a lot of random I/O.

– Ufuk



On Tue, Jul 26, 2016 at 11:41 AM, Gábor Gévay <ggab90@gmail.com<mailto:ggab90@gmail.com>>
wrote:

> Hello Robert,
>
>> Is there something I could do to optimize the grouping?
>
> You can try to make your `RichGroupReduceFunction` implement the
> `GroupCombineFunction` interface, so that Flink can do combining
> before the shuffle, which might significantly reduce the network load.
> (How much the combiner helps the performance can greatly depend on how
> large are your groups on average.)
>
> Alternatively, if you can reformulate your algorithm to use a `reduce`
> instead of a `reduceGroup` that might also improve the performance.
> Also, if you are using a `reduce`, then you can try calling
> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
> hint is a relatively new feature, so you need the current master for
> this.)
>
> Best,
> Gábor
>
>
> 2016-07-25 14:06 GMT+02:00 Paschek, Robert <robert.paschek@tu-berlin.de<mailto:robert.paschek@tu-berlin.de>>:

>> Hi Mailing List,
>>
>> I am currently running some benchmarks with different algorithms. The system
>> has 8 nodes and a configured parallelism of 48 - the IBM-Power-1
>> cluster, if somebody from the TU Berlin reads this :-) - and to
>> "simulate" Hadoop MapReduce, the execution mode is set to "BATCH_FORCED".
>>
>> It is suspicious that three of the six algorithms have a big gap in
>> runtime (5000 ms vs. 20000 ms) for easy (low-dimensional) tuples.
>> Additionally, the algorithms in the "upper" group use a groupBy
>> transformation and the algorithms in the "lower" group don't use groupBy.
>>
>> I attached the plot for better visualization.
>>
>> I also checked the logs, especially the times when the mappers finish
>> and the reducers start _iterating_ - they confirmed my speculation.
>>
>> So my question is whether it is "normal" that grouping is so
>> cost-intensive that - in my case - the runtime increases by a factor of 4?
>>
>> I have data from the same experiments running on a 13-node cluster
>> with 26 cores with Apache Hadoop MapReduce, where the gap is still
>> present, but smaller (50s vs. 57s or 55s vs. 65s).
>>
>> Is there something I could do to optimize the grouping? Some code
>> snippets:
>>
>> The Job:
>>
>> DataSet<?> output = input
>>     .mapPartition(new MR_GPMRS_Mapper())
>>         .withBroadcastSet(metaData, "MetaData")
>>         .withParameters(parameters)
>>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_MAPPER")
>>     .groupBy(0)
>>     .reduceGroup(new MR_GPMRS_Reducer())
>>         .returns(input.getType())
>>         .withBroadcastSet(metaData, "MetaData")
>>         .withParameters(parameters)
>>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_REDUCER");
>>
>> MR_GPMRS_Mapper():
>>
>> public class MR_GPMRS_Mapper <T extends Tuple> extends
>>     RichMapPartitionFunction<T, Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>>
>>
>> MR_GPMRS_Reducer():
>>
>> public class MR_GPMRS_Reducer <T extends Tuple> extends
>>     RichGroupReduceFunction<Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>, T>
>>
>> The Tuple2 has as payload the Tuple3 on position f1, and on position
>> f0 the Integer key for grouping.
>>
>> Any suggestions (or comments that it is a "normal" behaviour) are welcome :-)
>>
>> Thank you in advance!
>>
>> Robert

