From user-return-25663-archive-asf-public=cust-asf.ponee.io@flink.apache.org Wed Jan 30 01:42:13 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 6034918067E for ; Wed, 30 Jan 2019 02:42:12 +0100 (CET) Received: (qmail 81207 invoked by uid 500); 30 Jan 2019 01:42:06 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 81195 invoked by uid 99); 30 Jan 2019 01:42:06 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Jan 2019 01:42:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 94CA1C6765 for ; Wed, 30 Jan 2019 01:42:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 2.048 X-Spam-Level: ** X-Spam-Status: No, score=2.048 tagged_above=-999 required=6.31 tests=[DKIMWL_WL_MED=-0.001, DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, DKIM_VALID_EF=-0.1, FREEMAIL_ENVFROM_END_DIGIT=0.25, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd1-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id hnhnn5Q_zEJg for ; Wed, 30 Jan 2019 01:42:04 +0000 (UTC) Received: from mail-it1-f195.google.com (mail-it1-f195.google.com [209.85.166.195]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTPS id 83BD05FE1F for ; Wed, 30 Jan 2019 01:42:03 +0000 (UTC) Received: by mail-it1-f195.google.com with SMTP id a6so7889234itl.4 for ; Tue, 29 Jan 2019 17:42:03 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:references:in-reply-to:from:date:message-id:subject:to :cc; bh=HIclC40mNENG9mUCPfnExmiPH3WyEkefwQHfcvM4DKk=; b=K42wnhbv4tDk1Npo5fWeQCBYv3TGDrignt7ZHu8+5q9a9Utm2B3LbrtKEtbPgrtGu0 2HwVoD7K9m0cCI1GbXlT1O2YiOK4Un3WCQ5i22g6yrkIwGmvTJyT6hjNK8mkTo3VeprV hwqDzfPRLVWNv01yizpn9dZ+9Wx2lt61cJOs6IQjOaWGaNzfA6ClVBsKGN39VpmsbwLJ /N5acTrmkzsURtkY8azZlsDvsR70IdzbR4ivrWxrIYHGIIxyTtV6A9LK2UIjGDg4Xq4i wmWp8LqHBeIEg93sI97ImdDWLAx6M2YJe3XxXaWhyTaT3S3KNYC/8KtdUdT0n9MybhZT 2xdw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:references:in-reply-to:from:date :message-id:subject:to:cc; bh=HIclC40mNENG9mUCPfnExmiPH3WyEkefwQHfcvM4DKk=; b=KdcI3NrCZ4w+OwbUfl/ZUGxFrr7qPK7SmM9d2f6f42uXVZExMoBh3OCxHY2Wdi1/RD ABS11X7saFLIJOcCzCICaeZ+YY3Zu0xCn8kzDHHBmMr6C80SYjdpHdfO5YwC2pDQuNJI 2BPJsKjKLwRUPEkBeuGehNoV6m4xuoUGB9SFIoqMgZBdkGY68F0W8utsBsaGe33EClJ9 ya3F3ODXdlOFuks6o/G6zT20giMQ+WG38Nxp6gXAyMa8URbmDZFs6K1S89S5c6JqgsYY NubrcQZAOGf/sf9LMox1qFBQePkjwhpRgBh+mk5NmYBgiIy+s3/RH4LVC5A55YWYtB4G 0w9A== X-Gm-Message-State: AHQUAubS7IVTDF9iv7EF2aDMUxbetlgTVT9im0FYqHLqtGKsinthgnA4 OzJW4k5tKGZMxJTipnW2NnECFxvvZiAuO4Yb4kY= X-Google-Smtp-Source: AHgI3IYEzd2uI/yCh/qxUL2zSpx4BM5U9L35w+4g31mA/+sx5wfJZfbXrGtK6lNew7bdquM1X27zvZxgPS1/Q1fmr/w= X-Received: by 2002:a24:760b:: with SMTP id z11mr1372112itb.105.1548812522303; Tue, 29 Jan 2019 17:42:02 -0800 (PST) MIME-Version: 1.0 References: In-Reply-To: From: Congxian Qiu Date: Wed, 30 Jan 2019 09:41:54 +0800 Message-ID: Subject: Re: About KafkaConsumer and WM'ing and EventTime charactersitics To: Vishal Santoshi Cc: user Content-Type: multipart/alternative; boundary="00000000000037385c0580a30472" --00000000000037385c0580a30472 Content-Type: text/plain; charset="UTF-8" Content-Transfer-Encoding: quoted-printable Hi Vishal May this doc[1] be helpful for you. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/= kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission Best, Congxian Vishal Santoshi =E4=BA=8E2019=E5=B9=B41=E6=9C= =8830=E6=97=A5=E5=91=A8=E4=B8=89 =E4=B8=8A=E5=8D=884:36=E5=86=99=E9=81=93= =EF=BC=9A > It seems from > https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.htm= l > that iTimeCharacteristic.IngestionTime should do the trick. > > Just wanted to confirm that the ingestion time is the event time provided > by the kafka producer. > > On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi > wrote: > >> In case where one needs t to use kafka event time ( ingestion time ) >> for watermark generation and timestamp extraction is setting >> EventTimeCharactersitic as EventTime enough ? >> >> Or is this explicit code required ? >> >> consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWaterma= rks() { >> @Nullable >> @Override >> public Watermark checkAndGetNextWatermark(KafkaRecord lastElement, l= ong extractedTimestamp) { >> return new Watermark(extractedTimestamp); >> } >> >> @Override >> public long extractTimestamp(KafkaRecord element, long previousEleme= ntTimestamp) { >> return previousElementTimestamp; >> } >> }); >> >> >> >> >> >> --00000000000037385c0580a30472 Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable

Vishal Santoshi <= vishal.santo= shi@gmail.com> =E4=BA=8E2019=E5=B9=B41=E6=9C=8830=E6=97=A5=E5=91=A8= =E4=B8=89 =E4=B8=8A=E5=8D=884:36=E5=86=99=E9=81=93=EF=BC=9A
It seems from=C2=A0https://ci.apache.org/p= rojects/flink/flink-docs-stable/dev/event_time.html that iTimeCharacteristic.= IngestionTime=C2=A0should do the tr= ick.=C2=A0=C2=A0

Just want= ed to confirm that the ingestion time is the event time provided by the kaf= ka producer.=C2=A0

On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi <= ;vishal.sant= oshi@gmail.com> wrote:
=C2=A0In case where one needs t to use kafka= event time ( ingestion time )=C2=A0 for watermark generation and timestamp= extraction is setting=C2=A0 EventTimeCharactersitic=C2=A0 as EventTime eno= ugh ?=C2=A0

Or is this=C2=A0 explicit code required ?=C2= =A0
consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<KafkaRe=
cord>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(KafkaRecord = lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
}
@Override
public long extractTimestamp(KafkaRecord element, long
previousElement= Timestamp) {
return previousElementTimestamp;
}
});



=

--00000000000037385c0580a30472--