flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Informing the runtime about data already repartitioned using "output contracts"
Date Tue, 21 Jul 2015 13:27:02 GMT
I think we are still talking about the same issue as in a related question.
I suspect that the MutableInputFormatTest does not properly return the
splits in the "createInputSplits()" function.

To validate that, you can write yourself a unit test that checks whether
the input format returns your splits from the method "createInputSplits()".
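
For example, a minimal JUnit sketch along these lines (this assumes the
MutableInputFormatTest(Path, LocatableInputSplit[]) constructor that appears in
your code below; the path is just a placeholder):

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.LocatableInputSplit;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;

public class MutableInputFormatSplitsTest {

    @Test
    public void formatReturnsThePresetSplits() throws Exception {
        LocatableInputSplit[] preset = {
                new LocatableInputSplit(0, "localhost"),
                new LocatableInputSplit(1, "localhost"),
                new LocatableInputSplit(2, "localhost")
        };
        MutableInputFormatTest format = new MutableInputFormatTest(
                new Path("file:///tmp/test-input"), preset);

        InputSplit[] returned = format.createInputSplits(preset.length);

        // If the format silently recomputes the splits (the behavior I suspect),
        // the host information is lost and these assertions fail.
        assertEquals(preset.length, returned.length);
        for (int i = 0; i < preset.length; i++) {
            assertSame(preset[i], returned[i]);
        }
    }
}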

On Fri, May 29, 2015 at 5:59 PM, Mustafa Elbehery <elbeherymustafa@gmail.com> wrote:

> Hi Folks,
>
> I am reviving this thread again, as I am stuck on one step needed to achieve
> my target.
>
> The following code partitions the data before coGrouping, then writes the result
> to disk. I am trying to re-read that data from disk, so I have created a
> LocatableInputSplit[] array with the size of the DOP. Find the code below:
>
> inPerson.partitionByHash("name")
>       .map(new TrackHost())
>       .coGroup(inStudent.partitionByHash("name"))
>       .where("name").equalTo("name")
>       .with(new ComputeStudiesProfile())
>       .write(new TextOutputFormat(new Path()), "file:///home/mustafa/Documents/tst/", FileSystem.WriteMode.OVERWRITE);
>
> LocatableInputSplit [] splits = new LocatableInputSplit[env.getParallelism()];
> splits[0] = new LocatableInputSplit(env.getParallelism(),"localhost");
> splits[1] = new LocatableInputSplit(env.getParallelism(),"localhost");
> splits[2] = new LocatableInputSplit(env.getParallelism(),"localhost");
> DataSet<Person> secondIn = env.createInput(new MutableInputFormatTest(new Path("file:///home/mustafa/Documents/tst/1"), splits)).map(new PersonMapper());
> secondIn.print();
>
>
>
> TrackHost is an Accumulator that tracks the host information, and MutableInputFormatTest
> is a custom InputFormat which extends TextInputFormat and implements StrictlyLocalAssignment.
>
> I am using LocatableInputSplit as an instance field, because implementing InputSplit
> conflicts with TextInputFormat on the createInputSplits() method: both declare that
> method, and the compiler complained about it.
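>
> In outline, the format currently looks roughly like this (simplified; names as above):
>
> public class MutableInputFormatTest extends TextInputFormat implements StrictlyLocalAssignment {
>
>     // kept as an instance field, because overriding createInputSplits() to return
>     // LocatableInputSplit[] clashes with the FileInputSplit[] signature inherited
>     // from TextInputFormat/FileInputFormat
>     private final LocatableInputSplit[] splits;
>
>     public MutableInputFormatTest(Path filePath, LocatableInputSplit[] splits) {
>         super(filePath);
>         this.splits = splits;
>     }
> }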
>
>
> Again, while debugging I could see the problem in *ExecutionJobVertex, line 146*: the
> execution ignores the locatable host information I am shipping with my splits and
> re-creates the input splits, which somehow get their host info (machine name) from the
> execution, while the TaskManagers prepared by the scheduler are waiting for a machine
> named "localhost".
>
> Any suggestions?
>
> Regards.
>
>
>
>
> On Tue, May 19, 2015 at 2:16 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Alright, so if both inputs of the CoGroup are read from the file system,
>> there should be a way to do the co-group on co-located data without
>> repartitioning.
>> In fact, I have some code lying around to do co-located joins from local
>> FS [1]. Haven't tested it thoroughly and it also relies on a number of
>> assumptions. If the data is also sorted you can even get around sorting it
>> if you inject a few lines into the optimizer (see change for FLINK-1444)
>> and ensure that each source reads exactly one! input split.
>>
>> Regarding your question about the PACT output contracts, there were three
>> types, defined with respect to a key/value pair data model:
>> - Same key: UDF does not modify the key
>> - Super key: UDF extends the key (Partitioning remains valid, sorting not)
>> - Unique key: Keys from UDF or source are unique
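>>
>> (For what it's worth, the closest analogue in today's DataSet API are the
>> semantic annotations. A rough sketch, assuming a POJO with a "name" field
>> and an illustrative mapper:)
>>
>> // "same key" case: the annotation declares that the UDF leaves "name" untouched,
>> // so an existing hash-partitioning on "name" remains valid after the map.
>> @ForwardedFields("name")
>> public static class PassThroughMapper implements MapFunction<Person, Person> {
>>     @Override
>>     public Person map(Person p) {
>>         return p;  // must really forward "name" unchanged for the annotation to be correct
>>     }
>> }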
>>
>> Let me know, if you have questions.
>> Cheers, Fabian
>>
>> [1] https://github.com/fhueske/flink-localjoin-utils
>>
>> 2015-05-19 13:49 GMT+02:00 Alexander Alexandrov <alexander.s.alexandrov@gmail.com>:
>>
>>> Thanks for the feedback, Fabian.
>>>
>>> This is related to the question I sent on the user mailing list
>>> yesterday. Mustafa is working on a master's thesis in which we try to abstract
>>> an operator for updating stateful datasets (decoupled from the current
>>> native iterations logic) and to use it in conjunction with lazy unrolling of
>>> iterations.
>>>
>>> The assumptions are as follows:
>>>
>>>    - Each iteration runs a job with the same structure and the same DOP;
>>>    - Updates are realized through a coGroup with a fixed DOP (let's say
>>>    *N*), which consumes a *(state, updates)* pair of datasets and
>>>    produces a new version of the state (let's call it *state'*); a sketch
>>>    follows right after this list;
>>>    - We keep track of where the *N* output partitions of *state'* are
>>>    located and use this information for local placement of the corresponding
>>>    *N* DataSource tasks in the next iteration (via FLINK-1478);
>>>    - The remaining piece of the puzzle is to figure out how to tell the
>>>    coGroup that one of the inputs is already partitioned, so it avoids an
>>>    unnecessary shuffle.
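>>>
>>> Roughly, the update step we have in mind looks like this (types, UDF names
>>> and the path are placeholders, not the actual thesis code):
>>>
>>> DataSet<State> statePrime = state                    // current version, DOP = N
>>>         .coGroup(updates)                            // updates produced in this iteration
>>>         .where("key").equalTo("key")
>>>         .with(new ApplyUpdates());                   // emits the new version of each group
>>> statePrime.writeAsText(statePath, FileSystem.WriteMode.OVERWRITE); // N partitions whose locations we track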
>>>
>>> If I remember correctly, back in the day we had a PACT output contract
>>> that served a similar purpose (avoiding unnecessary shuffles), but I was not
>>> able to find it yesterday.
>>>
>>> In either case, even if this does not work out of the box at the moment,
>>> I think most of the logic is in place (e.g. co-location groups in the
>>> scheduler), and we are willing to either hack the code or add the missing
>>> functionality in order to realize the goal described above.
>>>
>>> Suggestions are welcome!
>>>
>>> Regards,
>>> Alex
>>>
>>>
>>>
>>>
>>> 2015-05-18 17:42 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>>
>>>> Hi Mustafa,
>>>>
>>>> I'm afraid this is not possible.
>>>> Although you can annotate DataSources with partitioning information,
>>>> this is not enough to avoid repartitioning for a CoGroup. The reason for
>>>> that is that CoGroup requires co-partitioning of both inputs, i.e., both
>>>> inputs must be equally partitioned (same number of partitions, same
>>>> partitioning function, same location of partitions). Since Flink is
>>>> dynamically assigning tasks to execution slots, it is not possible to
>>>> co-locate data that was read from a data source and data coming from the
>>>> result of another computation.
>>>>
>>>> If you just need the result of the first co-group on disk, you could
>>>> also build a single program that does both co-groups and additionally writes
>>>> the result of the first co-group to disk (Flink supports multiple data
>>>> sinks).
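>>>>
>>>> A rough sketch of that structure (types, UDFs and paths are placeholders):
>>>>
>>>> DataSet<FirstResult> first = persons.coGroup(students)
>>>>         .where("name").equalTo("name")
>>>>         .with(new FirstCoGroup());
>>>>
>>>> // sink 1: persist the intermediate result
>>>> first.writeAsText("file:///tmp/first-cogroup", FileSystem.WriteMode.OVERWRITE);
>>>>
>>>> // reuse the intermediate result directly instead of re-reading it from disk
>>>> DataSet<SecondResult> second = first.coGroup(third)
>>>>         .where("name").equalTo("name")
>>>>         .with(new SecondCoGroup());
>>>>
>>>> // sink 2: final result
>>>> second.writeAsText("file:///tmp/second-cogroup", FileSystem.WriteMode.OVERWRITE);
>>>>
>>>> env.execute();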
>>>>
>>>> Best, Fabian
>>>>
>>>> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <elbeherymustafa@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am writing a Flink job in which I have three datasets. I have
>>>>> partitioned the first two by hash before coGrouping them.
>>>>>
>>>>> My plan is to spill the result of the coGroup to disk, and then re-read
>>>>> it before coGrouping with the third dataset.
>>>>>
>>>>> My question is: is there any way to inform Flink that the first coGroup
>>>>> result is already partitioned? I know I can re-partition again before
>>>>> coGrouping, but I would like to know if there is any way to avoid a step
>>>>> that was already executed.
>>>>>
>>>>> Regards.
>>>>>
>>>>> --
>>>>> Mustafa Elbehery
>>>>> EIT ICT Labs Master School
>>>>> <http://www.masterschool.eitictlabs.eu/home/>
>>>>> +49(0)15750363097
>>>>> skype: mustafaelbehery87
>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Mustafa Elbehery
> EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
> +49(0)15750363097
> skype: mustafaelbehery87
>
>
