flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alexander Alexandrov <alexander.s.alexand...@gmail.com>
Subject Re: Informing the runtime about data already repartitioned using "output contracts"
Date Tue, 19 May 2015 11:49:35 GMT
Thanks for the feedback, Fabian.

This is related to the question I sent on the user mailing list yesterday.
Mustafa is working on a master thesis where we try to abstract an operator
for the update of stateful datasets (decoupled from the current native
iterations logic) and use it in conjunction with lazy unrolling of

The assumptions are as follows:

   - Each iteration runs a job with the same structure and the same DOP;
   - Updates a realized through a coGroup with a fixed DOP (let's say *N*),
   which consumes a *(state, updates)* pair of datasets and produces a new
   version of the state (let's call it *state'*);
   - We keep track where the *N* output partitions of *state'* are located
   and use this information for local placement of the corresponding *N*
   DataSource tasks in the next iteration (via FLINK-1478);
   - The remaining piece of the puzzle is to figure out how to tell the
   coGroup that one of the inputs is already partitioned so id avoids an
   unnecessary shuffle;

If I remember correctly back in the day we had a PACT output contract that
served a similar purpose avoid unnecessary shuffles), but I was not able to
find it yesterday.

In either case, I think even if that does not work out of the box at the
moment, that most of the logic is in place (e.g. co-location groups in the
scheduler), and we are willing to either hack the code or add the missing
functionality in order to realize the above described goal.

Suggestions are welcome!


2015-05-18 17:42 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> Hi Mustafa,
> I'm afraid, this is not possible.
> Although you can annotate DataSources with partitioning information, this
> is not enough to avoid repartitioning for a CoGroup. The reason for that is
> that CoGroup requires co-partitioning of both inputs, i.e., both inputs
> must be equally partitioned (same number of partitions, same partitioning
> function, same location of partitions). Since Flink is dynamically
> assigning tasks to execution slots, it is not possible to co-locate data
> that was read from a data source and data coming from the result of another
> computation.
> If you just need the result of the first co-group on disk, you could also
> build a single program that does both co-groups and additional writes the
> result of the first co-group to disk (Flink supports multiple data sinks).
> Best, Fabian
> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <elbeherymustafa@gmail.com>:
>> Hi,
>> I am writing a flink job, in which I have three datasets.  I have
>> partitionedByHash the first two before coGrouping them.
>> My plan is to spill the result of coGrouping to disk, and then re-read it
>> again before coGrouping with the third dataset.
>> My question is, is there anyway to inform flink that the first coGroup
>> result is already partitioned ?!  I know I can re-partition again before
>> coGrouping but I would like to know if there is anyway to avoid a step
>> which was already executed,
>> Regards.
>> --
>> Mustafa Elbehery
>> EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
>> +49(0)15750363097
>> skype: mustafaelbehery87

View raw message