flink-user mailing list archives

From Piotr Nowojski <pi...@data-artisans.com>
Subject Re: Custom Partitioning for Keyed Streams
Date Wed, 10 Jan 2018 08:54:11 GMT

I don’t think it is possible to force two keys to be scheduled on different nodes, since the
assignment of keys to parallel operator instances is based purely on hashes of the keys.
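Here is a plain-Java model of the hash-based assignment. As far as I understand Flink's internals, the key's hashCode() is hashed into one of maxParallelism key groups, and each parallel subtask owns a contiguous range of key groups. This is a sketch under stated assumptions, not Flink code. mix() is the MurmurHash3 finalizer used as a stand-in for Flink's own hash, so the concrete subtask indices will not match a real job, and the class, method names and parameter values are hypothetical. What it does show is that the target subtask is a pure function of the key, maxParallelism and parallelism, with no input for observed load.

```java
import java.util.List;

// Simplified model of key -> subtask routing in a keyed Flink stream:
//   key -> key group (hash of key.hashCode() modulo maxParallelism)
//       -> subtask index (each subtask owns a contiguous range of key groups).
// ASSUMPTION: mix() is the MurmurHash3 finalizer, used only as a stand-in for
// Flink's own hash, so the concrete indices differ from a real job.
class KeyRoutingModel {

    // Stand-in bit mixer: spreads similar hashCodes (e.g. "A" vs "B") apart.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // key -> key group in [0, maxParallelism); floorMod keeps it non-negative.
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    // key group -> subtask in [0, parallelism).
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // No parameter for observed load: the result depends only on these inputs.
    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroup(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical values for illustration
        int parallelism = 4;
        for (String key : List.of("A", "B", "C", "D", "E", "F", "G", "H", "I", "J")) {
            System.out.printf("key %s -> key group %d -> subtask %d%n",
                    key, keyGroup(key, maxParallelism),
                    subtaskForKey(key, maxParallelism, parallelism));
        }
    }
}
```

Because nothing in that function looks at message volume, two hot keys can land on the same subtask and there is no setting that pushes them apart based on load.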

In some cases, a pre-aggregation step (an initial aggregation done before the keyBy, followed
by the final aggregation after the keyBy) can be the solution for handling data skew. With
pre-aggregation, some (most?) of the work can be distributed and done on the source node
instead of all of the heavy lifting happening on the destination node. It has not yet been
merged into Flink, but it’s entirely user-space code, which you could copy and paste (and adjust)
into your project. The pull request containing pre-aggregation is here:
https://github.com/apache/flink/pull/4626
Please pay attention to the limitations of this code (documented in the Javadoc).
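The general combiner idea can be simulated in a few lines of plain Java. This is only a sketch of the technique, not the code from the pull request and not Flink's API. Each inner list is assumed to stand for the records one upstream subtask sees, preAggregate plays the part of the operator before the keyBy, and finalAggregate plays the keyed operator after it. Counting is used because it can be merged from partial results. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java simulation of pre-aggregation ahead of a keyBy (not Flink API).
// Each inner list stands for the records seen by one upstream subtask.
class PreAggregationSketch {

    // Stage 1 (before keyBy): fold local records into one partial count per key,
    // so each upstream subtask emits one record per distinct key it saw.
    static Map<String, Long> preAggregate(List<String> localRecords) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : localRecords) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2 (after keyBy): the keyed operator only merges partial counts.
    static Map<String, Long> finalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Long::sum));
        }
        return total;
    }

    // Number of records that cross the network into the keyed stage.
    static int shippedRecords(List<Map<String, Long>> partials) {
        return partials.stream().mapToInt(Map::size).sum();
    }

    public static void main(String[] args) {
        // Three upstream subtasks; hot keys A and B dominate each of them.
        List<List<String>> upstream = List.of(
                List.of("A", "A", "A", "B", "B", "B", "C"),
                List.of("A", "A", "B", "B", "B", "D"),
                List.of("A", "A", "A", "A", "B", "E"));

        List<Map<String, Long>> partials = upstream.stream()
                .map(PreAggregationSketch::preAggregate)
                .collect(Collectors.toList());

        int raw = upstream.stream().mapToInt(List::size).sum();
        System.out.println("totals: " + finalAggregate(partials));
        System.out.println("records shipped to keyed stage: "
                + shippedRecords(partials) + " instead of " + raw);
    }
}
```

With the sample input, the keyed stage receives 9 partial records instead of 19 raw ones, and the totals are unchanged (A=9, B=7, C=D=E=1). A real streaming version also has to decide when to flush the partial map downstream, which this batch-style sketch sidesteps.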

If the above code doesn’t work for you for whatever reason, you can also try to implement some
custom-tailored pre-aggregation, for example two keyBy steps: in the first, you artificially
split the A and B keys into a couple of smaller ones, and the second keyBy merges/squashes the
partial results.
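The two-keyBy variant is often called key salting. Below is a plain-Java sketch under these assumptions: the hot keys are known in advance (A and B in the question), SALTS is a hypothetical fan-out of 4, and salts are assigned round-robin per hot key. The first stage aggregates per salted sub-key (A#0 to A#3), and the second stage strips the salt and merges the partials. None of this is Flink API, and the names are illustrative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java simulation of splitting hot keys into salted sub-keys (not Flink API).
// ASSUMPTIONS: hot keys are known up front; SALTS is a hypothetical fan-out.
class SaltedTwoStageSketch {

    static final int SALTS = 4;
    static final Set<String> HOT_KEYS = Set.of("A", "B");

    // First keyBy key: hot keys get salt n % SALTS, cold keys always get salt 0.
    static String saltedKey(String key, long n) {
        long salt = HOT_KEYS.contains(key) ? n % SALTS : 0;
        return key + "#" + salt;
    }

    // Second keyBy key: strip the salt again (lastIndexOf tolerates '#' in keys).
    static String originalKey(String salted) {
        return salted.substring(0, salted.lastIndexOf('#'));
    }

    // Stage 1: partial counts per salted sub-key.
    static Map<String, Long> firstStage(List<String> records) {
        Map<String, Long> seen = new HashMap<>();    // per-key record counter
        Map<String, Long> partial = new HashMap<>();
        for (String key : records) {
            long n = seen.merge(key, 1L, Long::sum) - 1; // 0, 1, 2, ... per key
            partial.merge(saltedKey(key, n), 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: merge the sub-key partials back into one total per original key.
    static Map<String, Long> secondStage(Map<String, Long> partial) {
        Map<String, Long> total = new HashMap<>();
        partial.forEach((salted, count) -> total.merge(originalKey(salted), count, Long::sum));
        return total;
    }

    public static void main(String[] args) {
        List<String> records = List.of(
                "A", "A", "A", "A", "A", "A", "A", "A", "B", "B", "B", "B", "C", "D");
        Map<String, Long> partial = firstStage(records);
        System.out.println("stage 1 (salted): " + partial);
        System.out.println("stage 2 (merged): " + secondStage(partial));
    }
}
```

Two caveats: this only works when the per-key computation can be split into partial results and merged afterwards (counts and sums can, arbitrary keyed state generally cannot), and the salted sub-keys are still routed by hash, so spreading A over four sub-keys makes it likely, not guaranteed, that they land on different subtasks.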


> On 9 Jan 2018, at 21:55, Martin, Nick <Nick.Martin@OrbitalATK.com> wrote:
> I have a set of stateful operators that rely on keyed state. There is substantial skew
between keys (e.g. there will be 100 messages on keys A and B, and 10 messages each on keys
C-J), and key assignment is dictated by the needs of my application, such that I
can’t choose keys in a way that will eliminate the skew. The skew is somewhat predictable
(i.e. I know keys A and B will usually get roughly 10x as many messages as the rest) and fairly
consistent across timescales (i.e. counting the messages on each key for 30 seconds
would provide a reasonably good guess as to the distribution of messages that will be received
over the next 10-20 minutes).
> The problem I’m having is that the high-volume keys (A and B in the example) often
end up on the same task slot and slow it down, while the low-volume ones are distributed across
the other parallel instances, leaving them underloaded. I looked into the available physical
partitioning functions, but that functionality seems to be generally incompatible with keyed
streams, and I need access to keyed state to do my actual processing. Is there any way I can get
better load balancing while using keyed state?
