beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Commented] (BEAM-848) A better shuffle after reading from within mapWithState.
Date Wed, 22 Mar 2017 19:22:41 GMT


ASF GitHub Bot commented on BEAM-848:

GitHub user aviemzur opened a pull request:

    [BEAM-848] A better shuffle after reading from within mapWithState.

    Be sure to do all of the following to help us incorporate your contribution
    quickly and easily:
     - [ ] Make sure the PR title is formatted like:
       `[BEAM-<Jira issue #>] Description of pull request`
     - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
           Travis-CI on your fork and ensure the whole test matrix passes).
     - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
           number, if there is one.
     - [ ] If this contribution is large, please file an Apache
           [Individual Contributor License Agreement](

You can merge this pull request into a Git repository by running:

    $ git pull sourcerdd-unbounded-default-partitioner

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2288
commit 0fa4d29784ef5940c2467529e5ae7fdb78c7b98b
Author: Aviem Zur <>
Date:   2017-03-22T13:20:51Z

    [BEAM-1074] Set default-partitioner in SourceRDD.Unbounded

commit 581233f5022cabb9ef497611b643dd5413d52060
Author: Aviem Zur <>
Date:   2017-03-22T19:11:47Z

    [BEAM-1075] Shuffle the input read-values to get maximum parallelism


> A better shuffle after reading from within mapWithState.
> --------------------------------------------------------
>                 Key: BEAM-848
>                 URL:
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Amit Sela
>            Assignee: Aviem Zur
> The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and this stateful
operation will be followed by a shuffle: 
> Since the stateful read maps "splitSource" -> "partition of a list of read values",
the following shuffle won't benefit in any way (the list of read values has not been flatMapped
yet). In order to avoid shuffle we need to set the input RDD ({{SourceRDD.Unbounded}}) partitioner
to be a default {{HashPartitioner}} since {{mapWithState}} would use the same partitioner
and will skip shuffle if the partitioners match.
> It would be wise to shuffle the read values _after_ flatmap.
> I will break this into two tasks:
> # Set default-partitioner to the input RDD.
> # Shuffle (using Coders) the input.

This message was sent by Atlassian JIRA

View raw message