flink-user mailing list archives

From Alexander Alexandrov <alexander.s.alexand...@gmail.com>
Subject Re: PartitionedByHash input annotation?
Date Fri, 22 May 2015 08:43:23 GMT
Yes, thanks for the clarification.

If I understand correctly, there is no guarantee that after partitionByHash
sender[i] and receiver[i] will be scheduled on the same slot, so we will have
to come up with some workaround or tweak the code.
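
For reference, the fallback job shape we are discussing is roughly the
following (a minimal sketch against the plain DataSet API; the input data,
the key field and the group reducer are made up for illustration):

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PartitionReuseSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // made-up input, keyed on field 0
    DataSet<Tuple2<String, Long>> data = env.fromElements(
        new Tuple2<>("a", 1L), new Tuple2<>("b", 2L), new Tuple2<>("a", 3L));

    // explicit hash partitioning (one pipelined pass over the data),
    // followed by a groupReduce on the same key; the question in this thread
    // is whether that partitioning can be reused downstream without another
    // shuffle, and whether senders and receivers end up co-located
    data.partitionByHash(0)
        .groupBy(0)
        .reduceGroup(new GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
          @Override
          public void reduce(Iterable<Tuple2<String, Long>> values,
                             Collector<Tuple2<String, Long>> out) {
            String key = null;
            long sum = 0;
            for (Tuple2<String, Long> v : values) { key = v.f0; sum += v.f1; }
            out.collect(new Tuple2<>(key, sum));
          }
        })
        .print();
  }
}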

Based on that I think that our options are:

1) extend the scheduler so that the above constraint can somehow be enforced
2) extend the optimizer so that data source chains which are already hash
partitioned can be annotated as such, in a way which allows this
partitioning to be reused for a subsequent coGroup / join / reduce / groupReduce

I think that option (2) should be simpler, as much of the logic is already
in place. Last time I checked, the only problem was that the data source
partitioning metadata was incompatible with the operator partitioning.
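
For option (2), usage could look roughly like this: the data source declares
that its splits are already hash partitioned on the key, so the optimizer has
the information it needs to skip the shuffle in front of the grouping. This
is only a sketch: the getSplitDataProperties() / splitsPartitionedBy() hint
below is how I would expect such an annotation to be exposed (along the lines
of a SplitDataProperties object on the DataSource), not a confirmed API, and
the input path is made up.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

public class PrePartitionedSourceSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // a source whose splits were written out already hash partitioned on f0,
    // e.g. by a previous job (made-up path)
    DataSource<Tuple2<String, Long>> src =
        env.readCsvFile("hdfs:///path/to/prepartitioned")
           .types(String.class, Long.class);

    // assumed optimizer hint: declare the existing partitioning of the splits
    // so that the downstream grouping does not have to re-partition
    src.getSplitDataProperties()
       .splitsPartitionedBy("f0");

    src.groupBy(0)
       .sum(1)
       .print();
  }
}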


2015-05-21 21:50 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> Hi Alex,
> did my previous mail answer these questions as well?
> Cheers, Fabian
> 2015-05-18 22:03 GMT+02:00 Alexander Alexandrov <
> alexander.s.alexandrov@gmail.com>:
>> In the dawn of Flink, when Flink operators were still called PACTs (short
>> for Parallelization Contracts), the system used to support so-called
>> "output contracts" via custom annotations that could be attached to the UDF
>> (the ForwardedFields annotation is a descendant of that concept).
>> Amongst others, if I remember correctly, there was an output contract
>> indicating that a DataSet is hash-partitioned by key, which was used in
>> order to avoid unnecessary re-partitioning of an input (e.g. for a
>> subsequent reducer, coGroup). I wonder what happened to that, as I can't
>> find it any more - I am looking here:
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
>> Help / suggestions on how to realize the same functionality with the current
>> version of the API are appreciated.
>> As a fallback, I think that "partitionByHash" could maybe do the trick at
>> the expense of one pipelined pass over the data, but I am not sure whether
>> the receiver tasks are scheduled on the same machines as their sender
>> counterparts. In other words, can I assume that the following happens:
>> machine1:  (task[0])  partitionByHash  (task[0])
>> machine2:  (task[1])  partitionByHash  (task[1])
>> ...
>> machineN:  (task[n])  partitionByHash  (task[n])
>> Cheers,
>> Alexander
