Yes, thanks for the clarification.

If I understand correctly, there is no guarantee that after partitionByHash sender[i] will be scheduled on a slot after receiver[i], so we will have to come up with a workaround or tweak the code.

Based on that I think that our options are:

1) Extend the scheduler so that the above constraint can be guaranteed.
2) Extend the optimizer so that data source chains that are already hash-partitioned can be annotated as such, in a way that allows this partitioning to be reused by a coGroup / join / reduce / groupReduce operator.

I think that option (2) should be simpler, as much of the logic is already in place. Last time I checked, the only problem was that the data source partitioning metadata was incompatible with the operator partitioning requirements.


2015-05-21 21:50 GMT+02:00 Fabian Hueske:
Hi Alex,

did my previous mail answer these questions as well?

Cheers, Fabian

2015-05-18 22:03 GMT+02:00 Alexander Alexandrov:
In the dawn of Flink, when Flink operators were still called PACTs (short for Parallelization Contracts), the system used to support so-called "output contracts" via custom annotations that could be attached to the UDF (the ForwardedFields annotation is a descendant of that concept).

Amongst others, if I remember correctly, there was an output contract indicating that a DataSet is hash-partitioned by key, which was used to avoid unnecessary re-partitioning of an input (e.g. for a subsequent reducer or coGroup). I wonder what happened to that, as I can't find it any more - I am looking here:

Help or suggestions on how to realize the same functionality with the current version of the API are appreciated.

As a fallback, I think that "partitionByHash" could maybe do the trick at the expense of one pipelined pass over the data, but I am not sure whether the receiver tasks are scheduled on the same machines as their sender counterparts. In other words, can I assume that the following happens:

machine1: (task[0])  partitionByHash  (task[0])
machine2: (task[1])  partitionByHash  (task[1])
machineN: (task[n])  partitionByHash  (task[n])
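
A minimal sketch of the routing in question, written in plain Java rather than against the Flink API. The class and method names are hypothetical, and "hashCode modulo parallelism" is a simplifying assumption, not necessarily how Flink selects the output channel. It only illustrates that the receiver index depends on the key and not on the sender, so the layout above holds only if the scheduler happens to place receiver task[i] next to sender task[i].

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HashPartitionSketch {

    // Hypothetical helper: index of the receiving task for a given key.
    // Assumption: plain hashCode modulo parallelism (illustration only).
    public static int receiverFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Routes every key held by every sender task to its receiver task.
    // The sender's own index plays no role in the routing decision.
    public static List<List<String>> partition(List<List<String>> senders, int parallelism) {
        List<List<String>> receivers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            receivers.add(new ArrayList<>());
        }
        for (List<String> sender : senders) {
            for (String key : sender) {
                receivers.get(receiverFor(key, parallelism)).add(key);
            }
        }
        return receivers;
    }

    public static void main(String[] args) {
        // Three sender tasks, each holding an arbitrary mix of keys.
        List<List<String>> senders = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("a", "d"),
                Arrays.asList("b", "e"));

        List<List<String>> receivers = partition(senders, 3);
        for (int i = 0; i < receivers.size(); i++) {
            System.out.println("task[" + i + "] <- " + receivers.get(i));
        }
    }
}
```

After this step all records with the same key sit in the same receiver task, which is the property a subsequent reduce or coGroup on that key needs. Whether the transfer from sender task[i] to receiver task[i] stays on one machine is a separate scheduling question that the sketch does not model.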