flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
Date Mon, 06 Mar 2017 18:38:32 GMT

    https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897795#comment-15897795

ASF GitHub Bot commented on FLINK-4545:

GitHub user NicoK opened a pull request:


    [FLINK-4545] use size-restricted LocalBufferPool instances for network communication

    Note: this PR is based on #3467 and is PR 2 of 3 in a series to get rid of the network buffer
configuration parameter.
    With this PR, the number of buffers a `LocalBufferPool` has to offer is limited
to `2 * <number of channels> + 8` for both input and output connections. This way, we
reduce buffer bloat in our network stack without restricting specific jobs and their
connections too much, since the total number of network buffers can now be arbitrarily large
again without increasing the delays that checkpoint barriers, for example, experience while
travelling through all TMs.
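    The per-pool bound described above can be sketched as follows (a hypothetical helper
for illustration only, not Flink's actual API):

```java
// Illustrative sketch of the per-LocalBufferPool bound described above:
// 2 buffers per channel plus a small fixed amount of headroom.
public class BufferLimit {
    static final int EXTRA_BUFFERS = 8; // extra headroom beyond the per-channel buffers

    public static int maxBuffers(int numberOfChannels) {
        return 2 * numberOfChannels + EXTRA_BUFFERS;
    }

    public static void main(String[] args) {
        // e.g. a gate with 4 channels is capped at 2 * 4 + 8 = 16 buffers
        System.out.println(maxBuffers(4)); // prints 16
    }
}
```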
    Eventually, this will lead to the network buffer parameter being removed (which was the
initial goal). In a simple scenario like the following, with a parallelism of 2 and thus
running on 6 TMs, we were able to reduce the 75th percentile of checkpoint delays by 60%,
from 38ms to 16ms (median at 7ms for both).
    final StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);
    env.addSource(new LongSource())
    		.map(new IdentityMapFunction<Tuple2<Long, Long>>())
    		.addSink(new DiscardingSink<Tuple2<Long, Long>>());
    By adding random delays (0-1ms every 1000 keys) to the `IdentityMapFunction`, the median
even improves from 5026ms to 293ms.
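    A delaying identity map of that kind could look roughly like this (illustrative only;
class and field names are assumptions, not the PR's code):

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch: forward each element unchanged, but sleep for
// 0-1ms once every 1000 elements to simulate processing jitter.
public class DelayingIdentity<T> {
    private long count = 0;

    public T map(T value) {
        if (++count % 1000 == 0) {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextLong(2)); // 0 or 1 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return value;
    }
}
```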
    Neither scenario influences the throughput of the program, but for real programs, reductions
in delay may differ since their actual state may need to be stored and other components take
part as well ;)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-4545

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3480
commit dfea1bac97dbbf30a2e049618cc41fdca53ea6d3
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-02-10T13:36:37Z

    [FLINK-4545] remove (unused) persistent partition type

commit 11557c004450bcbbe680f1575f0e41d164424eae
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-02-10T15:11:08Z

    [docs] improve some documentation around network buffers

commit cd999061d04ae803c79473241ac1f9b39c1f2731
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-02-10T15:12:19Z

    [hotfix][network] add some assertions documenting on which locks we rely

commit 8f529bb3f42916c816c5091228569952917ad9b5
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-03-01T13:33:44Z

    [FLINK-4545] remove fixed-size BufferPool instances
    These were unused except for unit tests and will be replaced with bounded
    BufferPool instances.

commit 91cea2917e9453f9de5c02472d99d4fc0d090dda
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-03-06T11:36:02Z

    [FLINK-4545] remove JobVertex#connectNewDataSetAsInput variant without partition type
    This removes
    JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern)
    and requires the developer to call
    JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType)
    instead and actively decide on the partition type to add.

commit 83d1404b106b558679e4c9ef16123fbc6b5eac72
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-03-06T11:37:56Z

    [FLINK-4545] remove unused IntermediateDataSet constructors
    These were implying a default result partition type which we want the developer
    to actively decide upon.

commit e9d41b6b613a7bac5c489102977e16e4c6c4bb86
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-02-10T13:53:09Z

    [FLINK-4545] add a bounded result partition type
    This can be used to limit the number of network buffers used for this partition.
    (borrows the appropriate parts of a commit previously sketched for
    FLINK-5088 to implement bounded network queue lengths)

commit b57f0652a768645a5712d376d0e4b438f35cfa6c
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-02-10T17:22:55Z

    [FLINK-4545] allow LocalBufferPool to use a limited number of buffers

commit d1d8b18bba967c6fb8f3934aa4cf1cfc8a2c1106
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-02-20T16:12:54Z

    [FLINK-4545] also make the ResultPartitionType available at the InputGate
    This way, we know what kind of result partition is consumed by the input gate.

commit d23fdf9d80dea5d46bfe2f7597f4d5e1295cae7b
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-02-13T18:24:45Z

    [FLINK-4545] try to set an upper bound on the LocalBufferPool if restricted
    Use "2 * <number of channels> + 8" from the following considerations:
    * 1 buffer for in-flight data in the subpartition/input channel
    * 1 buffer for parallel serialization
    * + some extra buffers
    Also re-introduce some tests for bounded buffer pools similar to the fixed-size
    buffer pool tests before.

commit 37eed7bc59b6899e3d7bdd4b1a3dac87e5f04406
Author: Nico Kruber <nico@data-artisans.com>
Date:   2017-02-24T12:41:11Z

    [FLINK-4545] re-implement NetworkBufferPool#redistributeBuffers
    This version also takes the bounded network buffers into account.
    The distribution is not strictly uniform anymore though:
    * for every buffer pool, we determine the maximum number of buffers it can take
      from the available number - let's call this its 'capacity'
    * then, each of them will get roughly available * capacity / totalCapacity
      buffers on top of the required number of buffers

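The capacity-weighted distribution from the commit message above can be sketched like this
(an illustration of the arithmetic only, not Flink's actual implementation):

```java
// Illustrative sketch: split the available buffers among pools in proportion
// to each pool's remaining capacity, i.e. available * capacity / totalCapacity.
public class Redistribute {
    public static int[] share(int available, int[] capacities) {
        long totalCapacity = 0;
        for (int c : capacities) {
            totalCapacity += c;
        }
        int[] extra = new int[capacities.length];
        if (totalCapacity == 0) {
            return extra; // no pool can take any more buffers
        }
        for (int i = 0; i < capacities.length; i++) {
            extra[i] = (int) ((long) available * capacities[i] / totalCapacity);
        }
        return extra;
    }

    public static void main(String[] args) {
        // 100 spare buffers, two pools that can take 30 and 10 more buffers
        int[] extra = share(100, new int[]{30, 10});
        System.out.println(extra[0] + " " + extra[1]); // prints 75 25
    }
}
```

Note that due to integer division the shares may not sum exactly to the available count;
a real implementation would have to hand out the remainder separately.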

> Flink automatically manages TM network buffer
> ---------------------------------------------
>                 Key: FLINK-4545
>                 URL: https://issues.apache.org/jira/browse/FLINK-4545
>             Project: Flink
>          Issue Type: Wish
>          Components: Network
>            Reporter: Zhenzhong Xu
> Currently, the number of network buffers per task manager is preconfigured and the memory
> is pre-allocated through the taskmanager.network.numberOfBuffers config. In a job DAG with
> a shuffle phase, this number can go up very high depending on the TM cluster size. The formula
> for calculating the buffer count is documented here (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster size dynamically
> and then leverage the upcoming Flink feature to support scaling job parallelism/rescaling
> at runtime.
> If the buffer count config is static at runtime and cannot be changed without restarting
> the task manager process, this may add latency and complexity to the scaling process. I am
> wondering if there is already any discussion around whether the network buffers should be
> automatically managed by Flink, or whether it should at least expose some API to allow them
> to be reconfigured. Let me know if there is any existing JIRA that I should follow.
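To get a feeling for the estimate quoted above, it can be evaluated for a hypothetical
cluster (the numbers below are made up for illustration):

```java
// Worked example of the documented buffer-count estimate quoted above:
// #slots-per-TM^2 * #TMs * 4
public class BufferEstimate {
    public static long estimate(long slotsPerTm, long numTms) {
        return slotsPerTm * slotsPerTm * numTms * 4;
    }

    public static void main(String[] args) {
        // e.g. 8 slots per TM on a 20-TM cluster: 64 * 20 * 4 = 5120 buffers
        System.out.println(estimate(8, 20)); // prints 5120
    }
}
```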

This message was sent by Atlassian JIRA
