flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
Date Sat, 28 Apr 2018 13:15:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457587#comment-16457587 ]

ASF GitHub Bot commented on FLINK-8500:

GitHub user FredTing opened a pull request:


    [FLINK-8500] [Kafka Connector] Get the timestamp of the Kafka message from kafka consumer

    ## What is the purpose of the change
    This pull request makes the Kafka timestamp and timestampType available during message deserialisation, so they can be used in business-logic processing.
    ## Brief change log
    - Introduced a new interface, `KeyedWithTimestampDeserializationSchema`, with extra parameters in the `deserialize` method.
    - Added the `KeyedWithTimestampDeserializationSchemaWrapper` class to keep the code backwards compatible.
    - Adjusted the Kafka Connectors 0.10+ to support the new interface too.
    - Adjusted the Kafka Connectors 0.9- to 'hide' this new interface, since these versions of Kafka don't support timestamps.
    - Added some documentation.  
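
To illustrate the change log above, here is a minimal, dependency-free sketch of how such an interface and wrapper could fit together. It is not the code from the pull request: the parameter order of `deserialize`, the simplified `KeyedDeserializationSchema` (the real Flink interface has more methods), the local `TimestampType` enum (a stand-in for Kafka's enum of the same name), and the class name `TimestampSchemaSketch` are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;

public class TimestampSchemaSketch {

    /** Local stand-in for Kafka's TimestampType enum (assumption, to avoid a dependency). */
    enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

    /** Simplified stand-in for the existing keyed schema, which has no timestamp parameters. */
    interface KeyedDeserializationSchema<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
    }

    /** Sketch of the new interface: the same call plus timestamp information (signature assumed). */
    interface KeyedWithTimestampDeserializationSchema<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                      long timestamp, TimestampType timestampType);
    }

    /** Adapts an existing schema to the new interface by ignoring the timestamp arguments. */
    static final class KeyedWithTimestampDeserializationSchemaWrapper<T>
            implements KeyedWithTimestampDeserializationSchema<T> {

        private final KeyedDeserializationSchema<T> inner;

        KeyedWithTimestampDeserializationSchemaWrapper(KeyedDeserializationSchema<T> inner) {
            this.inner = inner;
        }

        @Override
        public T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                             long offset, long timestamp, TimestampType timestampType) {
            // Existing schemas keep working unchanged: the extra arguments are dropped.
            return inner.deserialize(messageKey, message, topic, partition, offset);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // An existing schema that only looks at the message bytes.
        KeyedDeserializationSchema<String> legacy =
                (key, msg, topic, partition, offset) -> new String(msg, StandardCharsets.UTF_8);

        // The same schema, usable wherever the timestamp-aware interface is expected.
        KeyedWithTimestampDeserializationSchema<String> wrapped =
                new KeyedWithTimestampDeserializationSchemaWrapper<>(legacy);

        // A new schema that makes the timestamp part of the deserialized value.
        KeyedWithTimestampDeserializationSchema<String> timestampAware =
                (key, msg, topic, partition, offset, ts, type) ->
                        new String(msg, StandardCharsets.UTF_8) + "@" + ts + "(" + type + ")";

        System.out.println(wrapped.deserialize(
                null, payload, "topic", 0, 7L, 1524900000000L, TimestampType.CREATE_TIME));
        System.out.println(timestampAware.deserialize(
                null, payload, "topic", 0, 7L, 1524900000000L, TimestampType.CREATE_TIME));
    }
}
```

The wrapper is what would keep existing user code working: a consumer can always call the timestamp-aware method and let the wrapper discard the extra arguments for schemas written against the older interface.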
    ## Verifying this change
    This change is already covered by existing tests, such as most of the Kafka Consumer tests.
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): yes
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    ## Documentation
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented?  docs / JavaDocs 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/FredTing/flink FLINK-8500

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5939

commit 30293ac49a1d31c2abfa2b3fb3640e9e04ef8bcf
Author: Fred Teunissen <fred.teunissen@...>
Date:   2018-04-28T08:06:41Z

    [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>             Fix For: 1.6.0
>         Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message timestamp' parameter (from ConsumerRecord). In some business scenarios, this is useful!

This message was sent by Atlassian JIRA