flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
Date Mon, 07 May 2018 06:49:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465499#comment-16465499 ]

ASF GitHub Bot commented on FLINK-8500:

Github user tzulitai commented on a diff in the pull request:

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
    @@ -42,14 +42,22 @@
     public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T>
    +	/**
    +	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
    +	 */
    +	@Deprecated
    +	T deserialize(byte[] message) throws IOException;
     	 * Deserializes the byte message.
    -	 * @param message The message, as a byte array.
    +	 * @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
     	 * @return The deserialized message as an object (null if the message cannot be deserialized).
    -	T deserialize(byte[] message) throws IOException;
    +	default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException
    --- End diff --
    I'm actually not sure that we should continue using this class, for the following reasons:
    1. The class is placed under a non-ideal package, `o.a.f.api.common.serialization`, whereas it should be placed under some `o.a.f.connectors.kafka....`.
    It currently lives in this package because `DeserializationSchema` was initially intended to be shared by all connectors. Over time, however, it has become clear that each connector benefits from its own version of a schema class.
    So it might make sense to deprecate the whole `DeserializationSchema` class now and introduce a new class (maybe called `KafkaDeserializationSchema` / `KafkaSerializationSchema`) under a proper Kafka package.
    What do you think?
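
    To make the suggestion concrete, here is a rough sketch of what a Kafka-specific schema could look like. Everything in it is hypothetical and not taken from the PR: `RecordStub` is a simplified stand-in for Kafka's `ConsumerRecord`, and the `KafkaDeserializationSchema` shape, `TimestampedString`, and `TimestampedStringSchema` are illustrative names only. The point is just that a connector-owned interface can hand the whole record, timestamp included, to user code.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class KafkaSchemaSketch {

    /** Hypothetical stand-in for Kafka's ConsumerRecord, reduced to the fields used here. */
    static final class RecordStub {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;
        final byte[] value;

        RecordStub(String topic, int partition, long offset, long timestamp, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Hypothetical connector-specific schema: it receives the whole record,
     * so the timestamp and other metadata are available to implementations.
     */
    interface KafkaDeserializationSchema<T> extends Serializable {
        /** Returns the deserialized element, or null if the record cannot be deserialized. */
        T deserialize(RecordStub record) throws IOException;
    }

    /** Illustrative output type pairing the payload with the Kafka message timestamp. */
    static final class TimestampedString {
        final long timestamp;
        final String value;

        TimestampedString(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Example implementation: decodes the value as UTF-8 and keeps the record timestamp. */
    static final class TimestampedStringSchema
            implements KafkaDeserializationSchema<TimestampedString> {
        @Override
        public TimestampedString deserialize(RecordStub record) {
            if (record.value == null) {
                return null; // nothing to decode
            }
            String text = new String(record.value, StandardCharsets.UTF_8);
            return new TimestampedString(record.timestamp, text);
        }
    }

    public static void main(String[] args) throws IOException {
        KafkaDeserializationSchema<TimestampedString> schema = new TimestampedStringSchema();
        RecordStub record = new RecordStub(
                "events", 0, 7L, 1525675741000L, "hello".getBytes(StandardCharsets.UTF_8));
        TimestampedString out = schema.deserialize(record);
        System.out.println(out.timestamp + " " + out.value);
    }
}
```

    With something along these lines, the byte[]-only `DeserializationSchema` could stay untouched for other connectors while Kafka users get access to the record metadata.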

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>             Fix For: 1.6.0
>         Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
> The `deserialize` method of KeyedDeserializationSchema needs a 'kafka message timestamp'
> parameter (from ConsumerRecord). In some business scenarios, this is useful!

This message was sent by Atlassian JIRA
