flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Greg Hogan <c...@greghogan.com>
Subject Re: Performance issues with GroupBy?
Date Tue, 26 Jul 2016 16:57:06 GMT
Hi Robert,

Are you able to simplify the your function input / output types? Flink
aggressively serializes the data stream and complex types such as ArrayList
and BitSet will be much slower to process. Are you able to reconstruct the
lists to be groupings on elements?

Greg

On Mon, Jul 25, 2016 at 8:06 AM, Paschek, Robert <
robert.paschek@tu-berlin.de> wrote:

> Hi Mailing List,
>
>
>
> i actually do some benchmarks with different algorithms. The System has 8
> nodes and a configured parallelism of 48 - The IBM-Power-1 cluster, if
> somebody from the TU Berlin read this : - ) – and to “simulate” Hadoop
> MapReduce, the execution mode is set to “BATCH_FORCED”
>
>
>
> It is suspicious, that three of the six algorithms had a big gap in
> runtime (5000ms vs. 20000ms) for easy (low dim) tuple. Additionally the
> algorithms in the “upper” group using a groupBy transformation and the
> algorithms in the “lower” group don’t use groupBy.
>
> I attached the plot for better visualization.
>
>
>
> I also checked the logs, especially the time, when the mappers finishing
> and the reducers start _*iterating*_ - they hardened my speculation.
>
>
>
> So my question is, if it is “normal”, that grouping is so cost-intensive
> that – in my case – the runtime increases by 4 times?
>
> I have data from the same experiments running on a 13 nodes cluster with
> 26 cores with Apache Hadoop MapReduce, where the gap is still present, but
> smaller (50s vs 57s or 55s vs 65s).
>
>
>
> Is there something I might could do to optimize the grouping? Some
> codesnipplets:
>
>
>
> The Job:
> DataSet<?> output = input
>
>                         .mapPartition(*new*
> MR_GPMRS_Mapper()).withBroadcastSet(metaData, "MetaData").withParameters(
> parameters).name(MR_GPMRS_OPTIMIZED.*class*.getSimpleName()+"_MAPPER")
>
>                         .groupBy(0)
>
>                         .reduceGroup(*new* MR_GPMRS_Reducer()).returns(
> input.getType()).withBroadcastSet(metaData, "MetaData").withParameters(
> parameters).name(MR_GPMRS_OPTIMIZED.*class*.getSimpleName()+"_REDUCER");
>
>
>
> MR_GPMRS_Mapper():
>
> *public* *class* MR_GPMRS_Mapper <T *extends* Tuple> *extends*
> RichMapPartitionFunction<T, Tuple2<Integer,Tuple3<ArrayList<ArrayList<T>>,
> BitSet, BitSet>>>
>
>
>
> MR_GPMRS_Reducer():
>
> *public* *class* MR_GPMRS_Reducer <T *extends* Tuple> *extends*
> RichGroupReduceFunction<Tuple2<Integer,Tuple3<ArrayList<ArrayList<T>>,
> BitSet, BitSet>>, T>
>
>
>
> The Tuple2 has as Payload on position f1 the Tuple3 and on position f0 the
> Integer Key for grouping.
>
>
>
> Any suggestions (or comments, that it is a “normal” behaviour) are welcome
> : - )
>
>
>
> Thank you in advance!
>
> Robert
>

Mime
View raw message