flink-user mailing list archives

From Stefan Bunk <stefan.b...@googlemail.com>
Subject Re: Distribute DataSet to subset of nodes
Date Mon, 21 Sep 2015 19:15:11 GMT
Of course!

On 21 September 2015 at 19:10, Fabian Hueske <fhueske@gmail.com> wrote:

> The custom partitioner does not know its task ID, but the mapper that
> assigns the partition IDs knows its subtask ID.
> So if the mapper with subtask ID 2 assigns partition IDs 2 and 7, only 7
> will be sent over the network.
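The subtask-aware assignment described above can be sketched in plain Java, with no Flink dependency. It assumes 10 consuming mapper subtasks split into two halves of 5, and that the duplicating mapper also runs with parallelism 10, so its own subtask index can be reused as one of the two target IDs. The class and method names are hypothetical; in a real job the index would come from getRuntimeContext().getIndexOfThisSubtask().

```java
import java.util.Arrays;

// Hypothetical helper (plain Java sketch, no Flink dependency): picks the two
// target IDs for a record emitted by one subtask of the duplicating mapper.
class LocalFirstAssignment {

    /**
     * Returns {localId, remoteId} for the given subtask.
     * Assumes 2 * half consuming subtasks: IDs 0..half-1 hold the first half
     * of the small data, IDs half..2*half-1 hold the second half.
     */
    static int[] targetIds(int subtaskIndex, int half) {
        int total = 2 * half;
        if (subtaskIndex < 0 || subtaskIndex >= total) {
            throw new IllegalArgumentException("subtask index out of range: " + subtaskIndex);
        }
        // Keep one copy on the emitting subtask's own index (a local hand-over,
        // assuming producer and consumer subtasks with equal index share a machine)...
        int localId = subtaskIndex;
        // ...and send the other copy to the mirror subtask in the opposite half.
        int remoteId = (subtaskIndex + half) % total;
        return new int[] {localId, remoteId};
    }

    public static void main(String[] args) {
        // In a Flink job, the loop variable would be getRuntimeContext().getIndexOfThisSubtask().
        for (int subtask = 0; subtask < 10; subtask++) {
            System.out.println("subtask " + subtask + " -> " + Arrays.toString(targetIds(subtask, 5)));
        }
    }
}
```

Subtask 2 yields IDs 2 and 7, matching the example: one copy always lands in each half, and only the mirror copy has to leave the producing subtask.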
> On Sep 21, 2015 6:56 PM, "Stefan Bunk" <stefan.bunk@googlemail.com> wrote:
>> Hi Fabian,
>> that sounds good, thank you.
>> One final question: As I said earlier, this also distributes data in some
>> unnecessary cases, say ID 4 sends data to ID 3.
>> Is there no way to find out the ID of the current node? I guess that
>> number is already available on the node and just needs to be exposed
>> somehow, right?
>> Cheers
>> Stefan
>> On 17 September 2015 at 18:39, Fabian Hueske <fhueske@gmail.com> wrote:
>>> Hi Stefan,
>>> I think I have a solution for your problem :-)
>>> 1) Distribute both parts of the small data to each machine (you have
>>> done that).
>>> 2) Your mapper should have a parallelism of 10; the tasks with IDs 0 to 4
>>> (get the ID via RichFunction.getRuntimeContext().getIndexOfThisSubtask())
>>> read the first half, and tasks 5 to 9 read the second half.
>>> 3) Feed the large input into a FlatMapper that emits two records for
>>> each incoming record, assigning the first outgoing record a task ID in
>>> the range 0 to 4 and the second outgoing record an ID in the range 5 to 9.
>>> 4) Add a custom partitioner (DataSet.partitionCustom()) after the
>>> duplicating mapper, which partitions the records based on the assigned
>>> task ID before they go into the mapper with the other, smaller data set.
>>> A record with assigned task ID 0 will be sent to the mapper task with
>>> subtask index 0.
>>> This setup is not very nice, but it should work.
>>> Let me know if you need more detail.
>>> Cheers, Fabian
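Steps 3 and 4 of the reply above can be simulated in plain Java to check the routing before writing the Flink job. This is a sketch under assumptions: the round-robin choice of IDs within each half, the Tagged class and the IdPartitioner interface are all hypothetical. IdPartitioner only mirrors the shape of Flink's Partitioner#partition(key, numPartitions), which DataSet.partitionCustom() takes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Flink dependency) of duplicating each record and
// routing the copies by an assigned task ID.
class DuplicateAndRoute {

    /** A record tagged with the mapper subtask that should receive it (hypothetical). */
    static final class Tagged {
        final int targetId;
        final String payload;

        Tagged(int targetId, String payload) {
            this.targetId = targetId;
            this.payload = payload;
        }
    }

    /** Mirrors the shape of Flink's Partitioner#partition(key, numPartitions). */
    interface IdPartitioner {
        int partition(int key, int numPartitions);
    }

    /** Step 3: emit two copies, one with an ID in [0, half), one in [half, 2*half). */
    static List<Tagged> duplicate(List<String> input, int half) {
        List<Tagged> out = new ArrayList<>();
        int counter = 0; // assumption: round-robin spreads records evenly within each half
        for (String record : input) {
            int offset = counter++ % half;
            out.add(new Tagged(offset, record));        // first half: 0 .. half-1
            out.add(new Tagged(half + offset, record)); // second half: half .. 2*half-1
        }
        return out;
    }

    /** Step 4: the assigned ID is used directly as the target channel. */
    static final IdPartitioner BY_ID = (key, numPartitions) -> key % numPartitions;

    /** Groups tagged records by the subtask the partitioner selects. */
    static List<List<String>> route(List<Tagged> tagged, int parallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (Tagged t : tagged) {
            perSubtask.get(BY_ID.partition(t.targetId, parallelism)).add(t.payload);
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c", "d", "e", "f", "g");
        List<List<String>> routed = route(duplicate(input, 5), 10);
        for (int i = 0; i < routed.size(); i++) {
            // Subtasks 0-4 hold the first half of the small data, 5-9 the second (step 2).
            String half = i < 5 ? "1st" : "2nd";
            System.out.println("subtask " + i + " (" + half + " half): " + routed.get(i));
        }
    }
}
```

Every input record ends up exactly once in a subtask holding the first half and once in a subtask holding the second half, which is what the join against the split small data set needs.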
>>> 2015-09-16 21:44 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:
>>>> Hi Fabian,
>>>> the local file problem would not exist, however, if I just copied both
>>>> halves to all nodes, right?
>>>> Let's say I have a file `1st` and a file `2nd`, which I copy to all
>>>> nodes.
>>>> Now, with your approach from above, I do:
>>>> // helper broadcast datasets to know on which half to operate
>>>> val data1stHalf = env.fromElements("1st")
>>>> val data2ndHalf = env.fromElements("2nd")
>>>> val mapped1 = data.flatMap(yourMap)
>>>>   .withBroadcastSet(data1stHalf, "fileName")
>>>>   .setParallelism(5)
>>>> val mapped2 = data.flatMap(yourMap)
>>>>   .withBroadcastSet(data2ndHalf, "fileName")
>>>>   .setParallelism(5)
>>>> val result = mapped1.union(mapped2)
>>>> Then, in my custom operator's implementation of flatMap, I check the
>>>> helper broadcast data to know which file to load:
>>>> override def open(params: Configuration): Unit = {
>>>>   // getBroadcastVariable returns a java.util.List, hence .get(0)
>>>>   val fileName =
>>>>     getRuntimeContext.getBroadcastVariable[String]("fileName").get(0)
>>>>   // read the file from the local filesystem, where I copied it earlier
>>>>   this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
>>>> }
>>>> override def flatMap(document: Input, out: Collector[Output]): Unit = {
>>>>   // do sth. with this.data and the input
>>>>   out.collect(this.data.process(document))
>>>> }
>>>> I think this should work, or do you see another problem here?
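As an alternative to the helper broadcast sets, the file to load could be derived from the subtask index, in the spirit of step 2 of the 17 September reply above. A minimal sketch in plain Java, assuming a single flatMap with an even parallelism (10 here) and both files copied to every node under /home/data/; the class and method names are hypothetical. Inside open(), the two inputs would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
// Hypothetical helper (plain Java sketch): chooses which half of the small
// data a mapper subtask loads, based only on its subtask index.
class HalfSelector {

    static String fileNameForSubtask(int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || parallelism % 2 != 0) {
            throw new IllegalArgumentException("parallelism must be positive and even, got " + parallelism);
        }
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtask index out of range: " + subtaskIndex);
        }
        // Lower half of the subtasks reads the first file, upper half the second.
        return subtaskIndex < parallelism / 2 ? "1st" : "2nd";
    }

    /** Local path, assuming both files were copied to /home/data/ on every node. */
    static String localPath(int subtaskIndex, int parallelism) {
        return "/home/data/" + fileNameForSubtask(subtaskIndex, parallelism);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            System.out.println(i + " -> " + localPath(i, 10));
        }
    }
}
```

With a single operator there is no need to rely on two parallelism-5 operators landing on disjoint nodes; the memory concern below still applies if the scheduler places two subtasks on one machine.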
>>>> Which brings us to the other question:
>>>> Both halves are so large that one half of the data fits in the memory
>>>> remaining for user code on a node, but not both halves. So my program
>>>> would probably crash with an out-of-memory error if the scheduler trusted
>>>> one node so much that it wanted to execute two flatMaps there ;-).
>>>> You are saying that it is not guaranteed that all 10 nodes are used,
>>>> but how likely is it that one node is given two flatMaps while another
>>>> one is basically idling? I have no idea of the internals, but I guess
>>>> there is some heuristic inside which decides how to distribute. In the
>>>> normal setup, where all 10 nodes are up, connections are good, all nodes
>>>> have the same resources available, and the input data is evenly
>>>> distributed in HDFS, the default should be to distribute to all 10
>>>> nodes, right?
>>>> I am not running in production, so for me it would be OK if this
>>>> usually works out.
>>>> Cheers
>>>> Stefan
>>>> On 15 September 2015 at 23:40, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>> Hi Stefan,
>>>>> the problem is that you cannot directly influence the scheduling of
>>>>> tasks to nodes to ensure that you can read the data that you put in the
>>>>> local filesystems of your nodes. HDFS provides a shared file system,
>>>>> which means that each node can read data from anywhere in the cluster.
>>>>> I assumed the data is small enough to broadcast because you want to
>>>>> keep it in memory.
>>>>> Regarding your question: it is not guaranteed that two different
>>>>> tasks, each with parallelism 5, will be distributed to all 10 nodes (even
>>>>> if you have only 10 processing slots).
>>>>> What would work is to have one map task with parallelism 10 and a
>>>>> Flink setup with 10 task managers on 10 machines with only one processing
>>>>> slot per TM. However, you won't be able to replicate the data to both
>>>>> maps, because you cannot know which task instance will be executed on
>>>>> which machine (you cannot distinguish the tasks of both task sets).
>>>>> As I said, reading from the local file system in a cluster and forcing
>>>>> task scheduling to specific nodes is quite tricky.
>>>>> Cheers, Fabian
>>>>> 2015-09-15 23:15 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:
>>>>>> Hi Fabian,
>>>>>> I think we might have a misunderstanding here. I have already copied
>>>>>> the first file to five nodes, and the second file to five other nodes,
>>>>>> outside of Flink. In the open() method of the operator, I just read the
>>>>>> file via normal Java means. I do not see why this is tricky or how HDFS
>>>>>> would help here.
>>>>>> Then, I have a normal Flink DataSet, which I want to run through the
>>>>>> operator (using the previously read data in the flatMap implementation).
>>>>>> As I run the program several times, I do not want to broadcast the data
>>>>>> every time, but rather just copy it to the nodes once and be fine with it.
>>>>>> Can you answer my question from above? If the setParallelism method
>>>>>> works and selects five nodes for the first flatMap and five _other_
>>>>>> nodes for the second flatMap, then that would be fine for me if there
>>>>>> is no other easy solution.
>>>>>> Thanks for your help!
>>>>>> Best
>>>>>> Stefan
>>>>>> On 14 September 2015 at 22:28, Fabian Hueske <fhueske@gmail.com>
>>>>>> wrote:
>>>>>>> Hi Stefan,
>>>>>>> forcing the scheduling of tasks to certain nodes and reading
>>>>>>> from the local file system in a multi-node setup is actually quite
>>>>>>> tricky and requires a bit of understanding of the internals.
>>>>>>> It is possible and I can help you with that, but I would recommend
>>>>>>> using a shared filesystem such as HDFS if that is possible.
>>>>>>> Best, Fabian
>>>>>>> 2015-09-14 19:16 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:
>>>>>>>> Hi,
>>>>>>>> actually, I am distributing my data before the program starts,
>>>>>>>> without using broadcast sets.
>>>>>>>> However, the approach should still work under one condition:
>>>>>>>>> DataSet mapped1 =
>>>>>>>>>     data.flatMap(yourMap).withBroadcastSet(smallData1, "data").setParallelism(5);
>>>>>>>>> DataSet mapped2 =
>>>>>>>>>     data.flatMap(yourMap).withBroadcastSet(smallData2, "data").setParallelism(5);
>>>>>>>> Is it guaranteed that this selects a disjoint set of nodes,
>>>>>>>> five nodes for mapped1 and five other nodes for mapped2?
>>>>>>>> Is there any way of selecting the five nodes explicitly?
>>>>>>>> I have stored the first half of the data on nodes 1-5 and the second
>>>>>>>> half on nodes 6-10. With this approach, I guess, nodes are selected
>>>>>>>> randomly, so I would have to copy both halves to all of the nodes.
>>>>>>>> Best,
>>>>>>>> Stefan
