beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Commented] (BEAM-3499) Watch can make no progress if a single poll takes more than checkpoint interval
Date Thu, 25 Jan 2018 01:54:01 GMT


ASF GitHub Bot commented on BEAM-3499:

jkff opened a new pull request #4483: [BEAM-3499, BEAM-2607] Gives the runner access to positions
of SDF claimed blocks
   This addresses the following issues:
   * Watch can make no progress if a single poll takes more than checkpoint interval
   * Enforce that SDF must return stop() after a failed tryClaim() call
   The former is the primary motivation for this PR. This PR changes the SDF checkpointing timer
countdown to start from the first claimed block, rather than from the beginning of `@ProcessElement`.
This requires giving the runner visibility into claimed blocks. Such visibility enables fixing
BEAM-2607 as well. It also is a required part of implementing SDF splitting over Fn API (tracked
   This PR also, of course, changes the Watch transform to the new API; and, while we're at
it, does some related improvements:
   * Compresses Watch.GrowthState using Snappy. E.g. with 100k files, the encoded state is
about 3MB instead of 8MB. Compressing it much more is difficult because the state includes
incompressible hashes. To address this, one must shard the filepattern, or implement the improvements
suggested in .
   * Makes the direct runner create a clone of state cells. I did this mainly because I noticed
that GrowthStateCoder was never called on the Watch state, which risks missing coder bugs
when testing with the direct runner.
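The point about hashes limiting compression can be illustrated with a small, self-contained sketch. It does not use Snappy or the real Watch.GrowthState encoding: it uses `java.util.zip.Deflater` from the JDK as a stand-in compressor, and the file names and 16-byte pseudo-random "hashes" are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.zip.Deflater;

public class HashCompressibilitySketch {
  /** Returns the number of bytes produced by DEFLATE-compressing the input. */
  static int deflatedSize(byte[] input) {
    Deflater deflater = new Deflater();
    deflater.setInput(input);
    deflater.finish();
    byte[] buffer = new byte[8192];
    int total = 0;
    while (!deflater.finished()) {
      total += deflater.deflate(buffer);
    }
    deflater.end();
    return total;
  }

  /** Hypothetical file names: highly repetitive, so they compress well. */
  static byte[] fakeFileNames(int count) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < count; i++) {
      sb.append("gs://some-bucket/logs/part-").append(i).append(".txt\n");
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }

  /** Pseudo-random bytes standing in for 16-byte hashes (an assumption, not the real state). */
  static byte[] fakeHashes(int count, long seed) {
    byte[] bytes = new byte[count * 16];
    new Random(seed).nextBytes(bytes);
    return bytes;
  }

  public static void main(String[] args) {
    byte[] names = fakeFileNames(100_000);
    byte[] hashes = fakeHashes(100_000, 42L);
    System.out.printf("names:  %d -> %d bytes%n", names.length, deflatedSize(names));
    System.out.printf("hashes: %d -> %d bytes%n", hashes.length, deflatedSize(hashes));
  }
}
```

In this toy setup the name-like data shrinks substantially while the hash-like data stays at roughly its original size, which is the effect the bullet above describes.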
   This PR is update-incompatible for users of the Watch transform, e.g. FileIO.match().continuously().
This is an experimental and very recent transform, so I'm going to ignore the incompatibility.
It also requires a traditional Dataflow worker dance to get the worker container in sync with
these runners-core changes - I'll perform that when the rest of the PR is approved.
   R: @tgroh @chamikaramj 
   CC: @kennknowles @reuvenlax


> Watch can make no progress if a single poll takes more than checkpoint interval
> -------------------------------------------------------------------------------
>                 Key: BEAM-3499
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>            Priority: Major
> E.g. when using it to poll a filepattern with hundreds of thousands of files, a single
poll may take >10 seconds (default checkpoint interval in OutputAndTimeBoundedSplittableProcessElementInvoker).
Because of that, the tracker (GrowthTracker) gets checkpointed before anything is added to
it, i.e. before [,] at
a moment when it doesn't contain any useful information, so the residual checkpoint state
is as empty as the initial one. When we resume from the residual checkpoint, the situation
simply repeats - until we get lucky enough to either take <10s to poll, or to not be asked
to checkpoint for >10s (e.g. because the checkpointing thread isn't scheduled).
> One possible fix to this is to change the SDF checkpointing strategy to have a progress
guarantee: e.g., start counting time from the moment the first block is claimed, or allow
the tracker to refuse checkpointing if nothing is claimed yet, or something like that.
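The difference between the two countdown strategies can be sketched as a tiny timing model. This is not Beam code: `CountdownStart`, `firstClaimSucceeds` and the simplification that a checkpoint fires exactly when the interval elapses are all assumptions made for illustration.

```java
public class CheckpointCountdownSketch {
  /** Hypothetical choice of when the checkpoint countdown begins. */
  enum CountdownStart { PROCESS_ELEMENT_START, FIRST_CLAIM }

  /**
   * Models one process call that polls for pollMillis and then claims its first block.
   * Returns true if that first claim happens before the checkpoint is taken.
   */
  static boolean firstClaimSucceeds(
      long pollMillis, long checkpointIntervalMillis, CountdownStart start) {
    if (checkpointIntervalMillis <= 0) {
      throw new IllegalArgumentException("checkpoint interval must be positive");
    }
    // The first claim can only happen once the poll has finished.
    long firstClaimAt = pollMillis;
    // The countdown starts either at time 0 or at the moment of the first claim.
    long countdownStartsAt =
        start == CountdownStart.PROCESS_ELEMENT_START ? 0L : firstClaimAt;
    long checkpointAt = countdownStartsAt + checkpointIntervalMillis;
    return firstClaimAt < checkpointAt;
  }

  public static void main(String[] args) {
    long interval = 10_000; // the 10 s default mentioned in the issue
    // A 15 s poll never gets to claim when counting from the start of the call...
    System.out.println(
        firstClaimSucceeds(15_000, interval, CountdownStart.PROCESS_ELEMENT_START)); // false
    // ...but always does when counting from the first claim.
    System.out.println(
        firstClaimSucceeds(15_000, interval, CountdownStart.FIRST_CLAIM)); // true
  }
}
```

Under this model, counting from the first claim gives the progress guarantee for any poll duration, because the checkpoint cannot precede the claim that starts the countdown.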
> A workaround for users of this (primarily via FileIO.match().continuously()) is to shard
their filepattern into a set of finer-granularity filepatterns matching fewer files, so that
each match call takes less than 10 seconds.
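One way to picture the workaround is a helper that expands a single directory into several narrower filepatterns. The helper name, the bucket path and the assumption that every file name begins with exactly one character from a known alphabet (for example hex-named files) are hypothetical; files whose names start with any other character would be missed.

```java
import java.util.ArrayList;
import java.util.List;

public class FilepatternShardingSketch {
  /**
   * Builds one filepattern per leading character, e.g. "dir/0*", "dir/1*", ...
   * Assumes every file name starts with exactly one character from the alphabet.
   */
  static List<String> shardByFirstChar(String directory, String alphabet) {
    List<String> patterns = new ArrayList<>();
    for (char c : alphabet.toCharArray()) {
      patterns.add(directory + c + "*");
    }
    return patterns;
  }

  public static void main(String[] args) {
    // Hypothetical bucket layout with hex-named files.
    List<String> shards = shardByFirstChar("gs://some-bucket/logs/", "0123456789abcdef");
    shards.forEach(System.out::println);
  }
}
```

Each resulting pattern would then be watched separately, so that a single poll only has to list a fraction of the files.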

This message was sent by Atlassian JIRA
