flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stefan Bunk <stefan.b...@googlemail.com>
Subject Re: Distribute DataSet to subset of nodes
Date Mon, 21 Sep 2015 19:15:11 GMT
Of course!

On 21 September 2015 at 19:10, Fabian Hueske <fhueske@gmail.com> wrote:

> The custom partitioner does not know its task id but the mapper that
> assigns the partition ids knows its subtaskid.
>
> So if the mapper with subtask id 2 assigns partition ids 2 and 7, only 7
> will be send over the network.
> On Sep 21, 2015 6:56 PM, "Stefan Bunk" <stefan.bunk@googlemail.com> wrote:
>
>> Hi Fabian,
>>
>> that sounds good, thank you.
>>
>> One final question: As I said earlier, this also distributes data in some
>> unnecessary cases, say ID 4 sends data to ID 3.
>> Is there no way to find out the ID of the current node? I guess that
>> number is already available on the node and just needs to be exposed
>> somehow, right?
>>
>> Cheers
>> Stefan
>>
>>
>>
>> On 17 September 2015 at 18:39, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Hi Stefan,
>>>
>>> I think I have a solution for your problem :-)
>>>
>>> 1) Distribute both parts of the small data to each machine (you have
>>> done that)
>>> 2) Your mapper should have a parallelism of 10, the tasks with ID 0 to 4
>>> (get ID via RichFunction.getRuntimeContext().getIndexOfThisSubtask()) read
>>> the first half, tasks 5 to 9 read the second half.
>>> 3) Give the large input into a FlatMapper which sends out two records
>>> for each incoming record and assigns the first outgoing record a task ID in
>>> range 0 to 4 and the second outgoing record an ID in range 5 to 9.
>>> 4) Have a custom partitioner (DataSet.partitionCustom()) after the
>>> duplicating mapper, which partitions the records based on the assigned task
>>> Id before they go into the mapper with the other smaller data set. A record
>>> with assigned task ID 0 will be sent to the mapper task with subtask index
>>> 0.
>>>
>>> This setup is not very nice, but should work.
>>>
>>> Let me know, if you need more detail.
>>>
>>> Cheers, Fabian
>>>
>>> 2015-09-16 21:44 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> the local file problem would however not exist, if I just copy both
>>>> halves to all nodes, right?
>>>>
>>>> Lets say I have a file `1st` and a file `2nd`, which I copy to all
>>>> nodes.
>>>> Now with your approach from above, I do:
>>>>
>>>> // helper broadcast datasets to know on which half to operate
>>>> val data1stHalf = env.fromCollection("1st")
>>>> val data2ndHalf = env.fromCollection("2nd")
>>>>
>>>> val mapped1 = data.flatMap(yourMap).withBroadcastSet(data1stHalf,
>>>> "fileName").setParallelism(5)
>>>> val mapped2 = data.flatMap(yourMap).withBroadcastSet(data2ndHalf,
>>>> "fileName").setParallelism(5)
>>>> DataSet result = mapped1.union(mapped2)
>>>>
>>>> Then, in my custom operator implementation of flatMap I check the
>>>> helper broadcast data to know which file to load:
>>>> override def open(params: Configuration): Unit = {
>>>> val fileName =
>>>> getRuntimeContext.getBroadcastVariable[String]("fileName")(0)
>>>> // read the file from the local filesystem which I copied there earlier
>>>> this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
>>>> }
>>>> override def flatMap(document: Input, out: Collector[Output]): Unit = {
>>>> // do sth. with this.data and the input
>>>> out.collect(this.data.process(input))
>>>> }
>>>>
>>>> I think this should work, or do you see another problem here?
>>>>
>>>> Which brings us to the other question:
>>>> The both halves are so large, that one half of the data fits in the
>>>> user-remaining memory on a node, but not both halves. So my program would
>>>> probably memory-crash, if the scheduling trusts one node so much, that it
>>>> wants to execute two flatMaps there ;-).
>>>>
>>>> You are saying, that it is not guaranteed, that all 10 nodes are used,
>>>> but how likely is it, that one node is given two flatMaps and another one
>>>> is basically idling? I have no idea of the internals, but I guess there is
>>>> some heuristic inside which decides how to distribute.In the normal setup
>>>> that all 10 nodes are up, connection is good, all nodes have the same
>>>> resources available, input data is evenly distributed in HDFS, then the
>>>> default case should be to distribute to all 10 nodes, right?
>>>>
>>>> I am not running in production, so for me it would be ok, if this works
>>>> out usually.
>>>>
>>>> Cheers
>>>> Stefan
>>>>
>>>>
>>>> On 15 September 2015 at 23:40, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> the problem is that you cannot directly influence the scheduling of
>>>>> tasks to nodes to ensure that you can read the data that you put in the
>>>>> local filesystems of your nodes. HDFS gives a shared file system which
>>>>> means that each node can read data from anywhere in the cluster.
>>>>> I assumed the data is small enough to broadcast because you want to
>>>>> keep it in memory.
>>>>>
>>>>> Regarding your question. It is not guaranteed that two different
>>>>> tasks, each with parallelism 5, will be distributed to all 10 nodes (even
>>>>> if you have only 10 processing slots).
>>>>> What would work is to have one map task with parallelism 10 and a
>>>>> Flink setup with 10 task managers on 10 machines with only one processing
>>>>> slot per TM. However, you won't be able to replicate the data to both
sets
>>>>> of maps because you cannot know which task instance will be executed
on
>>>>> which machine (you cannot distinguish the tasks of both task sets).
>>>>>
>>>>> As I said, reading from local file system in a cluster and forcing
>>>>> task scheduling to specific nodes is quite tricky.
>>>>> Cheers, Fabian
>>>>>
>>>>> 2015-09-15 23:15 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> I think we might have a misunderstanding here. I have already copied
>>>>>> the first file to five nodes, and the second file to five other nodes,
>>>>>> outside of Flink. In the open() method of the operator, I just read
that
>>>>>> file via normal Java means. I do not see, why this is tricky or how
HDFS
>>>>>> should help here.
>>>>>> Then, I have a normal Flink DataSet, which I want to run through
the
>>>>>> operator (using the previously read data in the flatMap implementation).
As
>>>>>> I run the program several times, I do not want to broadcast the data
every
>>>>>> time, but rather just copy it on the nodes, and be fine with it.
>>>>>>
>>>>>> Can you answer my question from above? If the setParallelism-method
>>>>>> works and selects five nodes for the first flatMap and five _other_
nodes
>>>>>> for the second flatMap, then that would be fine for me if there is
no other
>>>>>> easy solution.
>>>>>>
>>>>>> Thanks for your help!
>>>>>> Best
>>>>>> Stefan
>>>>>>
>>>>>>
>>>>>> On 14 September 2015 at 22:28, Fabian Hueske <fhueske@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> forcing the scheduling of tasks to certain nodes and reading
files
>>>>>>> from the local file system in a multi-node setup is actually
quite tricky
>>>>>>> and requires a bit understanding of the internals.
>>>>>>> It is possible and I can help you with that, but would recommend
to
>>>>>>> use a shared filesystem such as HDFS if that is possible.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2015-09-14 19:16 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> actually, I am distributing my data before the program starts,
>>>>>>>> without using broadcast sets.
>>>>>>>>
>>>>>>>> However, the approach should still work, under one condition:
>>>>>>>>
>>>>>>>>> DataSet mapped1 =
>>>>>>>>> data.flatMap(yourMap).withBroadcastSet(smallData1,"data").setParallelism(5);
>>>>>>>>> DataSet mapped2 =
>>>>>>>>> data.flatMap(yourMap).withBroadcastSet(smallData2,"data").setParallelism(5);
>>>>>>>>>
>>>>>>>> Is it guaranteed, that this selects a disjoint set of nodes,
i.e.
>>>>>>>> five nodes for mapped1 and five other nodes for mapped2?
>>>>>>>>
>>>>>>>> Is there any way of selecting the five nodes concretely?
Currently,
>>>>>>>> I have stored the first half of the data on nodes 1-5 and
the second half
>>>>>>>> on nodes 6-10. With this approach, I guess, nodes are selected
randomly so
>>>>>>>> I would have to copy both halves to all of the nodes.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Mime
View raw message