flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: fan out parallel-able operator sub-task beyond total slots number
Date Mon, 18 Apr 2016 08:03:52 GMT
Hi Chen,

Two subtasks of the same operator can never be executed within the same
slot/pipeline. The `slotSharingGroup` only lets you control which subtasks
of different operators can be executed alongside each other in the same
slot. It basically allows you to break pipelines into smaller ones.
Therefore, you need at least as many slots as the maximum degree of
parallelism in your program (so in your case 1000).
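The slot rule described above can be illustrated with a small model. This is a simplified sketch, not Flink code: it assumes each slot sharing group needs as many slots as its most parallel operator (because two subtasks of the same operator never share a slot) and that different groups never share slots. The `Operator` record, the `requiredSlots` helper, and the group assignments in `main` are all hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotDemand {
    // Hypothetical description of one operator: its slot sharing group and parallelism.
    record Operator(String name, String slotSharingGroup, int parallelism) {}

    // Simplified model: within a group, subtasks of *different* operators may share a
    // slot, but two subtasks of the same operator never can, so each group needs as many
    // slots as its most parallel operator. Separate groups are assumed to never share slots.
    static int requiredSlots(List<Operator> ops) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Operator op : ops) {
            maxPerGroup.merge(op.slotSharingGroup(), op.parallelism(), Integer::max);
        }
        int total = 0;
        for (int max : maxPerGroup.values()) {
            total += max;
        }
        return total;
    }

    public static void main(String[] args) {
        // All operators in one group: demand equals the maximum parallelism.
        List<Operator> oneGroup = List.of(
            new Operator("source", "default", 1),
            new Operator("flatMap", "default", 100),
            new Operator("filter", "default", 10));
        System.out.println(requiredSlots(oneGroup)); // 100

        // Hypothetical split: flatMap isolated in its own group, others in "default".
        List<Operator> twoGroups = List.of(
            new Operator("source", "default", 1),
            new Operator("flatMap", "flatmap", 100),
            new Operator("filter", "default", 10));
        System.out.println(requiredSlots(twoGroups)); // 110
    }
}
```

Under this model, putting an operator into its own group never lowers the slot count; it can only add slots, which is why slot sharing groups do not help pack more subtasks of one operator into fewer slots.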

Cheers,
Till

On Sun, Apr 17, 2016 at 6:54 PM, Chen Qin <qinnchen@gmail.com> wrote:

> Hi there,
>
>
> I am trying to run a large number of subtasks within a task slot using a slot
> sharing group. The use case is an operator that makes a high-latency network
> call but has a small memory and CPU footprint (sample code below).
>
> From the documentation, slotSharingGroup seems to be the place to look. Yet it
> does not seem to be designed for the scenario above.
>
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#workers-slots-resources
>
> My question is: what is the best way to fan out a large number of parallel
> subtasks within a task?
>
> public void testFanOut() throws Exception{
>     env = StreamExecutionEnvironment.getExecutionEnvironment();
> ...
>     env.addSource(...).setParallelism(1).disableChaining().shuffle().flatMap(new FlatMapFunction<DummyFlinkRecord, Long>() {
>         @Override
>         public void flatMap(DummyFlinkRecord dummyFlinkRecord, Collector<Long> collector) throws Exception {
>             Thread.sleep(1000); // latency is high, needs to fan out
>             collector.collect(1L);
>         }
>     }).slotSharingGroup("flatmap").setParallelism(100).rebalance().filter(new FilterFunction<Long>() {
>         @Override
>         public boolean filter(Long aLong) throws Exception {
>             return true;
>         }
>     }).setParallelism(10).addSink(new SinkFunction<Long>() {
>         @Override
>         public void invoke(Long aLong) throws Exception {
>             System.out.println(aLong);
>         }
>     });
>     env.execute("fan out 100 subtasks for 1s delay mapper");
> }
>
> Thanks,
> Chen Qin
>
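The `Thread.sleep(1000)` in the sample job means each flatMap subtask can handle at most about one record per second, so throughput scales only with the number of subtasks, and (per the reply above) each subtask needs its own slot. A back-of-the-envelope sketch, assuming each subtask processes one record at a time and the call latency dominates; `maxThroughput` and `requiredParallelism` are hypothetical helpers, not a Flink API:

```java
public class FanOutEstimate {
    // Upper bound on throughput (records/s) of an operator whose subtasks each
    // block for latencyMillis per record and process one record at a time.
    static double maxThroughput(int parallelism, long latencyMillis) {
        return parallelism * 1000.0 / latencyMillis;
    }

    // Smallest parallelism that can sustain targetRatePerSec records/s
    // (Little's law: records in flight = arrival rate x latency).
    static int requiredParallelism(double targetRatePerSec, long latencyMillis) {
        return (int) Math.ceil(targetRatePerSec * latencyMillis / 1000.0);
    }

    public static void main(String[] args) {
        // 100 subtasks, each sleeping 1000 ms per record, as in the sample job.
        System.out.println(maxThroughput(100, 1000));       // 100.0
        // Hypothetical target of 250 records/s at the same 1 s latency.
        System.out.println(requiredParallelism(250, 1000)); // 250
    }
}
```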
