flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming
Date Wed, 17 May 2017 04:52:40 GMT
Hi Jia,

How exactly do you want to use the Kafka timestamps? Do you want to access them and alter
them with new values as the record timestamp? Or do you want to use them for some application
logic in your functions?

If it's the former, you should be able to do that by using timestamp / watermark extractors.
They come with an interface that exposes the current timestamp of the record. For Kafka 0.10,
that timestamp would be the Kafka record’s timestamp if it hasn’t been explicitly assigned
any other timestamp yet.
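
A minimal sketch of the former case, under stated assumptions. To keep it runnable without Flink on the classpath, `TimestampExtractor` is a stand-in declared here that only mirrors the `extractTimestamp(element, previousElementTimestamp)` method of Flink's `AssignerWithPeriodicWatermarks`; `SensorEvent` and `KafkaOrFallbackTimestamp` are hypothetical names. It also assumes that a negative `previousElementTimestamp` means no timestamp has been attached to the record yet.

```java
// Stand-in for the extractTimestamp method of Flink's AssignerWithPeriodicWatermarks.
// Assumption: declared here only so the sketch compiles without Flink.
interface TimestampExtractor<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
}

// Hypothetical record type carrying its own creation time as a fallback.
final class SensorEvent {
    final String payload;
    final long createdAtMillis;

    SensorEvent(String payload, long createdAtMillis) {
        this.payload = payload;
        this.createdAtMillis = createdAtMillis;
    }
}

// Keeps the timestamp already attached to the record (for Kafka 0.10, the
// Kafka record timestamp) and falls back to the event's own field otherwise.
final class KafkaOrFallbackTimestamp implements TimestampExtractor<SensorEvent> {
    @Override
    public long extractTimestamp(SensorEvent element, long previousElementTimestamp) {
        // Assumption: a negative value means "no timestamp assigned yet".
        if (previousElementTimestamp >= 0) {
            return previousElementTimestamp;
        }
        return element.createdAtMillis;
    }
}
```

In an actual job the class would implement Flink's interface instead (which also requires `getCurrentWatermark()`) and be passed to `assignTimestampsAndWatermarks` on the stream.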

If it's the latter, then I think currently you have to use custom operators as Robert mentioned.
Custom operators are classes that extend the `AbstractStreamOperator` base class and implement
either the `OneInputStreamOperator` or the `TwoInputStreamOperator` interface.
You can take a look at the basic `StreamMap` or `StreamFlatMap` classes for an example, which
are the underlying operators for the map and flatMap functions.

At the operator level, you’ll have access to a `StreamRecord` in the `processElement` function
which wraps the record value (which you get when implementing functions) as well as the internal
timestamp that comes with the record.
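
A rough sketch of what such an operator does with the record, under stated assumptions. `StreamRecordStandIn` is a stand-in declared here for Flink's `StreamRecord` (only `getValue()` and `getTimestamp()` are modeled), and `Timestamped` and `TimestampTaggingOperator` are hypothetical names. The `emitted` list takes the place of the operator's output collector so the example runs without Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's StreamRecord: the user value plus the internal timestamp.
// Assumption: declared here only so the sketch compiles without Flink.
final class StreamRecordStandIn<T> {
    private final T value;
    private final long timestamp;

    StreamRecordStandIn(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    T getValue() { return value; }
    long getTimestamp() { return timestamp; }
}

// Hypothetical output type: the original value paired with its timestamp,
// so downstream functions can read the timestamp as ordinary data.
final class Timestamped<T> {
    final long timestamp;
    final T value;

    Timestamped(long timestamp, T value) {
        this.timestamp = timestamp;
        this.value = value;
    }
}

// Plays the role of a custom one-input operator. In Flink this would extend
// AbstractStreamOperator, implement OneInputStreamOperator, and emit through
// the operator's output rather than a list.
final class TimestampTaggingOperator<T> {
    final List<StreamRecordStandIn<Timestamped<T>>> emitted = new ArrayList<>();

    void processElement(StreamRecordStandIn<T> element) {
        long ts = element.getTimestamp();
        // Copy the internal timestamp into the value and keep it on the record.
        emitted.add(new StreamRecordStandIn<>(new Timestamped<>(ts, element.getValue()), ts));
    }
}
```

With the real classes, an operator like this is attached to a stream via `DataStream#transform` rather than `map` or `flatMap`.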


On 17 May 2017 at 4:27:36 AM, Jia Teoh (jiateoh@gmail.com) wrote:

Hi Robert,

Thanks for the reply. I ended up implementing an extension of the Kafka fetcher and consumer
so that the deserialization API can include the timestamp field, which is sufficient for my
specific use case. I can share the code if desired but it seems like it's an intentional design
decision not to expose the timestamp in the deserialization API.

I noticed you mentioned that I could use a custom operator to access the record event time.
Could you elaborate on what you mean by "operator"? I initially thought that referred to DataStream.map/reduce/etc,
but none of those functions provide arguments that can be used to extract the embedded timestamp. 

Jia Teoh

On Fri, May 12, 2017 at 9:25 AM, Robert Metzger <rmetzger@apache.org> wrote:
Hi Jia,

The Kafka 0.10 connector internally relies on the Kafka09 fetcher, but it is extensible /
pluggable so that the Kafka 0.9 fetcher can also read the event timestamps from Kafka 0.10.
We don't expose the timestamp through the deserialization API, because we set it internally
in Flink (there is a "hidden" field with each record containing the event time of the event).

With a custom operator you can access the event time of a record.

On Fri, May 12, 2017 at 3:26 AM, Jia Teoh <jiateoh@gmail.com> wrote:

Is there a way to retrieve the timestamps that Kafka associates with each key-value pair within
Flink? I would like to be able to use these as values within my application flow, and defining
them before or after Kafka is not acceptable for the use case due to the latency involved
in sending or receiving from Kafka.

It seems that Flink supports Kafka event time (link), but after a brief trace it seems that
FlinkKafkaConsumer010 still relies on the Kafka09Fetcher for iterating through each Kafka record
and deserializing it. The KeyedDeserializationSchema API does not seem to support
including the timestamp as additional metadata (just offset, topic, and partition), so something
such as JSONKeyValueDeserializationSchema will not return the Kafka-specified timestamp.

For reference, I am using Kafka 0.10.2 and the Flink-Streaming API + Kafka Connector (1.2.1).

Jia Teoh
