flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: A proposal about skew data handling in Flink
Date Tue, 20 Oct 2015 08:30:36 GMT
Yes, that sounds good to me.
Implement support for generic range partitioning first and go for the
non-range-equally-splittable cases later.

Best, Fabian

2015-10-20 5:21 GMT+02:00 Li, Chengxiang <chengxiang.li@intel.com>:

> Thanks a lot for the comments, Fabian. I agree with you on the plan
> mostly, just add some more thoughts about  Non-Range-Equally-Splittable
> case here.
> 1. Let's assume a case which 10% data is skewed on certain key, in this
> case, as long as the parallelism is larger than 10, it would fit into
> Non-Range-Equally-Splittable case. So it should not be very corner case of
> skew issue.
> 2. In proposal, the solution of Non-Range-Equally-Splittable case is based
> on 2 new RangePartitioner and little optimizer logic, which has been
> touched already in the plan #1, #2. It does not require to change anything
> about the operator semantics, so if we have a good partitioner abstraction,
> I think it does not add much complexity for Flink to handle this kind of
> issue.
> It should not block anything, after finished the simple case, we would
> have more knowledge about the implementation details, then we can look back
> at this issue, and decide whether it's deserved to be resolved at the cost.
> Thanks
> Chengxiang
> -----Original Message-----
> From: Fabian Hueske [mailto:fhueske@gmail.com]
> Sent: Monday, October 19, 2015 7:15 PM
> To: dev@flink.apache.org
> Subject: Re: A proposal about skew data handling in Flink
> Hi,
> First of all, thanks a lot for this extensive proposal! It contains a lot
> of good observations and techniques how to address data skew.
> I have a few remarks:
> 1) The terms Input and Output Contract were introduced in the first
> scientific publications and are not used anymore. Input Contract are what
> we call operators or transformations today, the concept of output contract
> is completely gone.
> In the current code, we have operators like Map, Reduce, and Join that
> describe how data needs to be organized (by key, etc.) and UDFs that
> process the data.
> 2) I would categorize skew as follows:
> - UDF Call Complexity Skew: The input cardinalities of UDF calls differ
> (only applicable to group-based operators such as GroupReduce and CoGroup)
> or the computational complexity of UDF calls depends on the data and varies
> a lot. UDF calls are the smallest parallelizable unit. It is not possible
> to change that without changing the semantics. Combiners can help to reduce
> the effect of skew for group-based operators.
> - Input Partition Skew: The cardinality of parallel partitions varies.
> This is handled by Flink as follows:
>     - Lazy split assignment for data sources
>     - Operators that do not require special partitioning (Map, Filter,
> Cross, etc.) just consume the output partitions of the preceding operator.
> Rebalance() can be used to enforce round-robin partitioning to equalize
> size of all partitions.
>     - Operators that require key-based partitioning use hash partitioning.
> Range partitioning can help address significant data skew.
> - UDF Call Skew: The number of UDF calls per parallel partition varies.
> This can be an issue for n-m joins which essentially result in Cartesian
> products.
>     - UDF Call Skew is most relevant for Joins
>     - UDF Call Skew for Map, Reduce, CoGroup, Cross can be controlled by
> controlling Input Partition Skew
> 3) I agree that we should not try to detect and automatically fix data
> skew (at the moment) but give users tools to manually manage skew.
> 4) I would focus on addressing the Input Partition Skew problem. UDF Call
> Complexity Skew cannot be addressed because it would change the semantics
> of operators. UDF Call Skew is only affecting joins and much harder to
> solve.
> 5) I wonder how much the practical gain is to address the
> Non-Range-Equally-Splittable case compared to the added code complexity. In
> general, tackling skew is a very good idea, but solving corner cases with
> quite complex methods might make future features more complicated to add.
> Hence, I would propose to focus on the common and "easy" cases first.
> I would address Input Partition Skew first and ignore the
> Non-Range-Equally-Splittable case for now. We can do this in two steps:
> 1) Add the "simple" range partitioner as in your pull request for unary
> operators (explicit range partitioning, total order, groupBy). Once the
> sampling happens online, this is a very good addition to Flink.
> 2) Add the "simple" range partitioner also for binary operators (join,
> coGroup). This will be a bit more tricky, because we need to do a
> coordinated decision for both inputs.
> 3) Expose range partitioning for GroupBy, Join, CoGroup to the API, maybe
> through optimizer hints.
> Since we want to have this transparently handled by the API and engine, we
> need to add a lot of these features into the optimizer, or
> JobGraphGenerator to be more precisely.
> Does that make sense to you?
> Cheers, Fabian
> 2015-10-16 17:13 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
> > Hi,
> >
> > thanks for starting a discussion about data skew! I agree, it's a
> > important issue that can cause a lot of problems.
> > I'll have a look at your proposal and add comments soon.
> >
> > Thanks, Fabian
> >
> > 2015-10-15 12:24 GMT+02:00 Li, Chengxiang <chengxiang.li@intel.com>:
> >
> >> Dear all,
> >> In many real world use case, data are nature to be skewed. For
> >> example, in social network, famous people get much more "follow" than
> >> others, a hot tweet would be transferred millions of times. and the
> >> purchased records of normal product can never compared to hot
> >> products. While at the same time, Flink runtime assume that all tasks
> >> consume same size resources, this's not always true. Skew data
> >> handling try to make skewed data fit into Flink's runtime.
> >> I write a proposal about skew data handling in Flink, you can read it
> >> at
> >> https://docs.google.com/document/d/1ma060BUlhXDqeFmviEO7Io4CXLKgrAXIf
> >> eDYldvZsKI/edit?usp=sharing
> >> .
> >> Any comments and feedback are welcome, you can comment on the google
> >> doc, or reply this email thread directly.
> >>
> >> Thanks
> >> Chengxiang
> >>
> >
> >

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message