flink-dev mailing list archives

From Fabian Hueske <fhue...@apache.org>
Subject Re: load balancing groups
Date Fri, 31 Oct 2014 21:20:44 GMT
Hmm, I just found that there is no JoinHint that would allow what I described.

Broadcasting one input and building the hash table from the other one is
usually not a good thing to do, because the broadcast side should be much
smaller than the other one (and is normally the side the hash table is built from)...
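
The self-join strategy described in the quoted message below (replicate the full input to every worker, build each worker's hash table from its own local partition, and stream the replicated input against it) can be simulated outside Flink to see why it balances load. This is a minimal, framework-agnostic sketch, not Flink API: `broadcast_self_join`, the (key, value) record layout, and the round-robin split over two workers are all illustrative assumptions.

```python
from collections import defaultdict


def broadcast_self_join(partitions):
    """Hypothetical simulation: self-join where the whole input is broadcast
    and each worker builds its hash table from its local partition only."""
    # Replicated ("broadcast") side: every worker sees the complete input.
    replicated = [rec for part in partitions for rec in part]
    per_worker = []
    for local in partitions:
        # Build side: hash table over this worker's local partition.
        table = defaultdict(list)
        for key, value in local:
            table[key].append(value)
        # Probe side: stream the full replicated input against the local table.
        joined = [
            (key, value, other)
            for key, value in replicated
            for other in table.get(key, ())
        ]
        per_worker.append(joined)
    return per_worker


# Skewed input: key "a" has 6 records, key "b" has 2,
# spread round-robin over 2 hypothetical workers.
records = [("a", i) for i in range(6)] + [("b", 0), ("b", 1)]
partitions = [records[0::2], records[1::2]]
print([len(out) for out in broadcast_self_join(partitions)])  # → [20, 20]
```

Even though key "a" is three times larger than key "b", each worker produces the same number of pairs, because the hot key's records are split across the local hash tables instead of landing on one worker. Note that a plain self-join emits every ordered pair, including each record paired with itself, so the user function would still have to filter those.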

2014-10-31 21:56 GMT+01:00 Fabian Hueske <fhueske@apache.org>:

> Just had another idea.
> The group-wise crossing that you are doing is actually a self-join on the
> grouping key.
> The system currently has no special strategy for self-joins. That
> means both inputs of the join (which are identical) are treated as two
> individual inputs. If you force a broadcast of one side and build the
> hash table on the other side, the following would happen:
> The broadcast input would be replicated and sent to each individual
> worker thread. The other input would remain local, still be partitioned,
> and therefore be smaller on each node. That's why you would build the
> hash table from the partitioned input. The larger, replicated input would
> be streamed against the hash tables. Because the inputs are not
> repartitioned on the key, there should be no load-balancing issues
> (depending on the previous partitioning, it can even be perfectly balanced...)
> However, this might not work (well) if the input is too large to be
> replicated to every worker (or if the local partitions are too large for
> in-memory hash tables).
> Best, Fabian
> 2014-10-30 17:56 GMT+01:00 Fabian Hueske <fhueske@apache.org>:
>> Hi Martin,
>> Flink does not have features to mitigate data skew at the moment, such as
>> dynamic partitioning.
>> That would also "only" allow processing large groups as individual
>> partitions and multiple smaller groups together in other partitions.
>> The issue of having a single large group would not be solved by that. This is
>> more of an application-level issue right now; it could, for example, be solved by
>> adding something like a group-cross operator...
>> I think your approach of emitting multiple smaller partitions from a
>> group-reduce, reshuffling them (there is a rebalance operator [1]), and applying a
>> flatmap sounds like a good idea to me.
>> At least, I didn't come up with a better approach ;-)
>> Cheers, Fabian
>> [1] http://flink.incubator.apache.org/docs/0.7-incubating/programming_guide.html#transformations
>> 2014-10-28 20:53 GMT+01:00 Martin Neumann <mneumann@spotify.com>:
>>> I have a problem with load balancing and was wondering how to deal with
>>> this kind of problem in Flink.
>>> The input I have is a data set of grouped IDs that I join with metadata
>>> for each ID. Then I need to compare each item in a group with every other
>>> item in that group and, if necessary, split the group into different
>>> subgroups.
>>> In Flink, it's a join followed by a group-reduce.
>>> The problem is that the groups differ a lot in size. 90% of the groups
>>> are done in 5 minutes, while the rest take 2 hours. To make this more
>>> efficient, I would need to distribute the N-to-N comparison that is currently
>>> done in the group-reduce function. Does anyone have an idea how I can do
>>> that in a simple way?
>>> My current idea is to make the group-reduce step emit computation
>>> partitions and then do another flat-map step to do the actual
>>> computation.
>>> Would this solve the problem?
>>> cheers Martin
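
The approach Martin proposes, and Fabian endorses in the reply above (a group-reduce that emits smaller computation partitions, a rebalance, then a flat-map that does the comparisons), can be sketched without Flink. In this framework-agnostic Python sketch, `split_group` stands in for the group-reduce, `rebalance` for a round-robin reshuffle, and `compare_unit` for the flat-map; these names, the `block_size` tuning parameter, and the `compare` callback are hypothetical. One group is cut into blocks, and every pair of blocks (i <= j) becomes an independent work unit, so a single large group's N-to-N comparison can run on many workers.

```python
from itertools import combinations


def split_group(key, items, block_size):
    """Group-reduce stand-in: cut one group's all-pairs comparison
    into independent work units, one per pair of blocks (i <= j)."""
    blocks = [items[i:i + block_size] for i in range(0, len(items), block_size)]
    for i in range(len(blocks)):
        for j in range(i, len(blocks)):
            yield (key, i, j, blocks[i], blocks[j])


def rebalance(units, num_workers):
    """Round-robin redistribution of work units across workers."""
    workers = [[] for _ in range(num_workers)]
    for n, unit in enumerate(units):
        workers[n % num_workers].append(unit)
    return workers


def compare_unit(unit, compare):
    """Flat-map stand-in: run every comparison belonging to one work unit."""
    key, i, j, left, right = unit
    if i == j:
        # Diagonal unit: each unordered pair inside the block, once.
        pairs = combinations(left, 2)
    else:
        # Off-diagonal unit: every item of one block against every item of the other.
        pairs = ((a, b) for a in left for b in right)
    for a, b in pairs:
        yield (key, a, b, compare(a, b))


# One "large" group of 10 items, cut into blocks of 3 (sizes 3, 3, 3, 1).
units = list(split_group("big", list(range(10)), block_size=3))
workers = rebalance(units, num_workers=4)
results = [r for w in workers for u in w for r in compare_unit(u, lambda a, b: abs(a - b))]
print(len(units), len(results))  # → 10 45
```

The 10 work units together perform exactly the 45 comparisons (10 × 9 / 2) that the original group-reduce would have done, but they can be spread over 4 workers. The trade-off is data replication: each item is copied into roughly (number of blocks) work units, so `block_size` balances parallelism against the volume that has to be shuffled.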
