beam-commits mailing list archives

From "Kenneth Knowles (JIRA)" <>
Subject [jira] [Commented] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
Date Tue, 27 Jun 2017 18:14:00 GMT


Kenneth Knowles commented on BEAM-2140:

I considered for a long time what should happen with processing time triggers as far as window
expiry. We spent quite some time coming up with the semantics at,
long before Beam. I don't claim it is perfect (it is way too complex, for one) but it represents
a lot of thought by lots of people. I think actually it does give some choices.

* an input being droppable does not necessarily mean you are required to drop it (some transforms
may falter on droppable inputs, but that is specific to the transform)
* input timestamp and output timestamp are decoupled, so you can reason about whether to ignore
input based on whether the resulting output would be droppable
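A toy model may make the second bullet concrete. This is only a sketch: it assumes a window expires once the watermark passes its last timestamp plus the allowed lateness, and the names {{Window}}, {{is_droppable}} and {{should_ignore_input}} are hypothetical, not Beam APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Window:
    end: int               # exclusive upper bound of the window (event time)
    allowed_lateness: int  # how long past `end` the window is kept alive

    def expiry(self) -> int:
        # Assumption: the window is garbage collected once the watermark
        # passes its last timestamp (end - 1) plus the allowed lateness.
        return (self.end - 1) + self.allowed_lateness


def is_droppable(window: Window, watermark: int) -> bool:
    """True if data in `window` is already expired relative to `watermark`."""
    return watermark > window.expiry()


def should_ignore_input(output_window: Window, output_watermark: int) -> bool:
    """Decide from the output side: ignore the input only if what it would
    produce is itself droppable downstream."""
    return is_droppable(output_window, output_watermark)
```

With {{Window(end=100, allowed_lateness=10)}}, an input seen at input watermark 200 is droppable, yet if the output watermark is still held at 50 its output would not be, so the transform is free to process it.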

Some possibilities that I think don't break the model:

*Treat processing time timers as inputs with some timestamp at EOW or some such*

The theme that timers are inputs is basically valid. We gain clarity by not conflating them
in APIs and discussions. But how they interact with watermarks, etc, should be basically compatible.
Currently processing time timers are treated as inputs with a timestamp equal to the input
watermark at the moment of their arrival. So this change would cause an input hold because
there is a known upcoming element that just hasn't arrived.

In streaming: this holds things up too much. It also makes repeatedly firing after processing
time cause an infinite loop, versus what happens today, where it naturally goes through window
expiry and GC.
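The streaming concern can be illustrated with a small simulation. It is a simplified sketch, not runner code: allowed lateness is assumed to be zero, the upstream watermark is given as a list of values observed at successive processing-time ticks, and {{count_firings}} is a hypothetical helper.

```python
def count_firings(upstream_watermarks, window_end, hold_at_eow):
    """Count firings of a processing-time timer that re-sets itself on every firing.

    upstream_watermarks: upstream watermark observed at each processing-time tick.
    window_end: event time after which the window is expired (allowed lateness
                is taken to be zero in this toy model).
    hold_at_eow: if True, model the proposed change, where the pending timer
                 counts as an input at end of window and holds the watermark.
    """
    firings = 0
    for upstream in upstream_watermarks:
        # Under the proposal the pending timer keeps the input watermark
        # from passing the end of the window.
        input_watermark = min(upstream, window_end) if hold_at_eow else upstream
        if input_watermark > window_end:
            break  # window expired: the timer is dropped and the loop ends
        firings += 1  # timer fires and immediately sets its successor
    return firings
```

Without the hold, the loop stops as soon as the watermark passes the end of the window. With the hold, the timer fires on every tick for as long as ticks keep arriving, which is the infinite loop described above.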

In batch: this breaks the unified model for processing historical data in a batch mode. With
the semantics as they exist today, the way that batch "runs" triggers and processing time
timers (by ignoring them) is completely compatible with the semantics. So any user who writes
a correct transform has good assurance that it will work in both modes. If processing time
timers held watermarks like this, they would need to be processed in batch mode, yet that
contradicts the whole point of batch mode.

We can omit unbounded SDFs from this unification issue, probably, but a bounded-per-element
SDF should certainly work on streamed unbounded input as well as bounded input.

*Decide whether to drop a processing time timer not based on the input watermark but based
on whether its output would be droppable*

This lets the input watermark advance, but still does not allow infinitely repeating processing
time timers to terminate with window expiry automatically, and it still breaks the unified
model. We could alleviate both issues by refusing to set new timers that would already be
expired. I think this is just a rabbit hole of unnatural corner cases so we should avoid it.

*In addition to the processing time timers that ProcessFn sets, also set a GC timer*

This seems like a straightforward, simple, and good idea. These timers are also still run in
batch mode for historical reprocessing.

Can you clarify how it does not work? Is it because you need to create a "loop" that continues
to fire until the residual is gone? Currently, there is simply no way to make a perpetual
loop with timers because of the commentary below.

*Treat event time timers as inputs with their given timestamp*

This would combine the GC timer idea and let you make a looping structure. This currently
cannot work because timers fire only when the input watermark is strictly greater than their
timestamp. The semantics of "on time" and "final GC" panes depend on this, so we'd have a
lot of work to do. But I think there might be a consistent world where event time timers are
treated as elements, and fire when the watermark arrives at their timestamp. {{@OnWindowExpiration}}
is then absolutely required and cannot be simulated by a timer.
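The difference between the two firing rules comes down to a single comparison. The sketch below uses hypothetical function names and plain integers for timestamps; it only restates the two conditions described above.

```python
def fires_today(timer_timestamp: int, input_watermark: int) -> bool:
    """Current rule: fire only once the watermark is strictly past the timer."""
    return input_watermark > timer_timestamp


def fires_as_element(timer_timestamp: int, input_watermark: int) -> bool:
    """Alternative rule: fire as soon as the watermark arrives at the timer."""
    return input_watermark >= timer_timestamp
```

The two rules disagree only when the watermark sits exactly at the timer's timestamp, which is the case that the "on time" and "final GC" pane semantics rely on.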

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>                 Key: BEAM-2140
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled the tests
> to unblock the open PR for BEAM-1763.
