Hi Elias,
Sorry for the delay, this must have slipped through the cracks after Flink Forward.

I did spend some time thinking about this, and we have had the idea for a while now to add an operation like “keyByWithoutPartitioning()” (name not final ;-)) that would allow the user to tell the system that no reshuffle is needed. This would only work if the key type (and the keys themselves) stay exactly the same.

I think it wouldn’t work for your case because the key type changes, and elements with key (A, B) would normally be reshuffled to different instances than elements with key (A): e.g. (1, 1) does not necessarily belong to the same key group as (1). Would you agree that this is what happens in your case?
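To make the key-group point concrete, here is a plain-Java sketch that imitates how Flink maps a key to a key group and then to a parallel subtask. It does not use Flink itself and is only an approximation: `murmurHash` is modeled on Flink's internal murmur-based key hashing, `tuple2Hash` assumes a `Tuple2<Integer, Integer>` hashes as `31 * f0 + f1`, and the max parallelism of 128 and parallelism of 4 are arbitrary assumptions. It shows that (A) and (A, B) are hashed independently, so nothing guarantees they land on the same subtask.

```java
public final class KeyGroupDemo {

    // Murmur-style hash modeled on Flink's key hashing (an approximation, not a copy).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final avalanche mixing (Murmur3 fmix32).
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // Map to a non-negative int.
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Key hash -> key group in [0, maxParallelism).
    static int keyGroupForHash(int keyHash, int maxParallelism) {
        return murmurHash(keyHash) % maxParallelism;
    }

    // Key group -> index of the parallel subtask that owns that key group.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Assumed to mirror Tuple2<Integer, Integer>.hashCode(): 31 * f0 + f1.
    static int tuple2Hash(int a, int b) {
        return 31 * Integer.hashCode(a) + Integer.hashCode(b);
    }

    // Counts keys a in [0, numKeys) whose composite key (a, b) is owned by a different subtask than (a).
    static int countMismatches(int numKeys, int b, int maxParallelism, int parallelism) {
        int mismatches = 0;
        for (int a = 0; a < numKeys; a++) {
            int ownerOfA = operatorIndexFor(keyGroupForHash(Integer.hashCode(a), maxParallelism), maxParallelism, parallelism);
            int ownerOfAB = operatorIndexFor(keyGroupForHash(tuple2Hash(a, b), maxParallelism), maxParallelism, parallelism);
            if (ownerOfA != ownerOfAB) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed
        int parallelism = 4;      // assumed
        int groupA = keyGroupForHash(Integer.hashCode(1), maxParallelism);
        int groupAB = keyGroupForHash(tuple2Hash(1, 1), maxParallelism);
        System.out.printf("key (1):    key group %d, subtask %d%n", groupA, operatorIndexFor(groupA, maxParallelism, parallelism));
        System.out.printf("key (1, 1): key group %d, subtask %d%n", groupAB, operatorIndexFor(groupAB, maxParallelism, parallelism));
        System.out.println("keys a in [0, 1000) whose (a, 1) moves to another subtask: "
                + countMismatches(1000, 1, maxParallelism, parallelism));
    }
}
```

With a parallelism of 1 every key group maps to subtask 0, so the mismatch count is 0; with higher parallelism a large share of composite keys typically moves to a different subtask.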


On 25. Apr 2017, at 23:32, Elias Levy <fearsome.lucidity@gmail.com> wrote:


On Fri, Apr 21, 2017 at 10:15 PM, Elias Levy <fearsome.lucidity@gmail.com> wrote:
This is something that has come up before on the list, but in a different context. I need to rekey a stream, but I would prefer the stream not to be repartitioned. There is no gain in repartitioning: the new partition key is a composite that includes the stream key, going from a key of A to a key of (A, B), so all values for the resulting streams are already routed to the same node, and repartitioning them to other nodes would simply generate unnecessary network traffic and serde overhead.

Unlike previous use cases, I am not trying to perform aggregate operations. Instead, I am executing CEP patterns. Some patterns apply to the stream keyed by A and some to the stream keyed by (A, B).

The API does not appear to offer an obvious solution to this situation. keyBy() will repartition, and there isn't anything like subKey() to subpartition a stream without repartitioning (e.g. keyBy(A).subKey(B)).

I suppose I could accomplish it by using partitionCustom(), ignoring the second element in the key, and delegating to the default partitioner, passing it only the first element, so that the task assignment would not change.
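A rough sketch of that partitionCustom() idea, again in plain Java so it runs without Flink. The `Partitioner` interface here is a local stand-in shaped like Flink's `Partitioner<K>` (`int partition(K key, int numPartitions)`); `CompositeKey`, `FirstFieldPartitioner` and `subtaskForKey` are hypothetical names, and the murmur-style hash only approximates Flink's default key-group assignment. Because a custom partitioner is only given the number of partitions, the max parallelism (128 here) has to be supplied separately and is assumed to match the job's setting.

```java
public final class FirstFieldPartitionerDemo {

    // Local stand-in shaped like Flink's Partitioner<K>.
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Hypothetical composite key (A, B).
    static final class CompositeKey {
        final String a;
        final String b;

        CompositeKey(String a, String b) {
            this.a = a;
            this.b = b;
        }
    }

    // Murmur-style hash approximating Flink's default key hashing.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Subtask that keyBy(A) would pick, assuming A's hashCode() is the key hash.
    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Ignores B and delegates to the default assignment for A alone.
    static final class FirstFieldPartitioner implements Partitioner<CompositeKey> {
        private final int maxParallelism;

        FirstFieldPartitioner(int maxParallelism) {
            this.maxParallelism = maxParallelism;
        }

        @Override
        public int partition(CompositeKey key, int numPartitions) {
            return subtaskForKey(key.a, maxParallelism, numPartitions);
        }
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed to match the job's max parallelism
        int parallelism = 4;      // assumed downstream parallelism
        Partitioner<CompositeKey> partitioner = new FirstFieldPartitioner(maxParallelism);
        for (String b : new String[] {"x", "y", "z"}) {
            int subtask = partitioner.partition(new CompositeKey("user-42", b), parallelism);
            System.out.printf("(user-42, %s) -> subtask %d, keyBy(A) -> subtask %d%n",
                    b, subtask, subtaskForKey("user-42", maxParallelism, parallelism));
        }
    }
}
```

Every (A, B) element is routed exactly where keyBy(A) would send A. Note that in Flink partitionCustom() returns a plain DataStream rather than a KeyedStream, so this only preserves the physical routing; it does not by itself provide per-(A, B) keyed state.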