flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: repartion locally to task manager
Date Mon, 22 Jun 2015 10:46:11 GMT
Hi Ventura!

Concerning (1) :

What would be good is to make the
"org.apache.flink.runtime.instance.InstanceConnectionInfo" in the
getruntimeContext()'s RuntimeContext object. In order to do that, we could
need to move that into the flink-core package. We could also rename it
simply to "ConnectionInfo"


Concerning (2) :

I think this may be a bit harder to add. I am curious what your results are
without this optimization.


Stephan


On Mon, Jun 8, 2015 at 4:49 PM, Ventura Del Monte <venturadelmonte@gmail.com
> wrote:

> Hi Stephan,
>
> Many thank for your reply!
>
> 1) This would be a nice feature. I have already done something similar, if
> you told me which informations you would like to export in the runtime
> context, I could add them to my code, update unit tests and share them.
>
> 2) Yes, I have figured that out. However, I needed this kind of local
> repartition since I was working on a dataset sampler based on the filter
> operator (this is the first step of the iterative pipeline I am
> developing). To be honest, this repartition is just a plus because I have
> already achieved good results (even if a sampler like the one offered by
> spark when the ratio is low would be a good feature). The main drawback of
> this filter operation is that it takes in input always the same partition,
> so, if the partition is enough big, then the probability of sampling
> different items in consecutive filtering operations should be high (of
> course, using a good sampling factor and a correctly seeded rng). Yet if it
> was possible to shuffle the partitions on the same task manager, the
> following sampling operation would benefit, in my opinion, as the produced
> partition would contain different items with an even higher probability. Of
> course, I think this shuffle operation (being local to each tm) should not
> involve neither a network nor a disk transfer, otherwise, the game is not
> worth the candle.
> About the change of parallelism, I read that it triggers a sort of local
> re-distribution, but I do no think it is my case. Anyway, do you think this
> kind of shuffling/sampling can be achieved in flink? Does it make sense in
> your opinion?
>
>
> Best Regards,
> Ventura
>
> 2015-06-03 14:57 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>
>> Hi Ventura!
>>
>> Sorry for the late response. Here are a few ideas or comments that may
>> help you:
>>
>> 1) We want to make it possible for a function (such as MapFunction) to
>> figure out on which TaskManager it is running. The mechanism would be
>> something like "getRuntimeContext().getTaskManagerInformation()". That
>> should help you determine which TaskManager you are.
>>
>> 2) When you are scheduling tasks, it is not guaranteed that slots 0, 1,
>> 2, ... are on the same TaskManager. The assignment is a based on locality
>> of the input data stream and the availability of slots.
>>
>>
>> Can you explain a bit more what the feature you want to add actually
>> tries to achieve? Then I may be able to give you more pointers.
>>
>> When you say that you need local re-distribution, does it imply something
>> like below, where a change of parallelism between operators implies that
>> the only locally repartition (not across the boundaries of TaskManagers)?
>>
>>
>>  (map) (map)  (map) (map)
>>    \     /      \    /
>>     \   /        \  /
>>    (reduce)    (reduce)
>>       ^ ^        ^ ^
>>       | \        / |
>>       |  +------+  |
>>       | /        \ |
>>    (source)     (source)
>>
>>
>>
>> Greetings,
>> Stephan
>>
>>
>>
>> On Fri, May 22, 2015 at 10:58 AM, Ventura Del Monte <
>> venturadelmonte@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am trying to introduce a new feature in my flink project, I would like
>>> to shuffle (random repartition) my dataset only locally to a task manager,
>>> so that each internal worker will have a different set of objects to work
>>> on. I have looked to internal flink mechanism, and I know (i hope) how it
>>> handles partitions. I think there are two ways to do it:
>>>
>>> a) using a mapPartiton, which for each input object X should output a
>>> tuple (X, destinationChannel), where the destinationChannel is the id of
>>> the new worker that will receive X. The main problem of this solution is to
>>> determine the correct destinationChannel in the mapPartition task. I think
>>> every operation in flink is unaware of the task manager on which it is
>>> executed, so I will need to read taskmanager config in order to get the
>>> number of slots available on the current TM, but then how should I relate
>>> this number to the total channels count, since I could have a situation
>>> like this:
>>>
>>> +----+----+----+----+----+----+----+----+----+---+---+---+---+----+
>>> |    |    |    |    |    |    |    |    |    |   |   |   |   |    |
>>> | 0  | 1  | 2  | 3  | 4  | 5  |  6 |  7 |  8 | 9 | 10| 11| 12| 13 |
>>> +----+----+----+---------+----+----+----+----+--------------------+
>>> |                   |                            |                |
>>> |      TM1          |            TM2             |       TM3      |
>>> +-------------------+----------------------------+----------------+
>>>
>>> So even if I knew TM2 had 6 slots, i would not be able to know their id
>>> range -> [4,9]
>>>
>>> b) Destination channels are choosen in
>>> RegularPactTask.getOutputCollector, so some modifications of this method
>>> would make the local repartition possible using either a range or a
>>> custom partition, in order to make them taskmanager-aware. Yet this will
>>> involve some edits to flink runtime.
>>>
>>> Tbh, I would like to avoid the b. but I think I am at a dead end, and I
>>> will have to edit it.
>>>
>>> Do you have better suggestions? Thank you in advance.
>>>
>>
>>
>

Mime
View raw message