beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Kenneth Knowles (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-241) Not easy for runners to get late-data dropping
Date Mon, 02 May 2016 17:04:13 GMT

    [ https://issues.apache.org/jira/browse/BEAM-241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15266983#comment-15266983
] 

Kenneth Knowles commented on BEAM-241:
--------------------------------------

Your first point: I think that is a good idea. We should make all of this part of {{runners/core}}
so those runners that want to use it can do so. This is closely related to the Fn API, so
will probably face some changes as we sort that out. It seems like a fruitful area.

Your second point: We definitely want to move away from {{DoFns}} that do windowing operations.
With the exception of window merging, we'd like processing of a window to be fully isolated
from other windows (more parallelism possible / smaller scope of stateful DoFn state once
we make it user-facing, support shuffle-by-key-and-window implementation strategy, simpler
{{ParDo}} semantics, supports micro-batch runners... probably more pragmatic benefits we haven't
thought of will come from this clean theoretical move). This is a part of why I proposed making
{{Window.into}} a primitive.

So you definitely should phase out use of {{GroupAlsoByWindowViaWindowSetDoFn}} as a parameter
to {{ParDo}}. The logic in {{GroupAlsoByWindowViaWindowSetDoFn}} is fine and you can still
re-use it, you just need to get the {{StateInternals}} and {{outputWindowedValue}} via the
runner backend rather than the {{ProcessContext}}.

I've started this for the new {{InProcessPipelineRunner}} in [#268|https://github.com/apache/incubator-beam/pull/268/files]
- {{GroupByKey}} expands into runner-specific primitives {{GroupByKeyOnly}} and {{GroupAlsoByWindow}}
and then the runner provides a translator/interpreter for those transforms. Within that translator/interpreter
it is fine to use the logic in {{GroupAlsoByWindowViaWindowSetDoFn}}, as I do [right here|https://github.com/apache/incubator-beam/pull/268/files#diff-b81c7b54aea3aaed9e80891f97f3d911R97].

> Not easy for runners to get late-data dropping
> ----------------------------------------------
>
>                 Key: BEAM-241
>                 URL: https://issues.apache.org/jira/browse/BEAM-241
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Mark Shields
>            Assignee: Frances Perry
>
> Quite by accident realized the Flink runner delegates to GroupAlsoByWindowViaWindowSetDoFn
for GBK, which in turn delegates to ReduceFnRunner. The latter now assumes no messages will
arrive beyond the 'garbage collection' time of their target window(s).
> The Dataflow runner interposes a LateDataDroppingDoFnRunner into the path so as to drop
those too-late messages. That's done (I think) using DoFnRunners.createDefault.
> I don't think the Flink runner does that.
> We need a nice runner-friendly way of dealing with the too-late data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message