flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4618) Last kafka message gets consumed twice when restarting job
Date Wed, 14 Sep 2016 09:15:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15489901#comment-15489901 ]

Tzu-Li (Gordon) Tai commented on FLINK-4618:
--------------------------------------------

Hi [~melmoth],
Flink achieves its exactly-once guarantee by working with the offsets that it checkpoints
internally, not the offsets that are committed back to ZK / Kafka. Offsets are committed back
either periodically or on checkpointing, depending on the consumer configuration, so the
committed offset may not always reflect the actual progress of the consumption.
However, on a "fresh" startup of a job (meaning that the execution is not an automatic
restore from a previous failure), the Kafka consumer respects any existing offsets committed
in ZK and uses them as starting points.

So, if I am correct, what is actually happening is that, on the second execution of your job,
the Kafka consumer simply starts from the offsets it finds in ZK.
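
To make the distinction concrete, here is a minimal sketch of the start-position choice described
above. It is a simplified model, not Flink's actual implementation: the class and method names
are hypothetical, and it assumes that a restored offset means "the next offset to read". The
offset values are the ones from the log in the issue description.

```java
import java.util.OptionalLong;

// Hypothetical model of how a start position could be chosen.
// This is NOT Flink's code; all names here are made up for illustration.
public class StartOffsetSketch {

    /**
     * @param restoredOffset  next offset to read, taken from a restored checkpoint or
     *                        savepoint (empty on a "fresh" startup); assumption of this sketch
     * @param committedOffset offset found in ZK / Kafka for the consumer group, if any
     * @param latestOffset    fallback when neither is available (the log shows a reset to latest)
     */
    public static long startOffset(OptionalLong restoredOffset,
                                   OptionalLong committedOffset,
                                   long latestOffset) {
        // Restored state wins: this is what the exactly-once guarantee relies on.
        if (restoredOffset.isPresent()) {
            return restoredOffset.getAsLong();
        }
        // Fresh startup: respect whatever was committed externally.
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        // New consumer group with nothing committed.
        return latestOffset;
    }

    public static void main(String[] args) {
        // First run: new consumer group, no committed offset, latest is 4848910.
        long firstRun = startOffset(OptionalLong.empty(), OptionalLong.empty(), 4848910L);

        // Manual restart without a savepoint: committed offset 4848910 is used,
        // so the record at 4848910 is fetched a second time.
        long freshRestart = startOffset(OptionalLong.empty(), OptionalLong.of(4848910L), 4848911L);

        // Restore from checkpointed state: the consumer position was 4848911.
        long restored = startOffset(OptionalLong.of(4848911L), OptionalLong.of(4848910L), 4848911L);

        System.out.println("first run starts at      " + firstRun);
        System.out.println("fresh restart starts at  " + freshRestart);
        System.out.println("restored run starts at   " + restored);
    }
}
```

In this model, only the run that carries restored state resumes at 4848911; the manual restart
falls back to the committed offset and re-reads 4848910, which matches what the log shows.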

If you want exactly-once for manual job restarts, use Flink savepoints. See
https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html for more detail.
Otherwise, the exactly-once guarantee applies to automatic restores of the job after failures.

> Last kafka message gets consumed twice when restarting job
> ----------------------------------------------------------
>
>                 Key: FLINK-4618
>                 URL: https://issues.apache.org/jira/browse/FLINK-4618
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.2
>         Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>            Reporter: Melmoth
>
> There seems to be an issue with the offset management in Flink. When a job is stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new consumer group and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 4848911, which is correct. After restarting, it consumes a record at 4848910, causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools; the committed offset in ZooKeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 2147482646
> 10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:2946
> -- Restarting job
> 10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 2147482646 at hdp1:6667.
> 10:32:01,673 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching committed offsets for partitions: [myTopic-0]
> 10:32:01,677 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 2147482646
> // See below! Shouldn't the offset be 4848911?
> 10:32:01,682 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition myTopic-0 to the committed offset 4848910
> 10:32:01,683 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
> 10:32:01,685 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 1001 at hdp1:6667.
> 10:32:01,687 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 1001
> // Here record 4848910 gets consumed again!
> 10:32:01,707 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:32:01,708 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:32:03,721 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,224 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,726 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,894 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
> 10:32:04,903 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:3079
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
