From: David Anderson
Date: Mon, 28 Oct 2019 11:30:31 +0100
Subject: Re: Watermark won't advance in ProcessFunction
To: Dian Fu
Cc: 杨力, user

The watermark is not advancing because assignAscendingTimestamps is a periodic watermark generator. This style of watermark generator is called at regular intervals to create watermarks; by default, this happens every 200 ms. With only a tiny bit of data to process, the job doesn't run long enough for the watermark generator to ever be called.

On Mon, Oct 28, 2019 at 9:17 AM Dian Fu wrote:
>
> Before a program closes, it emits a final watermark of Long.MaxValue, and that watermark triggers all the windows. This is why your `timeWindow` program works. However, in the first program you have not registered an event-time timer (through context.timerService.registerEventTimeTimer), and there is also no onTimer logic defined to process it.
>
> On Oct 28, 2019, at 4:01 PM, 杨力 wrote:
>
> It seems to be the case. But when I use timeWindow or CEP with fromCollection, it works well. For example,
>
> ```
> sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002)).assignAscendingTimestamps(identity[Long])
>   .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print()
> ```
>
> prints
>
> ```
> 1
> 1002
> 2002
> 3002
> ```
>
> How can I implement my KeyedProcessFunction so that it works as expected?
>
> On Mon, Oct 28, 2019 at 2:04 PM, Dian Fu wrote:
>>
>> Hi,
>>
>> By default, the underlying implementation of `assignAscendingTimestamps` generates watermarks periodically. So in your test program, no watermark has been generated yet, and I think that's why it's Long.MinValue.
>>
>> Regards,
>> Dian
>>
>> On Oct 28, 2019, at 11:59 AM, 杨力 wrote:
>>
>> I'm going to sort elements in a PriorityQueue and set up timers at (currentWatermark + 1), following the instructions in https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing.
>>
>> However, it seems that context.timerService().currentWatermark() always returns Long.MinValue, and my onTimer is never called. Here's a minimal program to reproduce the problem. Am I missing something?
>>
>> ```
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.functions.ProcessFunction
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.util.Collector
>>
>> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> sEnv.setParallelism(argOps.parallelism())
>> sEnv.fromCollection(Seq[Long](1, 2, 3)).assignAscendingTimestamps(identity[Long])
>>   .process(new ProcessFunction[Long, Long] {
>>     // Emits the operator's current watermark once per element.
>>     override def processElement(i: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = {
>>       collector.collect(context.timerService().currentWatermark())
>>     }
>>   }).print()
>> sEnv.execute()
>> ```
>>
>> ```
>> -9223372036854775808
>> -9223372036854775808
>> -9223372036854775808
>> ```
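For readers following the thread, here is a minimal sketch of the approach Dian Fu describes: register an event-time timer for each element and emit the buffered elements from onTimer. It assumes Flink 1.9's Scala DataStream API, parallelism 1, and the same input and identity timestamps as the timeWindow example above. The names SortByEventTime, CollectSink and SortByEventTimeExample are hypothetical, and CollectSink exists only so the output can be checked after execute(). Because the final Long.MaxValue watermark fires every outstanding timer when the bounded input ends, this should produce output even if the job finishes before the periodic watermark generator first runs.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical function: buffers elements per key and emits them in
// timestamp order once the watermark has passed their timestamps.
class SortByEventTime extends KeyedProcessFunction[Long, Long, Long] {
  // Per-key buffer of (timestamp, element) pairs not yet emitted.
  private var buffer: ListState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    buffer = getRuntimeContext.getListState(
      new ListStateDescriptor[(Long, Long)]("buffer", createTypeInformation[(Long, Long)]))
  }

  override def processElement(
      value: Long,
      ctx: KeyedProcessFunction[Long, Long, Long]#Context,
      out: Collector[Long]): Unit = {
    // Timestamp attached upstream by assignAscendingTimestamps.
    val ts: Long = ctx.timestamp()
    buffer.add((ts, value))
    // Fires once the watermark reaches ts. When bounded input ends, the final
    // Long.MaxValue watermark fires every timer that is still pending.
    ctx.timerService().registerEventTimeTimer(ts)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
      out: Collector[Long]): Unit = {
    val all = Option(buffer.get()).map(_.asScala.toList).getOrElse(Nil)
    // Emit everything the watermark has passed, in timestamp order; keep the rest.
    val (ready, pending) = all.partition(_._1 <= timestamp)
    ready.sortBy(_._1).foreach { case (_, v) => out.collect(v) }
    if (pending.isEmpty) buffer.clear() else buffer.update(pending.asJava)
  }
}

// Test-only sink (hypothetical, modeled on the pattern in Flink's testing docs):
// collects results into a static list so they can be inspected after execute().
class CollectSink extends SinkFunction[Long] {
  override def invoke(value: Long): Unit = CollectSink.values.add(value)
}

object CollectSink {
  val values: java.util.List[Long] =
    java.util.Collections.synchronizedList(new java.util.ArrayList[Long]())
}

object SortByEventTimeExample {
  def main(args: Array[String]): Unit = {
    CollectSink.values.clear()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1) // keeps the sketch simple and deterministic
    env.fromCollection(Seq[Long](1, 1002, 2002, 3002))
      .assignAscendingTimestamps(identity[Long])
      .keyBy(_ % 2)
      .process(new SortByEventTime)
      .addSink(new CollectSink)
    env.execute("sort-by-event-time")
  }
}
```

The buffer lives in keyed ListState rather than in a plain PriorityQueue field because a field on the function instance is shared by every key that instance handles and is not part of checkpointed state. The timer-coalescing variant from the linked docs (registering a timer at currentWatermark + 1) would also fire in this setup, since the final Long.MaxValue watermark passes any registered timer.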