Yes, thanks for the clarification.
If I understand correctly, there is no guarantee that, after a partitionByHash, sender[i] will be scheduled on a slot after receiver[i], so we will have to come up with a workaround or tweak the code.
Based on that I think that our options are:
1) extend the scheduler so that the above constraint can somehow be guaranteed.
2) extend the optimizer so that data source chains that are already hash partitioned can be annotated as such, in a way that allows this partitioning to be reused by a coGroup / join / reduce / groupReduce operator.
I think that option (2) should be simpler, as much of the logic is already in place. The last time I checked, the only problem was that the data source's partitioning metadata was incompatible with the operators' partitioning requirements.
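To make option (2) concrete, here is a toy sketch (in plain Python, not Flink code) of the idea: if a data source carries an annotation saying it is already hash partitioned on the operator's key fields, the planner can reuse that partitioning instead of inserting a new shuffle. All names here (`Partitioning`, `plan_cogroup`, etc.) are illustrative assumptions, not the actual optimizer API.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Partitioning:
    kind: str      # e.g. "hash" or "none" -- illustrative, not Flink's enum
    fields: tuple  # key fields the data is partitioned on

def satisfies(existing: Partitioning, required: Partitioning) -> bool:
    # An existing hash partitioning satisfies a hash requirement
    # only if it is on exactly the same key fields.
    return existing.kind == required.kind and existing.fields == required.fields

def plan_cogroup(left_props, right_props, key_fields):
    # For each input, decide whether the annotated source partitioning
    # can be reused or a repartition step must be inserted.
    required = Partitioning("hash", tuple(key_fields))
    plan = []
    for side, props in (("left", left_props), ("right", right_props)):
        if satisfies(props, required):
            plan.append(f"{side}: reuse existing partitioning")
        else:
            plan.append(f"{side}: insert hash repartition on {key_fields}")
    return plan

# One annotated source, one unpartitioned source, coGrouped on "id":
annotated = Partitioning("hash", ("id",))
unpartitioned = Partitioning("none", ())
print(plan_cogroup(annotated, unpartitioned, ["id"]))
```

This is only meant to show where the incompatibility mentioned above bites: the check in `satisfies` is exactly the place where the source's partitioning metadata has to line up with the operator's requirement for the shuffle to be skipped.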