From Fabian Hueske <fhue...@gmail.com>
Subject Re: Distribute DataSet to subset of nodes
Date Thu, 17 Sep 2015 16:39:35 GMT
Hi Stefan,

I think I have a solution for your problem :-)

1) Distribute both parts of the small data to each machine (you have done
that already).
2) Your mapper should have a parallelism of 10, the tasks with ID 0 to 4
(get ID via RichFunction.getRuntimeContext().getIndexOfThisSubtask()) read
the first half, tasks 5 to 9 read the second half.
3) Feed the large input into a FlatMapper which sends out two records for
each incoming record and assigns the first outgoing record a task ID in
range 0 to 4 and the second outgoing record an ID in range 5 to 9.
4) Have a custom partitioner (DataSet.partitionCustom()) after the
duplicating mapper, which partitions the records based on the assigned task
ID before they go into the mapper with the other smaller data set. A record
with assigned task ID 0 will be sent to the mapper task with subtask index 0.

This setup is not very nice, but should work.
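
A rough sketch of the routing logic in steps 2 to 4, written as plain Java so it runs without Flink on the classpath. All class and method names here are hypothetical, and handing out the task IDs round-robin is just one possible choice. In a real job, halfFor() would be driven by getRuntimeContext().getIndexOfThisSubtask() in open(), Duplicator would be the FlatMapper from step 3, and partition() would be the body of the Partitioner passed to DataSet.partitionCustom().

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of steps 2 to 4; no Flink dependency, all names hypothetical. */
public class SubsetRouting {

    static final int PARALLELISM = 10;        // parallelism of the mapper (step 2)
    static final int HALF = PARALLELISM / 2;  // subtasks 0-4 vs. subtasks 5-9

    /** A record of the large input, tagged with the subtask that should process it. */
    static final class Tagged<T> {
        final int taskId;
        final T value;

        Tagged(int taskId, T value) {
            this.taskId = taskId;
            this.value = value;
        }
    }

    /** Step 2: which half of the small data a subtask would load in open(). */
    static String halfFor(int subtaskIndex) {
        return subtaskIndex < HALF ? "1st" : "2nd";
    }

    /** Step 3: emit every incoming record twice, once per half of the subtasks. */
    static final class Duplicator<T> {
        private int next = 0;  // round-robin position within one half (an assumption)

        List<Tagged<T>> flatMap(T record) {
            int slot = next;
            next = (next + 1) % HALF;
            List<Tagged<T>> out = new ArrayList<>();
            out.add(new Tagged<>(slot, record));         // task ID in 0..4
            out.add(new Tagged<>(HALF + slot, record));  // task ID in 5..9
            return out;
        }
    }

    /** Step 4: the assigned task ID is used directly as the target partition. */
    static int partition(int taskId, int numPartitions) {
        if (taskId < 0 || taskId >= numPartitions) {
            throw new IllegalArgumentException("task ID out of range: " + taskId);
        }
        return taskId;
    }

    public static void main(String[] args) {
        Duplicator<String> duplicator = new Duplicator<>();
        for (String doc : new String[] {"a", "b", "c"}) {
            for (Tagged<String> t : duplicator.flatMap(doc)) {
                int target = partition(t.taskId, PARALLELISM);
                System.out.println(doc + " -> subtask " + target
                        + " (reads " + halfFor(target) + " half)");
            }
        }
    }
}
```

Each record of the large input ends up on exactly one subtask per half, so it is processed once against each part of the small data.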

Let me know if you need more detail.

Cheers, Fabian

2015-09-16 21:44 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:

> Hi Fabian,
> the local file problem would not exist, however, if I just copy both halves
> to all nodes, right?
> Let's say I have a file `1st` and a file `2nd`, which I copy to all nodes.
> Now with your approach from above, I do:
> // helper broadcast datasets to know on which half to operate
> // (fromElements, since fromCollection would treat the string as a
> // collection of characters)
> val data1stHalf = env.fromElements("1st")
> val data2ndHalf = env.fromElements("2nd")
> val mapped1 = data.flatMap(yourMap)
>   .withBroadcastSet(data1stHalf, "fileName").setParallelism(5)
> val mapped2 = data.flatMap(yourMap)
>   .withBroadcastSet(data2ndHalf, "fileName").setParallelism(5)
> val result = mapped1.union(mapped2)
> Then, in my custom operator implementation of flatMap I check the helper
> broadcast data to know which file to load:
> override def open(params: Configuration): Unit = {
>   // getBroadcastVariable returns a java.util.List, hence get(0)
>   val fileName =
>     getRuntimeContext.getBroadcastVariable[String]("fileName").get(0)
>   // read the file from the local filesystem which I copied there earlier
>   this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
> }
> override def flatMap(document: Input, out: Collector[Output]): Unit = {
>   // do sth. with this.data and the input document
>   out.collect(this.data.process(document))
> }
> I think this should work, or do you see another problem here?
> Which brings us to the other question:
> Both halves are so large that one half of the data fits in the
> user-remaining memory on a node, but not both halves. So my program would
> probably crash with an out-of-memory error if the scheduler trusts one node
> so much that it wants to execute two flatMaps there ;-).
> You are saying that it is not guaranteed that all 10 nodes are used, but
> how likely is it that one node is given two flatMaps and another one is
> basically idling? I have no idea of the internals, but I guess there is
> some heuristic inside which decides how to distribute. In the normal setup,
> where all 10 nodes are up, the connection is good, all nodes have the same
> resources available, and the input data is evenly distributed in HDFS, the
> default case should be to distribute to all 10 nodes, right?
> I am not running in production, so for me it would be ok, if this works
> out usually.
> Cheers
> Stefan
> On 15 September 2015 at 23:40, Fabian Hueske <fhueske@gmail.com> wrote:
>> Hi Stefan,
>> the problem is that you cannot directly influence the scheduling of tasks
>> to nodes to ensure that you can read the data that you put in the local
>> filesystems of your nodes. HDFS gives a shared file system which means that
>> each node can read data from anywhere in the cluster.
>> I assumed the data is small enough to broadcast because you want to keep
>> it in memory.
>> Regarding your question. It is not guaranteed that two different tasks,
>> each with parallelism 5, will be distributed to all 10 nodes (even if you
>> have only 10 processing slots).
>> What would work is to have one map task with parallelism 10 and a Flink
>> setup with 10 task managers on 10 machines with only one processing slot
>> per TM. However, you won't be able to replicate the data to both sets of
>> maps because you cannot know which task instance will be executed on which
>> machine (you cannot distinguish the tasks of both task sets).
>> As I said, reading from local file system in a cluster and forcing task
>> scheduling to specific nodes is quite tricky.
>> Cheers, Fabian
>> 2015-09-15 23:15 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:
>>> Hi Fabian,
>>> I think we might have a misunderstanding here. I have already copied the
>>> first file to five nodes, and the second file to five other nodes, outside
>>> of Flink. In the open() method of the operator, I just read that file via
>>> normal Java means. I do not see why this is tricky or how HDFS should help
>>> here.
>>> Then, I have a normal Flink DataSet, which I want to run through the
>>> operator (using the previously read data in the flatMap implementation). As
>>> I run the program several times, I do not want to broadcast the data every
>>> time, but rather just copy it on the nodes, and be fine with it.
>>> Can you answer my question from above? If the setParallelism-method
>>> works and selects five nodes for the first flatMap and five _other_ nodes
>>> for the second flatMap, then that would be fine for me if there is no other
>>> easy solution.
>>> Thanks for your help!
>>> Best
>>> Stefan
>>> On 14 September 2015 at 22:28, Fabian Hueske <fhueske@gmail.com> wrote:
>>>> Hi Stefan,
>>>> forcing the scheduling of tasks to certain nodes and reading files from
>>>> the local file system in a multi-node setup is actually quite tricky and
>>>> requires a bit of understanding of the internals.
>>>> It is possible and I can help you with that, but I would recommend using
>>>> a shared filesystem such as HDFS if that is possible.
>>>> Best, Fabian
>>>> 2015-09-14 19:16 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:
>>>>> Hi,
>>>>> actually, I am distributing my data before the program starts, without
>>>>> using broadcast sets.
>>>>> However, the approach should still work, under one condition:
>>>>>> DataSet mapped1 =
>>>>>> data.flatMap(yourMap).withBroadcastSet(smallData1,"data").setParallelism(5);
>>>>>> DataSet mapped2 =
>>>>>> data.flatMap(yourMap).withBroadcastSet(smallData2,"data").setParallelism(5);
>>>>> Is it guaranteed that this selects a disjoint set of nodes, i.e. five
>>>>> nodes for mapped1 and five other nodes for mapped2?
>>>>> Is there any way of selecting the five nodes concretely? Currently, I
>>>>> have stored the first half of the data on nodes 1-5 and the second half
>>>>> on nodes 6-10. With this approach, I guess, nodes are selected randomly,
>>>>> so I would have to copy both halves to all of the nodes.
>>>>> Best,
>>>>> Stefan

