flink-user mailing list archives

From Mustafa Elbehery <elbeherymust...@gmail.com>
Subject Re: Informing the runtime about data already repartitioned using "output contracts"
Date Fri, 29 May 2015 15:59:35 GMT
Hi Folks,

I am reviving this thread, as I am stuck on one step toward my goal.

The following code partitions the data before the coGroup, then writes the
result to disk. I am trying to re-read the data from disk, so I have
created a LocatableInputSplit[] whose size equals the DOP. Find the code
below.

inPerson.partitionByHash("name")
      .map(new TrackHost())
      .coGroup(inStudent.partitionByHash("name"))
      .where("name").equalTo("name")
      .with(new ComputeStudiesProfile())
      .write(new TextOutputFormat(new Path()),
             "file:///home/mustafa/Documents/tst/",
             FileSystem.WriteMode.OVERWRITE);

LocatableInputSplit [] splits = new LocatableInputSplit[env.getParallelism()];
splits[0] = new LocatableInputSplit(env.getParallelism(),"localhost");
splits[1] = new LocatableInputSplit(env.getParallelism(),"localhost");
splits[2] = new LocatableInputSplit(env.getParallelism(),"localhost");
DataSet<Person> secondIn = env
      .createInput(new MutableInputFormatTest(
            new Path("file:///home/mustafa/Documents/tst/1"), splits))
      .map(new PersonMapper());
secondIn.print();



TrackHost is an accumulator that tracks the host information, and
MutableInputFormatTest is a custom InputFormat which extends
TextInputFormat and implements StrictlyLocalAssignment.

I am using LocatableInputSplit as an instance field, because implementing
InputSplit conflicts with TextInputFormat on the createInputSplits
method: they both have the same method, and the compiler complained
about that.
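
As an aside on the split array in the code above: it is sized by
env.getParallelism() but filled only at indices 0 to 2, and every split
receives the same split number. A minimal sketch of filling such an array
in a loop, with one distinct number per split, could look like the
following. HostSplit is a hypothetical stand-in for LocatableInputSplit so
that the sketch runs without Flink on the classpath, and the idea that
split numbers should be unique is an assumption that this thread does not
confirm.

```java
public class SplitSketch {

    // Hypothetical stand-in for a (splitNumber, hostname) input split.
    static final class HostSplit {
        final int splitNumber;
        final String hostname;

        HostSplit(int splitNumber, String hostname) {
            this.splitNumber = splitNumber;
            this.hostname = hostname;
        }
    }

    // Creates one split per parallel instance, numbered 0 .. parallelism - 1,
    // all pinned to the same host.
    static HostSplit[] createSplits(int parallelism, String hostname) {
        HostSplit[] splits = new HostSplit[parallelism];
        for (int i = 0; i < parallelism; i++) {
            splits[i] = new HostSplit(i, hostname);
        }
        return splits;
    }

    public static void main(String[] args) {
        for (HostSplit s : createSplits(3, "localhost")) {
            System.out.println(s.splitNumber + " @ " + s.hostname);
        }
    }
}
```

With this shape the array has no unset entries for any parallelism, and
the indices cannot run past the end of the array when the parallelism is
lower than three.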


Again, while debugging I could see the problem in ExecutionJobVertex,
line 146. The execution ignores the locatable splits I am shipping and
re-creates the input splits, which somehow get their host info (machine
name) from the execution, while the TaskManagers prepared by the
scheduler are waiting for a machine named "LocalHost".

Any suggestions?

Regards.




On Tue, May 19, 2015 at 2:16 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Alright, so if both inputs of the CoGroup are read from the file system,
> there should be a way to do the co-group on co-located data without
> repartitioning.
> In fact, I have some code lying around to do co-located joins from local
> FS [1]. Haven't tested it thoroughly and it also relies on a number of
> assumptions. If the data is also sorted you can even get around sorting it
> if you inject a few lines into the optimizer (see change for FLINK-1444)
> and ensure that each source reads exactly one! input split.
>
> Regarding your question about the PACT output contracts, there were three
> types, which were defined with respect to a key/value pair data model:
> - Same key: UDF does not modify the key
> - Super key: UDF extends the key (Partitioning remains valid, sorting not)
> - Unique key: Keys from UDF or source are unique
>
> Let me know, if you have questions.
> Cheers, Fabian
>
> [1] https://github.com/fhueske/flink-localjoin-utils
>
> 2015-05-19 13:49 GMT+02:00 Alexander Alexandrov <
> alexander.s.alexandrov@gmail.com>:
>
>> Thanks for the feedback, Fabian.
>>
>> This is related to the question I sent on the user mailing list
>> yesterday. Mustafa is working on a master thesis where we try to abstract
>> an operator for the update of stateful datasets (decoupled from the current
>> native iterations logic) and use it in conjunction with lazy unrolling of
>> iterations.
>>
>> The assumptions are as follows:
>>
>>    - Each iteration runs a job with the same structure and the same DOP;
>>    - Updates are realized through a coGroup with a fixed DOP (let's say *N*),
>>    which consumes a *(state, updates)* pair of datasets and produces a
>>    new version of the state (let's call it *state'*);
>>    - We keep track where the *N* output partitions of *state'* are
>>    located and use this information for local placement of the corresponding
>>    *N* DataSource tasks in the next iteration (via FLINK-1478);
>>    - The remaining piece of the puzzle is to figure out how to tell the
>>    coGroup that one of the inputs is already partitioned so it avoids an
>>    unnecessary shuffle;
>>
>> If I remember correctly back in the day we had a PACT output contract
>> that served a similar purpose (avoiding unnecessary shuffles), but I was not
>> able to find it yesterday.
>>
>> In either case, I think even if that does not work out of the box at the
>> moment, that most of the logic is in place (e.g. co-location groups in the
>> scheduler), and we are willing to either hack the code or add the missing
>> functionality in order to realize the above described goal.
>>
>> Suggestions are welcome!
>>
>> Regards,
>> Alex
>>
>>
>>
>>
>> 2015-05-18 17:42 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>
>>> Hi Mustafa,
>>>
>>> I'm afraid, this is not possible.
>>> Although you can annotate DataSources with partitioning information,
>>> this is not enough to avoid repartitioning for a CoGroup. The reason for
>>> that is that CoGroup requires co-partitioning of both inputs, i.e., both
>>> inputs must be equally partitioned (same number of partitions, same
>>> partitioning function, same location of partitions). Since Flink is
>>> dynamically assigning tasks to execution slots, it is not possible to
>>> co-locate data that was read from a data source and data coming from the
>>> result of another computation.
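
To make two of the three co-partitioning conditions above concrete (same
number of partitions, same partitioning function), here is a small
standalone sketch in plain Java. partitionOf is a made-up helper that
takes the key's hash modulo the partition count. It is not Flink's actual
hash partitioner, and the sketch does not model the third condition, the
physical location of the partitions.

```java
import java.util.Arrays;
import java.util.List;

public class CoPartitionSketch {

    // Hypothetical partitioner: key hash modulo the number of partitions.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns true if every key is assigned the same partition index on
    // both inputs, i.e. matching keys would meet in the same partition.
    static boolean coPartitioned(List<String> keys, int leftPartitions, int rightPartitions) {
        for (String key : keys) {
            if (partitionOf(key, leftPartitions) != partitionOf(key, rightPartitions)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c");
        // Same function and same partition count on both sides.
        System.out.println(coPartitioned(keys, 3, 3));
        // Same function, but different partition counts.
        System.out.println(coPartitioned(keys, 3, 4));
    }
}
```

When the partition counts differ, a key such as "c" can land at a
different index on each side, so a coGroup could not simply pair up
partition i of one input with partition i of the other.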
>>>
>>> If you just need the result of the first co-group on disk, you could
>>> also build a single program that does both co-groups and additionally writes
>>> the result of the first co-group to disk (Flink supports multiple data
>>> sinks).
>>>
>>> Best, Fabian
>>>
>>> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <elbeherymustafa@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I am writing a Flink job in which I have three datasets. I have
>>>> hash-partitioned (partitionByHash) the first two before coGrouping them.
>>>>
>>>> My plan is to spill the result of coGrouping to disk, and then re-read
>>>> it again before coGrouping with the third dataset.
>>>>
>>>> My question is: is there any way to inform Flink that the first coGroup
>>>> result is already partitioned? I know I can re-partition again before
>>>> coGrouping, but I would like to know if there is any way to avoid a step
>>>> that was already executed.
>>>>
>>>> Regards.
>>>>
>>>> --
>>>> Mustafa Elbehery
>>>> EIT ICT Labs Master School
>>>> <http://www.masterschool.eitictlabs.eu/home/>
>>>> +49(0)15750363097
>>>> skype: mustafaelbehery87
>>>>
>>>>
>>>
>>
>


-- 
Mustafa Elbehery
EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
+49(0)15750363097
skype: mustafaelbehery87
