beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2359) SparkTimerInternals inputWatermarkTime does not get updated in cluster mode
Date Mon, 12 Jun 2017 14:07:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046594#comment-16046594 ]

ASF GitHub Bot commented on BEAM-2359:
--------------------------------------

GitHub user aviemzur opened a pull request:

    https://github.com/apache/beam/pull/3343

    [BEAM-2359] Fix watermark broadcasting to executors in Spark runner

    Be sure to do all of the following to help us incorporate your contribution
    quickly and easily:
    
     - [ ] Make sure the PR title is formatted like:
       `[BEAM-<Jira issue #>] Description of pull request`
     - [ ] Make sure tests pass via `mvn clean verify`.
     - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
           number, if there is one.
     - [ ] If this contribution is large, please file an Apache
           [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
    
    ---
    R: @amitsela 
    CC: @staslev @kobisalant 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aviemzur/beam BEAM-2359-watermark-bug-spark

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/3343.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3343
    
----
commit f1b679e402734f20dcd9645babaec0a3f291e259
Author: Aviem Zur <aviemzur@gmail.com>
Date:   2017-06-12T14:04:00Z

    [BEAM-2359] Fix watermark broadcasting to executors in Spark runner

----


> SparkTimerInternals inputWatermarkTime does not get updated in cluster mode
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-2359
>                 URL: https://issues.apache.org/jira/browse/BEAM-2359
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Aviem Zur
>            Assignee: Amit Sela
>             Fix For: 2.1.0
>
>
> {{SparkTimerInternals#inputWatermarkTime}} does not get updated in cluster mode.
> As a result, windows are never closed: state grows in memory without bound and processing
> time keeps increasing until the application eventually crashes. Triggers based on the
> watermark also never fire.
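The failure mode can be illustrated with a small, hypothetical simulation in plain Java (the class and method names are made up for illustration and are not Beam APIs). It assumes fixed windows whose buffered state is released only once the input watermark passes the window end; when the watermark is never advanced, every window stays open and the buffered state only grows.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical simulation (not Beam code): buffered state for a window is
 * released only once the input watermark reaches the end of that window.
 */
public class StuckWatermarkDemo {
  // window end (millis) -> number of buffered elements in that window
  private final TreeMap<Long, Integer> bufferedState = new TreeMap<>();

  /** Buffers an element in the fixed window that contains its timestamp. */
  void addElement(long timestampMillis, long windowSizeMillis) {
    long windowEnd = (timestampMillis / windowSizeMillis + 1) * windowSizeMillis;
    bufferedState.merge(windowEnd, 1, Integer::sum);
  }

  /** Fires and drops every window whose end is at or before the watermark. */
  int advanceWatermark(long watermarkMillis) {
    Map<Long, Integer> expired = bufferedState.headMap(watermarkMillis, true);
    int fired = expired.size();
    expired.clear(); // clearing the view also removes the entries from bufferedState
    return fired;
  }

  int openWindows() {
    return bufferedState.size();
  }

  public static void main(String[] args) {
    StuckWatermarkDemo healthy = new StuckWatermarkDemo();
    StuckWatermarkDemo stuck = new StuckWatermarkDemo();
    long windowSize = 1_000;
    for (long batch = 0; batch < 10; batch++) {
      long now = batch * windowSize;
      healthy.addElement(now, windowSize);
      stuck.addElement(now, windowSize);
      healthy.advanceWatermark(now);          // watermark follows event time
      stuck.advanceWatermark(Long.MIN_VALUE); // watermark never updated
    }
    System.out.println("healthy open windows: " + healthy.openWindows());
    System.out.println("stuck open windows:   " + stuck.openWindows());
  }
}
```

With a moving watermark only the most recent window remains open; with a stuck watermark one more window accumulates on every micro-batch.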
> The root cause is a call from within the {{updateStateByKey}} operation in [SparkGroupAlsoByWindowViaWindowSet|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L241-L242]
> that tries to access a static reference to a {{GlobalWatermarkHolder}} broadcast variable.
> In cluster mode, however, this static reference lives in the executor's JVM, where it is a
> different field from the one set on the driver and is null. This works in local mode only
> because the executor and the driver share the same JVM.
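A minimal sketch of why the static reference is null on the executor, assuming Java serialization as a stand-in for how Spark ships task closures (the names are hypothetical and do not reflect GlobalWatermarkHolder's real API). Static fields belong to a class within one particular JVM and are never written into an object's serialized form, so only instance state survives the trip. The sketch simulates the executor's fresh JVM by resetting the static field before deserializing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
 * Hypothetical illustration (not Beam's GlobalWatermarkHolder): a static field is
 * per-JVM state and is not part of serialization, so a shipped task cannot carry it.
 */
public class StaticFieldShippingDemo {
  /** Stand-in for a static holder that is only ever assigned on the driver. */
  static String driverOnlyWatermarks = null;

  /** Stand-in for a task closure that would be serialized to an executor. */
  static class Task implements Serializable {
    private static final long serialVersionUID = 1L;
    final String capturedWatermarks; // instance state: travels with the task

    Task(String capturedWatermarks) {
      this.capturedWatermarks = capturedWatermarks;
    }

    /** Resolved against whatever JVM the task happens to run in. */
    String readStatic() {
      return driverOnlyWatermarks;
    }
  }

  static byte[] serialize(Object o) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    }
    return bytes.toByteArray();
  }

  static Task deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (Task) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    // "Driver": assign the static and ship a task.
    driverOnlyWatermarks = "wm=12:00";
    byte[] shipped = serialize(new Task("wm=12:00"));

    // "Executor": simulate a fresh JVM, where the static was never assigned.
    driverOnlyWatermarks = null;
    Task onExecutor = deserialize(shipped);

    System.out.println("instance field: " + onExecutor.capturedWatermarks);
    System.out.println("static field:   " + onExecutor.readStatic());
  }
}
```

Running it prints the captured value for the instance field and null for the static field. In local mode the reset step never happens, because driver and executor share one JVM, which is why the bug only shows up in cluster mode.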
> Alternative solutions (and their viability):
> * -Broadcast variable passed to the {{updateStateByKey}} operator- - Not viable: even if the
> broadcast is used correctly, broadcast variables cannot be used here (from within
> {{updateStateByKey}}) because {{updateStateByKey}} is a {{DStream}} operator, not an
> {{RDD}} operator, so the broadcast is not updated every micro-batch but retains its
> initial value.
> * -Broadcast variable to update the data in an additional transform- - Create an additional
> transform on the {{DStream}}'s RDDs prior to the {{DStream}} operator {{updateStateByKey}}
> and use a broadcast there, which will be updated (since this is an {{RDD}} operator); then
> add its value to the keyed datum itself so it is available in {{updateStateByKey}}. Not
> viable because this only updates keys that received new data in the micro-batch, whereas
> the watermark must also be updated for keys that received no new data.
> * -Broadcast variable to update a static reference- - Create an additional transform on the
> {{DStream}}'s RDDs prior to {{updateStateByKey}} and use a broadcast there, which will be
> updated (since this is an {{RDD}} operator); then set its value in a static reference
> within the executor. Not viable because there is no guarantee that every executor receives
> partitions to process in each micro-batch.
> * Server polled lazily every micro-batch from within the {{updateStateByKey}} operator - Spin
> up a server on a configured port on the driver that serves the current watermarks upon
> request. Lazily poll this value every micro-batch from within {{updateStateByKey}} and update
> a static reference within the executor. Viable, but it does not use Spark-native operations,
> adds code to maintain, and carries operational cost for the user (open ports in firewalls,
> etc.).
> * Drop/register watermarks as a block in the BlockManager and request the remote version from
> within the {{updateStateByKey}} operator - Update the watermarks as a block in the driver's
> BlockManager by dropping and re-registering the block every micro-batch. Lazily poll this
> value every micro-batch from within {{updateStateByKey}} and update a static reference
> within the executor. Viable, less "ugly" than the server version, and with lower operational
> cost.
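The second rejected alternative can be simulated in plain Java. The sketch assumes the updateStateByKey semantics of Spark Streaming, where the update function is invoked for every key already in state, even when that key received no new values in the micro-batch; the Datum class and runBatch method are hypothetical. Because the watermark travels only inside new data, an idle key keeps its stale watermark.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical simulation (plain Java, not Spark) of the rejected "carry the watermark
 * in the keyed datum" alternative, mimicking updateStateByKey's per-key update calls.
 */
public class WatermarkInDatumDemo {
  /** A value tagged with the watermark by an upstream per-RDD transform. */
  static final class Datum {
    final String value;
    final long watermark;

    Datum(String value, long watermark) {
      this.value = value;
      this.watermark = watermark;
    }
  }

  /** key -> latest watermark known to that key's state. */
  final Map<String, Long> stateWatermark = new HashMap<>();

  /** One micro-batch: the update runs for the union of existing and new keys. */
  void runBatch(Map<String, List<Datum>> newDataByKey) {
    Set<String> keys = new HashSet<>(stateWatermark.keySet());
    keys.addAll(newDataByKey.keySet());
    for (String key : keys) {
      List<Datum> values = newDataByKey.getOrDefault(key, Collections.<Datum>emptyList());
      long previous = stateWatermark.getOrDefault(key, Long.MIN_VALUE);
      // The watermark only arrives inside new data, so idle keys keep the old value.
      long updated = values.stream().mapToLong(d -> d.watermark).max().orElse(previous);
      stateWatermark.put(key, updated);
    }
  }

  public static void main(String[] args) {
    WatermarkInDatumDemo demo = new WatermarkInDatumDemo();

    Map<String, List<Datum>> batch1 = new HashMap<>();
    batch1.put("a", Arrays.asList(new Datum("x", 100)));
    batch1.put("b", Arrays.asList(new Datum("y", 100)));
    demo.runBatch(batch1);

    Map<String, List<Datum>> batch2 = new HashMap<>();
    batch2.put("a", Arrays.asList(new Datum("z", 200))); // no new data for "b"
    demo.runBatch(batch2);

    System.out.println(demo.stateWatermark); // "b" is stuck at 100
  }
}
```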
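A hypothetical sketch of the lazy per-micro-batch polling shared by the two viable alternatives. A ConcurrentHashMap stands in for the driver's BlockManager (or for the polled server), and none of the names are Beam or Spark APIs. The driver drops and re-registers the watermark entry every micro-batch; on the executor, the first task in a micro-batch performs one remote read and later tasks in the same micro-batch reuse the cached value held in a static field.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of lazy per-micro-batch watermark polling. The "driver store"
 * simulates a BlockManager block (or server); these are not Beam or Spark APIs.
 */
public class LazyWatermarkPollingDemo {

  /** Driver side: replaces the watermark "block" every micro-batch. */
  static final class DriverWatermarkStore {
    static final String BLOCK_ID = "watermarks"; // hypothetical block id
    private final Map<String, Long> blocks = new ConcurrentHashMap<>();
    final AtomicInteger remoteReads = new AtomicInteger();

    void publish(long watermark) {
      blocks.remove(BLOCK_ID);         // drop the previous block
      blocks.put(BLOCK_ID, watermark); // re-register it with the new value
    }

    /** What a remote read from an executor would return; counts each call. */
    Long fetchRemote() {
      remoteReads.incrementAndGet();
      return blocks.get(BLOCK_ID);
    }
  }

  /** Executor side: refreshes its cached copy at most once per micro-batch. */
  static final class ExecutorWatermarkCache {
    private long cachedBatchTime = Long.MIN_VALUE;
    private long cachedWatermark = Long.MIN_VALUE;

    synchronized long get(long currentBatchTime, DriverWatermarkStore driver) {
      if (currentBatchTime != cachedBatchTime) { // first access in this micro-batch
        Long fetched = driver.fetchRemote();
        if (fetched != null) {
          cachedWatermark = fetched;
        }
        cachedBatchTime = currentBatchTime;
      }
      return cachedWatermark;
    }
  }

  /** One cache per executor JVM, held in a static field. */
  static final ExecutorWatermarkCache EXECUTOR_CACHE = new ExecutorWatermarkCache();

  public static void main(String[] args) {
    DriverWatermarkStore driver = new DriverWatermarkStore();
    long lastSeen = Long.MIN_VALUE;
    for (long batchTime = 1; batchTime <= 3; batchTime++) {
      driver.publish(batchTime * 100);
      for (int task = 0; task < 4; task++) { // several tasks on one executor per batch
        lastSeen = EXECUTOR_CACHE.get(batchTime, driver);
      }
    }
    System.out.println("last watermark: " + lastSeen);
    System.out.println("remote reads:   " + driver.remoteReads.get()); // one per micro-batch
  }
}
```

The point of the lazy cache is that remote traffic scales with the number of micro-batches per executor rather than with the number of tasks, and an executor that receives no partitions in a given micro-batch simply fetches the current value the next time it does.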



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
