From: "Leonard Xu (Jira)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Thu, 29 Oct 2020 09:33:00 +0000 (UTC)
Subject: [jira] [Updated] (FLINK-19878) Improve watermark for upsertSource

[
https://issues.apache.org/jira/browse/FLINK-19878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu updated FLINK-19878:
-------------------------------
    Summary: Improve watermark for upsertSource  (was: Improve watermark ChangelogNormalize for upsertSource)

> Improve watermark for upsertSource
> -----------------------------------
>
>                 Key: FLINK-19878
>                 URL: https://issues.apache.org/jira/browse/FLINK-19878
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: Leonard Xu
>            Priority: Major
>
> Currently, for an upsertSource such as upsert-kafka, the WatermarkAssigner node is placed after the ChangelogNormalize node, so it may return Long.MaxValue as the watermark if some parallel instances have no data.
> As an improvement, we can move the WatermarkAssigner to directly follow the source scan node (TableSourceScan), so that watermarks are produced as they are for a general source.
>
> {code:java}
>  +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
>     +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I,UA,D])
>        +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
>           +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
>              +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)