flink-user mailing list archives

From Stefan Bunk <stefan.b...@googlemail.com>
Subject Re: Distribute DataSet to subset of nodes
Date Wed, 16 Sep 2015 19:44:48 GMT
Hi Fabian,

the local file problem would not exist, however, if I just copied both halves
to all nodes, right?

Let's say I have a file `1st` and a file `2nd`, which I copy to all nodes.
Now with your approach from above, I do:

// helper broadcast datasets to know on which half to operate
// (fromElements creates a one-element DataSet[String]; fromCollection expects a collection)
val data1stHalf = env.fromElements("1st")
val data2ndHalf = env.fromElements("2nd")

val mapped1 = data.flatMap(yourMap).withBroadcastSet(data1stHalf, "fileName")
val mapped2 = data.flatMap(yourMap).withBroadcastSet(data2ndHalf, "fileName")
val result = mapped1.union(mapped2)

Then, in my custom operator implementation of flatMap I check the helper
broadcast data to know which file to load:
override def open(params: Configuration): Unit = {
  // the broadcast variable arrives as a java.util.List, hence get(0)
  val fileName = getRuntimeContext.getBroadcastVariable[String]("fileName").get(0)
  // read the file from the local filesystem which I copied there earlier
  this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
}

override def flatMap(document: Input, out: Collector[Output]): Unit = {
  // do sth. with this.data and the input
}

I think this should work, or do you see another problem here?
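
To make the idea concrete, here is how the two snippets above might fit together as one program. This is only a sketch under assumptions: the class name `LocalFileFlatMap`, the job name, the `hdfs:///path/to/...` paths, and the line-set lookup that stands in for `loadFromFileIntoDatastructure` are all hypothetical placeholders, and `setParallelism(5)` is taken from the earlier message in this thread. As discussed, it does not guarantee that the two flatMaps end up on disjoint sets of nodes.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

import scala.io.Source

// Hypothetical operator: emits every input line that also occurs in the local file.
class LocalFileFlatMap extends RichFlatMapFunction[String, String] {

  // Filled once per parallel task instance in open(), not once per record.
  @transient private var data: Set[String] = _

  override def open(params: Configuration): Unit = {
    // The one-element broadcast set only carries the name of the file to load.
    val fileName = getRuntimeContext.getBroadcastVariable[String]("fileName").get(0)
    // Stand-in for loadFromFileIntoDatastructure: read the local copy into a Set.
    val source = Source.fromFile("/home/data/" + fileName)
    try {
      data = source.getLines().toSet
    } finally {
      source.close()
    }
  }

  override def flatMap(document: String, out: Collector[String]): Unit = {
    if (data.contains(document)) out.collect(document)
  }
}

object TwoHalvesJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Placeholder input; any DataSet[String] would do.
    val data: DataSet[String] = env.readTextFile("hdfs:///path/to/input")

    // Helper broadcast datasets that tell each flatMap which half to load.
    val data1stHalf = env.fromElements("1st")
    val data2ndHalf = env.fromElements("2nd")

    // Parallelism 5 each; this does NOT pin the tasks to particular nodes.
    val mapped1 = data.flatMap(new LocalFileFlatMap)
      .withBroadcastSet(data1stHalf, "fileName")
      .setParallelism(5)
    val mapped2 = data.flatMap(new LocalFileFlatMap)
      .withBroadcastSet(data2ndHalf, "fileName")
      .setParallelism(5)

    mapped1.union(mapped2).writeAsText("hdfs:///path/to/output")
    env.execute("two halves from local files")
  }
}
```

Since both files sit on every node in this variant, each task instance can open its file no matter where it is scheduled; only the memory question below remains.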

Which brings us to the other question:
Both halves are so large that one half of the data fits in the memory
remaining for user code on a node, but not both halves. So my program would
probably crash with an out-of-memory error if the scheduler trusts one node
so much that it wants to execute two flatMaps there ;-).

You are saying that it is not guaranteed that all 10 nodes are used, but
how likely is it that one node is given two flatMaps while another one is
basically idling? I have no idea of the internals, but I guess there is
some heuristic inside which decides how to distribute. In the normal setup,
where all 10 nodes are up, the connection is good, all nodes have the same
resources available, and the input data is evenly distributed in HDFS, the
default case should be to distribute to all 10 nodes, right?

I am not running in production, so for me it would be ok, if this works out


On 15 September 2015 at 23:40, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Stefan,
> the problem is that you cannot directly influence the scheduling of tasks
> to nodes to ensure that you can read the data that you put in the local
> filesystems of your nodes. HDFS gives a shared file system which means that
> each node can read data from anywhere in the cluster.
> I assumed the data is small enough to broadcast because you want to keep
> it in memory.
> Regarding your question: it is not guaranteed that two different tasks,
> each with parallelism 5, will be distributed to all 10 nodes (even if you
> have only 10 processing slots).
> What would work is to have one map task with parallelism 10 and a Flink
> setup with 10 task managers on 10 machines with only one processing slot
> per TM. However, you won't be able to replicate the data to both sets of
> maps because you cannot know which task instance will be executed on which
> machine (you cannot distinguish the tasks of both task sets).
> As I said, reading from local file system in a cluster and forcing task
> scheduling to specific nodes is quite tricky.
> Cheers, Fabian
> 2015-09-15 23:15 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:
>> Hi Fabian,
>> I think we might have a misunderstanding here. I have already copied the
>> first file to five nodes, and the second file to five other nodes, outside
>> of Flink. In the open() method of the operator, I just read that file via
>> normal Java means. I do not see, why this is tricky or how HDFS should help
>> here.
>> Then, I have a normal Flink DataSet, which I want to run through the
>> operator (using the previously read data in the flatMap implementation). As
>> I run the program several times, I do not want to broadcast the data every
>> time, but rather just copy it on the nodes, and be fine with it.
>> Can you answer my question from above? If the setParallelism-method works
>> and selects five nodes for the first flatMap and five _other_ nodes for the
>> second flatMap, then that would be fine for me if there is no other easy
>> solution.
>> Thanks for your help!
>> Best
>> Stefan
>> On 14 September 2015 at 22:28, Fabian Hueske <fhueske@gmail.com> wrote:
>>> Hi Stefan,
>>> forcing the scheduling of tasks to certain nodes and reading files from
>>> the local file system in a multi-node setup is actually quite tricky and
>>> requires a bit of understanding of the internals.
>>> It is possible and I can help you with that, but would recommend to use
>>> a shared filesystem such as HDFS if that is possible.
>>> Best, Fabian
>>> 2015-09-14 19:16 GMT+02:00 Stefan Bunk <stefan.bunk@googlemail.com>:
>>>> Hi,
>>>> actually, I am distributing my data before the program starts, without
>>>> using broadcast sets.
>>>> However, the approach should still work, under one condition:
>>>>> DataSet mapped1 =
>>>>> data.flatMap(yourMap).withBroadcastSet(smallData1,"data").setParallelism(5);
>>>>> DataSet mapped2 =
>>>>> data.flatMap(yourMap).withBroadcastSet(smallData2,"data").setParallelism(5);
>>>> Is it guaranteed, that this selects a disjoint set of nodes, i.e. five
>>>> nodes for mapped1 and five other nodes for mapped2?
>>>> Is there any way of selecting the five nodes concretely? Currently, I
>>>> have stored the first half of the data on nodes 1-5 and the second half on
>>>> nodes 6-10. With this approach, I guess, nodes are selected randomly so I
>>>> would have to copy both halves to all of the nodes.
>>>> Best,
>>>> Stefan
