From: aljoscha
To: commits@beam.apache.org
Reply-To: commits@beam.apache.org
Subject: [GitHub] beam pull request #3480: [BEAM-2140] Execute Splittable DoFn directly in Fli...
Date: Fri, 30 Jun 2017 14:04:24 +0000 (UTC)

GitHub user aljoscha opened a pull request:

    https://github.com/apache/beam/pull/3480

    [BEAM-2140] Execute Splittable DoFn directly in Flink Runner

    Previously, we were using ProcessFn. This was causing problems with the Flink Runner for two reasons:

    1. StatefulDoFnRunner is in the processing path, which means processing-time timers are dropped once the watermark reaches +Inf.

    2. When a pipeline shuts down (for example, when bounded sources shut down), Flink drops any outstanding processing-time timers, so any remaining restrictions are never processed.

    The fix for 1. is to execute the splittable DoFn directly, thereby bypassing the late-data and timer-dropping logic.

    The fix for 2. builds on the fix for 1. and also introduces a "last resort" event-time timer that fires at +Inf and ensures that any remaining restrictions are exhausted.

    R: @jkff

    I'm not sure whether we want to fix it like this, or instead adapt `ProcessFn` and remove `StatefulDoFnRunner` from the processing path.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/beam fix-flink-splittable-dofn-squashed

Alternatively, you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/3480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3480

----
commit e30efe70bdafbbcc5bc1082f980867e58684c351
Author: Aljoscha Krettek
Date:   2017-06-26T10:10:18Z

    [BEAM-2140] Execute Splittable DoFn directly in Flink Runner

    (The commit message repeats the pull-request description above.)
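This toy Java model illustrates the two failure modes and the "last resort" timer idea. It does not use any Beam or Flink API, and it is not the code from this pull request.

The names `LastResortTimerSketch`, `Restriction`, `processChunk` and `run` are hypothetical, as is the chunk size of 3. The model makes these assumptions:

- A restriction is an offset range `[next, end)`.
- Each processing-time timer firing processes one small chunk, then reschedules itself.
- Shutdown simply discards the pending processing-time timer.
- The event-time timer at +Inf, when enabled, drains what is left.

```java
/**
 * Toy model (hypothetical; not Beam or Flink API) of why restrictions are lost
 * when processing-time timers are dropped at shutdown, and how a "last resort"
 * event-time timer at +Inf can exhaust whatever remains.
 */
class LastResortTimerSketch {
    // Hypothetical amount of work done per processing-time timer firing.
    static final long CHUNK = 3;

    /** Remaining restriction: offsets [next, end). */
    static final class Restriction {
        long next;
        final long end;

        Restriction(long from, long end) {
            this.next = from;
            this.end = end;
        }

        boolean isDone() {
            return next >= end;
        }
    }

    /** Claims and "processes" up to maxOffsets offsets; returns how many were processed. */
    static long processChunk(Restriction r, long maxOffsets) {
        long n = Math.min(maxOffsets, r.end - r.next);
        r.next += n;
        return n;
    }

    /**
     * Lets the processing-time timer fire at most firingsBeforeShutdown times,
     * then simulates shutdown. Returns the total number of offsets processed.
     */
    static long run(long from, long end, int firingsBeforeShutdown, boolean withLastResortTimer) {
        Restriction r = new Restriction(from, end);
        long processed = 0;
        int firings = 0;
        // True while a processing-time resumption timer is scheduled.
        boolean processingTimerPending = true;
        while (processingTimerPending && firings < firingsBeforeShutdown) {
            processed += processChunk(r, CHUNK);
            firings++;
            // Reschedule the processing-time timer only if work remains.
            processingTimerPending = !r.isDone();
        }
        // Shutdown: any still-pending processing-time timer is dropped here.
        if (withLastResortTimer && !r.isDone()) {
            // The event-time timer at +Inf fires when the watermark reaches +Inf:
            // exhaust the remaining restriction.
            processed += processChunk(r, Long.MAX_VALUE);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("without last-resort timer: " + run(0, 10, 2, false));
        System.out.println("with last-resort timer:    " + run(0, 10, 2, true));
    }
}
```

With a restriction of 10 offsets and shutdown after two firings, this prints 6 without the last-resort timer (4 offsets are silently lost) and 10 with it. Draining in a single step is a simplification chosen for the model.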