flink-user mailing list archives

From Ventura Del Monte <venturadelmo...@gmail.com>
Subject Re: repartition locally to task manager
Date Mon, 08 Jun 2015 14:49:47 GMT
Hi Stephan,

Many thanks for your reply!

1) This would be a nice feature. I have already done something similar; if
you tell me which information you would like to export in the runtime
context, I can add it to my code, update the unit tests, and share them.

2) Yes, I had figured that out. However, I need this kind of local
repartition because I am working on a dataset sampler based on the filter
operator (this is the first step of the iterative pipeline I am
developing). To be honest, the repartition is just a plus, because I have
already achieved good results (although a sampler like the one Spark
offers for low sampling ratios would be a nice feature). The main drawback
of this filter operation is that it always takes the same partition as
input; so, if the partition is big enough, the probability of sampling
different items in consecutive filter passes should be high (given a good
sampling factor and a correctly seeded RNG). Still, if it were possible to
shuffle the partitions within the same task manager, the subsequent
sampling would benefit, in my opinion, since the resulting partition would
contain different items with an even higher probability. Of course, this
shuffle operation (being local to each TM) should involve neither a
network nor a disk transfer, otherwise the game is not worth the candle.
About the change of parallelism, I read that it triggers a sort of local
re-distribution, but I do not think that applies to my case. Anyway, do
you think this kind of shuffling/sampling can be achieved in Flink? Does
it make sense in your opinion?
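The filter-based sampling described above can be sketched as follows. This is a minimal sketch on a plain list, not Flink code: in a Flink job the same logic would sit inside a FilterFunction, and the seed would be varied per iteration so that consecutive passes over the same partition draw different items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class BernoulliSampler {
    // Each element is kept independently with probability `fraction`,
    // using an explicitly seeded RNG so that a run is reproducible.
    // Varying the seed between iterations yields a different sample
    // from the same (unchanged) partition on each pass.
    static List<Integer> sample(List<Integer> partition, double fraction, long seed) {
        Random rng = new Random(seed);
        List<Integer> out = new ArrayList<>();
        for (Integer item : partition) {
            if (rng.nextDouble() < fraction) {
                out.add(item);
            }
        }
        return out;
    }
}
```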


Best Regards,
Ventura

2015-06-03 14:57 GMT+02:00 Stephan Ewen <sewen@apache.org>:

> Hi Ventura!
>
> Sorry for the late response. Here are a few ideas or comments that may
> help you:
>
> 1) We want to make it possible for a function (such as MapFunction) to
> figure out on which TaskManager it is running. The mechanism would be
> something like "getRuntimeContext().getTaskManagerInformation()". That
> should help you determine which TaskManager you are.
>
> 2) When you are scheduling tasks, it is not guaranteed that slots 0, 1, 2,
> ... are on the same TaskManager. The assignment is based on the locality of
> the input data stream and the availability of slots.
>
>
> Can you explain a bit more what the feature you want to add actually tries
> to achieve? Then I may be able to give you more pointers.
>
> When you say that you need local re-distribution, does it imply something
> like the picture below, where a change of parallelism between operators only
> repartitions locally (not across the boundaries of TaskManagers)?
>
>
>  (map) (map)  (map) (map)
>    \     /      \    /
>     \   /        \  /
>    (reduce)    (reduce)
>       ^ ^        ^ ^
>       | \        / |
>       |  +------+  |
>       | /        \ |
>    (source)     (source)
>
>
>
> Greetings,
> Stephan
>
>
>
> On Fri, May 22, 2015 at 10:58 AM, Ventura Del Monte <
> venturadelmonte@gmail.com> wrote:
>
>> Hello,
>>
>> I am trying to introduce a new feature in my Flink project: I would like
>> to shuffle (randomly repartition) my dataset only locally to a task manager,
>> so that each internal worker will have a different set of objects to work
>> on. I have looked into Flink's internal mechanisms, and I know (I hope) how
>> it handles partitions. I think there are two ways to do it:
>>
>> a) using a mapPartition, which for each input object X should output a
>> tuple (X, destinationChannel), where destinationChannel is the id of
>> the new worker that will receive X. The main problem with this solution is
>> determining the correct destinationChannel inside the mapPartition task. I
>> think every operation in Flink is unaware of the task manager on which it
>> is executed, so I would need to read the task manager configuration to get
>> the number of slots available on the current TM; but then how should I
>> relate this number to the total channel count, since I could have a
>> situation like this:
>>
>> +----+----+----+----+----+----+----+----+----+---+---+---+---+----+
>> |    |    |    |    |    |    |    |    |    |   |   |   |   |    |
>> | 0  | 1  | 2  | 3  | 4  | 5  |  6 |  7 |  8 | 9 | 10| 11| 12| 13 |
>> +----+----+----+---------+----+----+----+----+--------------------+
>> |                   |                            |                |
>> |      TM1          |            TM2             |       TM3      |
>> +-------------------+----------------------------+----------------+
>>
>> So even if I knew TM2 had 6 slots, I would not be able to know their id
>> range -> [4,9]
>>
>> b) Destination channels are chosen in
>> RegularPactTask.getOutputCollector, so some modifications of this method
>> would make the local repartition possible, using either a range or a
>> custom partitioner, in order to make them taskmanager-aware. However, this
>> would involve some edits to the Flink runtime.
>>
>> Tbh, I would like to avoid option b, but I think I am at a dead end and
>> will have to edit it.
>>
>> Do you have better suggestions? Thank you in advance.
>>
>
>
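A side note on the slot-layout problem in the quoted message: if the per-TaskManager slot counts were available (a hypothetical, since Flink does not currently expose this to user functions, as discussed above), the contiguous slot-id range of each TM could be derived with a simple prefix sum. A minimal sketch:

```java
import java.util.Arrays;

public class SlotRanges {
    // Hypothetical helper: given the number of slots on each TaskManager,
    // derive each TM's contiguous [first, last] slot-id range by accumulating
    // an offset over the TMs in order.
    static int[][] slotRanges(int[] slotsPerTm) {
        int[][] ranges = new int[slotsPerTm.length][2];
        int offset = 0;
        for (int i = 0; i < slotsPerTm.length; i++) {
            ranges[i][0] = offset;                      // first slot id on TM i
            ranges[i][1] = offset + slotsPerTm[i] - 1;  // last slot id on TM i
            offset += slotsPerTm[i];
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Layout from the diagram: TM1 has 4 slots, TM2 has 6, TM3 has 4.
        int[][] r = slotRanges(new int[]{4, 6, 4});
        System.out.println(Arrays.deepToString(r)); // [[0, 3], [4, 9], [10, 13]]
    }
}
```

With these counts, TM2's range comes out as [4, 9], matching the diagram; the missing piece in practice is obtaining the counts and the TM ordering at runtime.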
