flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: Flink SocketTextStream source scheduled to a single machine
Date Tue, 19 Sep 2017 09:55:58 GMT
Hi Le Xu,

the reason all the different SocketTextStreamFunction sources are scheduled
to the same machine is slot sharing. Slot sharing allows Flink to schedule
tasks belonging to different operators into the same slot. This makes it
possible, for example, to achieve better colocation between tasks which
depend on each other (e.g. build side, probe side and the actual join
operator running in the same slot). Moreover, it makes it easier to reason
about how many slots your application needs: the number equals the maximum
parallelism of your job.

However, the downside is that independent components of your job won't be
spread across the cluster but usually end up in the same slot(s)
(consequently on the same machine, too) due to slot sharing.

You can disable slot sharing for parts of your job by explicitly setting a
different slot sharing group name. Then only operators which are assigned
to the same slot sharing group are subject to slot sharing. Downstream
operators inherit the slot sharing group from their inputs. Thus, if you
have an embarrassingly parallel job, it suffices to set the slot sharing
group only at the sources.

for (int i = 0; i < hosts.length; i++) {
    DataStream<String> someStream = env
        .socketTextStream(hosts[i], ports[i])
        .slotSharingGroup("socket_" + i);

    DataStream<Tuple2<String, String>> joinedAdImpressions =
        rawMessageStream.rebalance() ...
}
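
Note that separate slot sharing groups also change how many slots the job
needs. As a rough sketch of the arithmetic (plain Java, no Flink dependency;
the SlotEstimate class, the Op type and the parallelism values are made up
for illustration and are not part of the Flink API): within one group the
slots required equal the highest operator parallelism in that group, and
the totals of the groups add up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotEstimate {

    /** Hypothetical operator description: slot sharing group name and parallelism. */
    public static final class Op {
        final String group;
        final int parallelism;

        public Op(String group, int parallelism) {
            this.group = group;
            this.parallelism = parallelism;
        }
    }

    /** Sums, over all slot sharing groups, the maximum parallelism found in each group. */
    public static int requiredSlots(List<Op> ops) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Op op : ops) {
            maxPerGroup.merge(op.group, op.parallelism, Math::max);
        }
        int total = 0;
        for (int max : maxPerGroup.values()) {
            total += max;
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed numbers: each socket source runs with parallelism 1,
        // each downstream pipeline with parallelism 4.
        List<Op> shared = Arrays.asList(
            new Op("default", 1), new Op("default", 4),
            new Op("default", 1), new Op("default", 4),
            new Op("default", 1), new Op("default", 4));

        // Downstream operators inherit the group set at their source.
        List<Op> separated = Arrays.asList(
            new Op("socket_0", 1), new Op("socket_0", 4),
            new Op("socket_1", 1), new Op("socket_1", 4),
            new Op("socket_2", 1), new Op("socket_2", 4));

        System.out.println("default group: " + requiredSlots(shared));      // prints 4
        System.out.println("separate groups: " + requiredSlots(separated)); // prints 12
    }
}
```

So with one group per source, the cluster has to offer enough free slots for
all groups together, otherwise the job cannot be fully scheduled.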

Cheers,
Till

On Mon, Sep 18, 2017 at 10:09 PM, Le Xu <sharonxu65@gmail.com> wrote:

> Hello!
>
> I'm trying to figure out how this happens: I have a program reading from
> multiple socketTextStream sources, and these text streams feed into
> different data flows (the data streams never connect in my job). It looks
> something like the code below:
>
> for (int i = 0; i < hosts.length; i++) {
>
>     DataStream<String> someStream = env.socketTextStream(hosts[i], ports[i]);
>     DataStream<Tuple2<String, String>> joinedAdImpressions =
>         rawMessageStream.rebalance() ...
>
> However, when I run the job on a cluster, I find that all source tasks have
> been scheduled to one machine, so that machine becomes a severe performance
> bottleneck. Any idea how this could happen?
>
> Thanks!
>
