flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
Date Mon, 07 May 2018 06:49:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465499#comment-16465499
] 

ASF GitHub Bot commented on FLINK-8500:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186338721
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
---
    @@ -42,14 +42,22 @@
     @Public
     public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T>
{
     
    +	/**
    +	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
    +	 */
    +	@Deprecated
    +	T deserialize(byte[] message) throws IOException;
    +
     	/**
     	 * Deserializes the byte message.
     	 *
    -	 * @param message The message, as a byte array.
    +	 * @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
     	 *
     	 * @return The deserialized message as an object (null if the message cannot be deserialized).
     	 */
    -	T deserialize(byte[] message) throws IOException;
    +	default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException
{
    --- End diff --
    
    I'm actually not sure that we should continue using this class, for the following reasons:
    
    1. The class is actually placed under a non-ideal package:
    `o.a.f.api.common.serialization`, whereas is should be placed under some `o.a.f.connectors.kafka....`.
    The reason it is currently placed under this package was because the `DeserializationSchema`
was initially intended to be commonly used by all connectors. However, over time, things have
proven that each connector will benefit from their own version of a schema class.
    
    So, it actually might make sense to deprecate the whole `DeserializationSchema` class
now, and have a new class (maybe called `KafkaDeserializationSchema` / `KafkaSerializationSchema`)
under a correct Kafka package.
    
    What do you think?


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>             Fix For: 1.6.0
>
>         Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 'kafka message
timestamp' (from ConsumerRecord) .In some business scenarios, this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message