flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
Date Thu, 11 Jan 2018 12:07:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16322108#comment-16322108 ]

ASF GitHub Bot commented on FLINK-6352:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5282

    [FLINK-6352] [kafka] Timestamp-based offset configuration for FlinkKafkaConsumer

    ## What is the purpose of the change
    
    This PR is based on @zjureel's initial efforts on the feature in #3915.
    
    This version mainly differs in that:
    - When using a timestamp to define the startup offset, the actual offsets are eagerly determined in the `FlinkKafkaConsumerBase` class.
    - The `setStartFromTimestamp` configuration method is defined in the `FlinkKafkaConsumerBase` class with `protected` access. Kafka versions that support the functionality override the method with `public` access (see the sketch below this list).
    - The timestamp is configured simply as a long value, not a Java `Date`.
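    
    Below is a minimal, hypothetical sketch of that `protected`/`public` access pattern; the field, validation, and method bodies shown here are assumptions for illustration only and are not the actual connector code:
    
    ```java
    // Hypothetical sketch of the access pattern; not the actual connector code.
    public abstract class FlinkKafkaConsumerBase<T> {
    
        private Long startupOffsetsTimestamp;
    
        // Defined once in the base class with protected access, so that connector
        // versions without timestamp support do not expose it to users.
        protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
            // Assumed validation: reject timestamps in the future.
            if (startupOffsetsTimestamp > System.currentTimeMillis()) {
                throw new IllegalArgumentException("Startup timestamp must not be in the future.");
            }
            this.startupOffsetsTimestamp = startupOffsetsTimestamp;
            return this;
        }
    }
    
    // Connector versions that can resolve offsets by timestamp (0.10 / 0.11)
    // re-declare the method with public access.
    class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumerBase<T> {
        @Override
        public FlinkKafkaConsumer011<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
            super.setStartFromTimestamp(startupOffsetsTimestamp);
            return this;
        }
    }
    ```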
    
    **Overall, the usage of the feature is as follows:**
    ```java
    FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(...);
    consumer.setStartFromTimestamp(1515671654453L);
    
    DataStream<String> stream = env.addSource(consumer);
    ...
    ```
    
    Only Kafka versions 0.10 and 0.11 support this feature.
    
    **Semantics:**
    - The provided timestamp cannot be larger than the current timestamp.
    - For each partition, the earliest record for which `record timestamp >= provided timestamp` is used as the starting offset.
    - If the provided timestamp is larger than the timestamp of the latest record in a partition, that partition will simply be read from the head.
    - All new partitions discovered after the initial startup (e.g., due to scaling out Kafka) are read from the earliest possible record; the provided timestamp is not used for them (see the sketch after this list).
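    
    These semantics mirror what Kafka's own `KafkaConsumer#offsetsForTimes` API (available since Kafka 0.10.1) exposes. The standalone sketch below only illustrates that mapping; the topic name, consumer properties, and class name are illustrative assumptions, and this code is not taken from the PR:
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;
    
    public class TimestampOffsetLookup {
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "offset-lookup-demo");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            long startupTimestamp = 1515671654453L; // must not be in the future
    
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                Map<TopicPartition, Long> request = new HashMap<>();
                request.put(new TopicPartition("my-topic", 0), startupTimestamp);
    
                // For each partition, Kafka returns the earliest offset whose record
                // timestamp is >= the requested timestamp, or null if the timestamp
                // is newer than every record in that partition.
                Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(request);
    
                for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
                    if (entry.getValue() == null) {
                        System.out.println(entry.getKey() + ": no record at/after the timestamp, read from head");
                    } else {
                        System.out.println(entry.getKey() + ": start offset " + entry.getValue().offset());
                    }
                }
            }
        }
    }
    ```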
    
    ## Brief change log
    
    - d012826 @zjureel's initial efforts on the feature.
    - 7ac07e8 Instead of lazily determining exact offsets for timestamp-based startup, the
offsets are determined eagerly in `FlinkKafkaConsumerBase`. This commit also refactors the
`setStartFromTimestamp` method to live in the base class.
    - 32d46ef Change to just use long values to define timestamps, instead of using Java `Date`
    - 7bb44a8 General improvement for the `runStartFromTimestamp` integration test.
    
    ## Verifying this change
    
    New integration tests `Kafka010ITCase::testStartFromTimestamp` and `Kafka011ITCase::testStartFromTimestamp` verify this new feature.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes
/ **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing,
Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / **docs** / **JavaDocs** /
not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5282
    
----
commit d012826480b7eee2641da3b260b127bf8efaf790
Author: zjureel <zjureel@...>
Date:   2017-12-21T09:49:11Z

    [FLINK-6352] [kafka] Support to set offset of Kafka with specific date

commit 7ac07e8824ec42aeef6ee6b1d00650acf8ae06bb
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-11T06:26:37Z

    [FLINK-6352] [kafka] Eagerly determine startup offsets when startup mode is TIMESTAMP

commit 32d46ef2b98b282ca12e170702161bc123bc1f56
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-11T09:33:49Z

    [FLINK-6352] [kafka] Remove usage of java Date to specify startup timestamp

commit 7bb44a8d510612bff4b5137ff54f023ed556489a
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-11T10:33:21Z

    [FLINK-6352] [kafka, tests] Make runStartFromTimestamp more flexible

----


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -------------------------------------------------------------------------
>
>                 Key: FLINK-6352
>                 URL: https://issues.apache.org/jira/browse/FLINK-6352
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Fang Yong
>            Assignee: Fang Yong
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
>     Currently "auto.offset.reset" is used to initialize the start offset of FlinkKafkaConsumer, and its value can be earliest/latest/none. This only lets the job consume from the beginning or from the most recent data; it cannot start consuming from a specific Kafka offset.
>     So, there should be a configuration item (such as "flink.source.start.time", with the format "yyyy-MM-dd HH:mm:ss") that allows the user to configure the initial Kafka offset. The behavior of "flink.source.start.time" is as follows:
> 1) The job starts from a checkpoint / savepoint:
>   a> If the offset of a partition can be restored from the checkpoint/savepoint, "flink.source.start.time" will be ignored.
>   b> If there is no checkpoint/savepoint state for the partition (for example, a newly added partition), "flink.source.start.time" will be used to initialize the offset of that partition.
> 2) The job has no checkpoint / savepoint, so "flink.source.start.time" is used to initialize the Kafka offsets:
>   a> If "flink.source.start.time" is valid, use it to set the Kafka offsets.
>   b> If "flink.source.start.time" is out of range, behave as it does currently with no initial offset: get Kafka's current offset and start reading.
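
For reference, the end-user view of the behavior requested above, expressed with the API this PR adds (`setStartFromTimestamp`) instead of a configuration string: offsets restored from a checkpoint/savepoint take precedence, and the configured timestamp is only consulted on a fresh start. This is a hedged, standalone sketch; the topic name, properties, and checkpoint interval are illustrative assumptions:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class TimestampStartupWithCheckpoints {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets stored in a checkpoint/savepoint win over any configured startup mode on restore.
        env.enableCheckpointing(5000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);

        // Only consulted on a fresh start with no restored partition state.
        consumer.setStartFromTimestamp(1515671654453L);

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("timestamp-startup-demo");
    }
}
```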


