Date: Fri, 23 Sep 2016 09:11:20 +0000 (UTC)
From: "Aljoscha Krettek (JIRA)"
To: commits@beam.incubator.apache.org
Subject: [jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

    [ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515875#comment-15515875 ]

Aljoscha Krettek commented on BEAM-644:
---------------------------------------

Yes, as a replacement for {{outputWithTimestamp}} and {{withAllowedTimestampSkew}}, this new proposal is perfect.

[~kenn], I was just thinking about {{SplittableDoFn}} and what happens in the absence of data. Say you have some data that you emit from the DoFn that is clustered around timestamp {{t}}, then you have no data for a while, and then you get data that is clustered around {{t + 100}}. In order for that data to not be late, the watermark has to be held at {{t + 100}}, but you cannot know that until you actually see the newer data. Holding back by some constant {{D}} would not help in that case.

Or I might be missing something, of course.
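For readers following the thread, here is a minimal sketch of the AdjustTimestamps(D, f) semantics from the issue description quoted below, as I read them: the output watermark is the input watermark shifted by the constant D, and f must return a timestamp >= t + D. It is a plain-Java model under stated assumptions, not Beam API; the class AdjustTimestampsSketch and its methods adjust and outputWatermark are hypothetical names chosen for illustration. It shows why an element that was on time before adjustment stays on time after it (t >= input watermark and t' >= t + D imply t' >= input watermark + D). It does not model the gap scenario above, which is exactly the case where a constant D may not be enough.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BiFunction;

/**
 * Hypothetical model of the AdjustTimestamps(D, f) idea from BEAM-644.
 * Illustrative only; this is not part of the Beam SDK.
 */
public final class AdjustTimestampsSketch {

    /** Constant shift D applied to the watermark (positive or negative). */
    private final Duration shift;
    /** User function f: (element, original timestamp t) -> new timestamp. */
    private final BiFunction<String, Instant, Instant> fn;

    public AdjustTimestampsSketch(Duration shift, BiFunction<String, Instant, Instant> fn) {
        this.shift = shift;
        this.fn = fn;
    }

    /** The output watermark is the input watermark shifted by D. */
    public Instant outputWatermark(Instant inputWatermark) {
        return inputWatermark.plus(shift);
    }

    /** Applies f and enforces the contract from the issue: new timestamp >= t + D. */
    public Instant adjust(String element, Instant t) {
        Instant newTs = fn.apply(element, t);
        Instant lowerBound = t.plus(shift);
        if (newTs.isBefore(lowerBound)) {
            throw new IllegalArgumentException(
                "new timestamp " + newTs + " is earlier than t + D = " + lowerBound);
        }
        return newTs;
    }

    public static void main(String[] args) {
        // AdjustTimestamps(<-60 minutes>, f): f may move timestamps up to 60 minutes earlier.
        // Here the hypothetical f moves every timestamp 45 minutes earlier.
        AdjustTimestampsSketch op = new AdjustTimestampsSketch(
            Duration.ofMinutes(-60),
            (element, ts) -> ts.minus(Duration.ofMinutes(45)));

        Instant inputWatermark = Instant.parse("2016-09-23T09:00:00Z");
        Instant t = Instant.parse("2016-09-23T09:10:00Z"); // on time: t >= input watermark

        Instant newTs = op.adjust("log-2016-09-22", t);
        Instant outWm = op.outputWatermark(inputWatermark);

        // Since t >= inputWatermark and newTs >= t + D, newTs >= inputWatermark + D = outWm.
        System.out.println("new timestamp:    " + newTs);
        System.out.println("output watermark: " + outWm);
        System.out.println("late? " + newTs.isBefore(outWm));
    }
}
```

A function that tried to move a timestamp more than 60 minutes earlier would be rejected by adjust, which is the sketch's way of expressing that D bounds how far f may shift timestamps.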
> Primitive to shift the watermark while assigning timestamps
> -----------------------------------------------------------
>
>                 Key: BEAM-644
>                 URL: https://issues.apache.org/jira/browse/BEAM-644
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>
> There is a general need, especially important in the presence of SplittableDoFn, to be able to assign new timestamps to elements without making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one to produce late data, but does not allow one to shift the watermark so the new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log file that contains elements for the day preceding the log file. The timestamp on the filename must currently be the beginning of the log. If such elements are constantly flowing, it may be OK, but since we don't know that element is coming, in the absence of data the watermark may advance. We need a way to keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the following pieces:
>  - A constant duration (positive or negative) D by which to shift the watermark.
>  - A function from TimestampedElement to a new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were mentioned.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)