flink-user mailing list archives

From Jia Teoh <jiat...@gmail.com>
Subject Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming
Date Tue, 16 May 2017 20:27:11 GMT
Hi Robert,

Thanks for the reply. I ended up implementing an extension of the Kafka
fetcher and consumer so that the deserialization API can include the
timestamp field, which is sufficient for my specific use case. I can share
the code if desired, but it seems like it's an intentional design decision
not to expose the timestamp in the deserialization API.
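
For illustration only, here is a minimal sketch of what a timestamp-aware
deserialization interface could look like. It is not the code mentioned above
and not part of Flink: TimestampedDeserializationSchema, TimestampedValue and
StringWithTimestampSchema are hypothetical names, and the method simply mirrors
the arguments of KeyedDeserializationSchema.deserialize (key, value, topic,
partition, offset) with one extra timestamp argument.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical interface (not a Flink class): same arguments as
// KeyedDeserializationSchema.deserialize, plus the Kafka record timestamp.
interface TimestampedDeserializationSchema<T> {
    T deserialize(byte[] key, byte[] value, String topic,
                  int partition, long offset, long timestamp);
}

// Hypothetical value type that carries the Kafka timestamp as ordinary data,
// so downstream map/reduce functions can read it like any other field.
final class TimestampedValue {
    final String value;
    final long kafkaTimestamp;

    TimestampedValue(String value, long kafkaTimestamp) {
        this.value = value;
        this.kafkaTimestamp = kafkaTimestamp;
    }
}

// Example schema: decodes the message value as UTF-8 and keeps the timestamp.
final class StringWithTimestampSchema
        implements TimestampedDeserializationSchema<TimestampedValue> {
    @Override
    public TimestampedValue deserialize(byte[] key, byte[] value, String topic,
                                        int partition, long offset, long timestamp) {
        String decoded = value == null ? null : new String(value, StandardCharsets.UTF_8);
        return new TimestampedValue(decoded, timestamp);
    }
}
```

An extended fetcher would presumably call such a schema once per Kafka record,
passing the result of ConsumerRecord.timestamp() from the 0.10 client as the
last argument.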

I noticed you mentioned that I could use a custom operator to access the
record event time. Could you elaborate on what you mean by "operator"? I
initially thought that referred to DataStream.map/reduce/etc, but none of
those functions provide arguments that can be used to extract the embedded
timestamp.

Jia Teoh

On Fri, May 12, 2017 at 9:25 AM, Robert Metzger <rmetzger@apache.org> wrote:

> Hi Jia,
> The Kafka 0.10 connector internally relies on the Kafka09 fetcher, but it
> is extensible / pluggable, so the Kafka 0.9 fetcher can also read the
> event timestamps from Kafka 0.10.
> We don't expose the timestamp through the deserialization API, because we
> set it internally in Flink. (There is a "hidden" field with each record
> containing the event time of the event.)
> With a custom operator you can access the event time of a record.
> On Fri, May 12, 2017 at 3:26 AM, Jia Teoh <jiateoh@gmail.com> wrote:
>> Hi,
>> Is there a way to retrieve the timestamps that Kafka associates with each
>> key-value pair within Flink? I would like to be able to use these as values
>> within my application flow, and defining them before or after Kafka is not
>> acceptable for the use case due to the latency involved in sending or
>> receiving from Kafka.
>> It seems that Flink supports Kafka event time (link
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010>)
>> but after a brief trace it seems that KafkaConsumer010 still relies on the
>> Kafka09Fetcher
>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L137>
>> iterating through each Kafka record and deserializing it. The
>> KeyedDeserializationSchema API does not seem to have support for including
>> timestamp as additional metadata (just offset, topic, and partition) so
>> something such as JSONKeyValueDeserializationSchema will not return the
>> Kafka-specified timestamp.
>> For reference, I am using Kafka 0.10.2 and the Flink-Streaming API +
>> Kafka Connector (1.2.1).
>> Thanks,
>> Jia Teoh
