From: Jia Teoh <jiateoh@gmail.com>
Date: Wed, 17 May 2017 15:20:51 -0700
Subject: Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming
To:
"Tzu-Li (Gordon) Tai" <tzulitai@apache.org>
Cc: user@flink.apache.org

I'll take a look into the ProcessFunction, thanks for the suggestion.

-Jia

On Wed, May 17, 2017 at 12:33 AM, Tzu-Li (Gordon) Tai wrote:

> Hi Jia,
>
> Actually, I just realized you can access the timestamp of records via the
> more powerful `ProcessFunction` [1].
> That'll be a bit easier to use than implementing your own custom operator,
> which is quite low-level.
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>
> On 17 May 2017 at 1:45:38 PM, Jia Teoh (jiateoh@gmail.com) wrote:
>
> Hi Gordon,
>
> The timestamps are required for application logic. Thank you for
> clarifying the custom operators - it seems I mistakenly thought of the
> functions that are passed to the operators rather than the operators
> themselves. AbstractStreamOperator and the other classes you mentioned
> seem like exactly what I'm looking for, so I will take a look into
> implementing a custom one for my use case.
>
> Thank you,
> Jia
>
> On Tue, May 16, 2017 at 9:52 PM, Tzu-Li (Gordon) Tai wrote:
>
>> Hi Jia,
>>
>> How exactly do you want to use the Kafka timestamps? Do you want to
>> access them and alter them with new values as the record timestamp? Or
>> do you want to use them for some application logic in your functions?
>>
>> If it's the former, you should be able to do that by using timestamp /
>> watermark extractors. They come with an interface that exposes the
>> current timestamp of the record. For Kafka 0.10, that timestamp would be
>> the Kafka record's timestamp if it hasn't been explicitly assigned any
>> other timestamp yet.
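[To make the `ProcessFunction` route concrete, a minimal sketch against the Flink 1.2-era API; the class name is illustrative, and on 1.2 the rich variant was named `RichProcessFunction`, while later releases make `ProcessFunction` itself the abstract base:]

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: emits each record's value together with the timestamp Flink
// attached to it. With the Kafka 0.10 consumer, ctx.timestamp() is the
// Kafka record's timestamp unless a timestamp assigner has overwritten it.
public class TimestampedEcho extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        Long ts = ctx.timestamp(); // may be null if no timestamp is attached
        out.collect(value + " @ " + ts);
    }
}
```

[Applied to a stream with `stream.process(new TimestampedEcho())`.]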
>> If it's the latter, then I think currently you have to use custom
>> operators, as Robert mentioned.
>> Custom operators are classes that extend the `AbstractStreamOperator`
>> base class and implement one of the `OneInputStreamOperator` or
>> `TwoInputStreamOperator` interfaces.
>> You can take a look at the basic `StreamMap` or `StreamFlatMap` classes
>> for an example; they are the underlying operators for the map and
>> flatMap functions.
>>
>> At the operator level, you'll have access to a `StreamRecord` in the
>> `processElement` function, which wraps the record value (what you get
>> when implementing functions) as well as the internal timestamp that
>> comes with the record.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 17 May 2017 at 4:27:36 AM, Jia Teoh (jiateoh@gmail.com) wrote:
>>
>> Hi Robert,
>>
>> Thanks for the reply. I ended up implementing an extension of the Kafka
>> fetcher and consumer so that the deserialization API can include the
>> timestamp field, which is sufficient for my specific use case. I can
>> share the code if desired, but it seems like it's an intentional design
>> decision not to expose the timestamp in the deserialization API.
>>
>> I noticed you mentioned that I could use a custom operator to access the
>> record event time. Could you elaborate on what you mean by "operator"? I
>> initially thought that referred to DataStream.map/reduce/etc, but none
>> of those functions provide arguments that can be used to extract the
>> embedded timestamp.
>>
>> Thanks,
>> Jia Teoh
>>
>> On Fri, May 12, 2017 at 9:25 AM, Robert Metzger <rmetzger@apache.org>
>> wrote:
>>
>>> Hi Jia,
>>>
>>> The Kafka 0.10 connector internally relies on the Kafka09 fetcher, but
>>> it is extensible / pluggable, so the Kafka 0.9 fetcher can also read
>>> the event timestamps from Kafka 0.10.
>>> We don't expose the timestamp through the deserialization API, because
>>> we set it internally in Flink.
>>> (There is a "hidden" field with each record containing the event time
>>> of the event.)
>>>
>>> With a custom operator you can access the event time of a record.
>>>
>>> On Fri, May 12, 2017 at 3:26 AM, Jia Teoh <jiateoh@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is there a way to retrieve the timestamps that Kafka associates with
>>>> each key-value pair within Flink? I would like to be able to use these
>>>> as values within my application flow, and defining them before or
>>>> after Kafka is not acceptable for the use case due to the latency
>>>> involved in sending to or receiving from Kafka.
>>>>
>>>> It seems that Flink supports Kafka event time (link), but after a
>>>> brief trace it seems that KafkaConsumer010 still relies on the
>>>> Kafka09Fetcher for iterating through each Kafka record and
>>>> deserializing it. The KeyedDeserializationSchema API does not seem to
>>>> have support for including the timestamp as additional metadata (just
>>>> offset, topic, and partition), so something such as
>>>> JSONKeyValueDeserializationSchema will not return the Kafka-specified
>>>> timestamp.
>>>>
>>>> For reference, I am using Kafka 0.10.2 and the Flink Streaming API +
>>>> Kafka connector (1.2.1).
>>>>
>>>> Thanks,
>>>> Jia Teoh
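[For completeness, the custom-operator route discussed above might look roughly like this. A sketch against the 1.2-era internal API; `ExtractTimestampOperator` is an illustrative name, and these operator interfaces are internal, so they may change between releases:]

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Sketch of a one-input operator, modeled on StreamMap, that pairs each
// element with the "hidden" timestamp field carried by the StreamRecord.
// For the Kafka 0.10 consumer, that is the Kafka record's timestamp.
public class ExtractTimestampOperator<T>
        extends AbstractStreamOperator<Tuple2<T, Long>>
        implements OneInputStreamOperator<T, Tuple2<T, Long>> {

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // replace() swaps the value but keeps the record's timestamp intact.
        output.collect(element.replace(
                new Tuple2<>(element.getValue(), element.getTimestamp())));
    }
}
```

[Wired in via `stream.transform("extract timestamps", outTypeInfo, new ExtractTimestampOperator<>())`, where `outTypeInfo` is the `TypeInformation` for the `Tuple2` output.]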