flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
Date Thu, 23 Aug 2018 08:27:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589904#comment-16589904 ]

ASF GitHub Bot commented on FLINK-8500:
---------------------------------------

aljoscha commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212222385
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##########
 @@ -45,6 +45,22 @@
 	 */
 	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set).
+	 * @param message The message, as a byte array (null if the message was empty or deleted).
+	 * @param partition The partition the message has originated from.
+	 * @param offset the offset of the message in the original source (for example the Kafka offset).
+	 * @param timestamp the timestamp of the consumer record
+	 * @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+	 *
+	 * @return The deserialized message as an object (null if the message cannot be deserialized).
+	 */
+	default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {
 
 Review comment:
   Yes, to unblock this I think we can go with this approach. Basically, the schema becomes
   this:
   ```
   @PublicEvolving
   public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

       // Old signature: kept so that existing implementations still compile and run.
       @Deprecated
       default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
           throw new RuntimeException("blammo");
       }

       // New signature: delegates to the old method unless overridden.
       default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp) {
           return deserialize(messageKey, message, topic, partition, offset);
       }

       boolean isEndOfStream(T nextElement);
   }
   ```
   
   With this, an existing implementation of `KeyedDeserializationSchema` will continue to
   work without any changes. If you write a new implementation, you have to override one of
   the two methods; otherwise the exception is thrown. All Flink code calls only the version
   that takes the timestamp.
   
   What do you think?
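
The compatibility argument in this comment can be sketched in isolation. The example below is a minimal illustration, not the Flink interface: `KeyedSchemaSketch`, `LegacySchema`, `TimestampAwareSchema`, `EmptySchema` and `SchemaCompatDemo` are hypothetical names, the annotations, `isEndOfStream` and the checked exception are left out, and the exception type and message are placeholders. It assumes, as the comment states, that the caller invokes only the overload that takes the timestamp.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for KeyedDeserializationSchema (not the Flink type).
interface KeyedSchemaSketch<T> {

    // Old entry point: throws unless an implementation overrides it.
    @Deprecated
    default T deserialize(byte[] key, byte[] message, String topic, int partition, long offset) {
        throw new UnsupportedOperationException("override one of the deserialize methods");
    }

    // New entry point: the only one the caller uses; delegates to the old one by default.
    default T deserialize(byte[] key, byte[] message, String topic, int partition,
                          long offset, long timestamp) {
        return deserialize(key, message, topic, partition, offset);
    }
}

// Existing implementation, written before the timestamp overload existed.
class LegacySchema implements KeyedSchemaSketch<String> {
    @Override
    public String deserialize(byte[] key, byte[] message, String topic, int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// New implementation that overrides only the timestamp-aware method.
class TimestampAwareSchema implements KeyedSchemaSketch<String> {
    @Override
    public String deserialize(byte[] key, byte[] message, String topic, int partition,
                              long offset, long timestamp) {
        return new String(message, StandardCharsets.UTF_8) + "@" + timestamp;
    }
}

// Implementation that overrides neither method; calling it hits the exception.
class EmptySchema implements KeyedSchemaSketch<String> { }

class SchemaCompatDemo {
    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // The caller always uses the six-argument overload.
        System.out.println(new LegacySchema().deserialize(null, payload, "t", 0, 0L, 42L));         // hello
        System.out.println(new TimestampAwareSchema().deserialize(null, payload, "t", 0, 0L, 42L)); // hello@42

        try {
            new EmptySchema().deserialize(null, payload, "t", 0, 0L, 42L);
        } catch (UnsupportedOperationException e) {
            System.out.println("neither method overridden: " + e.getMessage());
        }
    }
}
```

The trade-off of this design is that a missing override is detected only at runtime, when the first record is deserialized, rather than at compile time.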

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>         Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
