flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2974) Add periodic offset commit to Kafka Consumer if checkpointing is disabled
Date Wed, 11 Nov 2015 13:21:11 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15000350#comment-15000350 ]

ASF GitHub Bot commented on FLINK-2974:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44530755
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
---
    @@ -567,6 +604,75 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     		return partitionsToSub;
     	}
    +
    +	/**
    +	 * Thread to periodically commit the current read offset into Zookeeper.
    +	 */
    +	private static class PeriodicOffsetCommitter extends Thread {
    +		private long commitInterval;
    +		private volatile boolean running = true;
    +		private FlinkKafkaConsumer consumer;
    +		private final Object stateUpdateLock = new Object();
    +
    +		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
    +			this.commitInterval = commitInterval;
    +			this.consumer = consumer;
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (running) {
    +					try {
    +						Thread.sleep(commitInterval);
    +
    +						//  ------------  commit current offsets ----------------
    +
    +						// create copy of current offsets
    +						long[] currentOffsets;
    +						synchronized (stateUpdateLock) {
    +							currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    +						}
    +
    +						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    +						//noinspection unchecked
    +						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    +							int partition = tp.partition();
    +							long offset = currentOffsets[partition];
    +							long lastCommitted = consumer.commitedOffsets[partition];
    +
    +							if (offset != OFFSET_NOT_SET) {
    +								if (offset > lastCommitted) {
    +									offsetsToCommit.put(tp, offset);
    +									LOG.debug("Committing offset {} for partition {}", offset, partition);
    +								} else {
    +									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
    +								}
    +							}
    +						}
    +
    +						consumer.offsetHandler.commit(offsetsToCommit);
    +					} catch (InterruptedException e) {
    +						// looks like the thread is being closed. Leave loop
    +						break;
    +					}
    +				}
    +			} catch(Throwable t) {
    +				LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t);
    +				consumer.fetcher.stopWithError(t);
    +			}
    +		}
    +
    +		public void close() {
    +			running = false;
    +			// interrupt sleep
    +			Thread.currentThread().interrupt();
    --- End diff --
    
    This interrupts the wrong thread, namely the one that calls close(), rather than the offset committing thread.
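    A minimal sketch of the fix the review asks for, assuming the committer keeps extending Thread: close() runs on the caller's thread, so it has to interrupt the committer object itself (this.interrupt()) rather than Thread.currentThread(). The names PeriodicCommitter, commitOnce() and the demo main are hypothetical; the real offset-commit logic from the diff is replaced by a counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CommitterDemo {

    /** Hypothetical stand-in for PeriodicOffsetCommitter; commitOnce() replaces the real commit. */
    static class PeriodicCommitter extends Thread {
        private final long commitIntervalMillis;
        private volatile boolean running = true;
        final AtomicInteger commits = new AtomicInteger();

        PeriodicCommitter(long commitIntervalMillis) {
            this.commitIntervalMillis = commitIntervalMillis;
        }

        @Override
        public void run() {
            while (running) {
                try {
                    Thread.sleep(commitIntervalMillis);
                    commitOnce();
                } catch (InterruptedException e) {
                    // close() interrupted the sleep: leave the loop
                    break;
                }
            }
        }

        private void commitOnce() {
            // placeholder for copying offsets and calling offsetHandler.commit(...)
            commits.incrementAndGet();
        }

        /** Called from another thread, e.g. the source's close(). */
        public void close() {
            running = false;
            // interrupt THIS committer thread, not Thread.currentThread() (the caller)
            this.interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // long interval, so the committer is blocked in Thread.sleep() when close() is called
        PeriodicCommitter committer = new PeriodicCommitter(60_000L);
        committer.setDaemon(true);
        committer.start();
        Thread.sleep(100);

        committer.close();
        committer.join(5_000);

        System.out.println("committer alive after close: " + committer.isAlive());
        System.out.println("caller interrupted: " + Thread.currentThread().isInterrupted());
    }
}
```

    With Thread.currentThread().interrupt() instead, the committer would stay asleep for up to one full commit interval, and the interrupt flag would be set on the caller, whose next blocking call could then fail with an InterruptedException.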


> Add periodic offset commit to Kafka Consumer if checkpointing is disabled
> -------------------------------------------------------------------------
>
>                 Key: FLINK-2974
>                 URL: https://issues.apache.org/jira/browse/FLINK-2974
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>
> Flink only writes the offsets from the consumer into ZK if checkpointing is enabled.
> We should have a similar feature to Kafka's autocommit in our consumer.
> Issue reported by user: http://stackoverflow.com/questions/33501574/flink-kafka-why-am-i-losing-messages
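The selection step in the diff above commits an offset only if the partition has been read from and its offset has advanced past the last committed one. A standalone sketch of that rule follows; it uses plain int partition ids instead of Kafka's TopicPartition and an assumed OFFSET_NOT_SET sentinel (Long.MIN_VALUE here; the connector defines its own constant), and it leaves out the stateUpdateLock copy and the offsetHandler.commit(...) call. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

public class OffsetSelection {

    // Assumed sentinel for "nothing read from this partition yet".
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    /**
     * Returns partition -> offset for every subscribed partition whose current read
     * offset is set and has advanced past the last committed offset.
     */
    static Map<Integer, Long> offsetsToCommit(int[] subscribedPartitions,
                                              long[] currentOffsets,
                                              long[] committedOffsets) {
        Map<Integer, Long> result = new TreeMap<>();
        for (int partition : subscribedPartitions) {
            long offset = currentOffsets[partition];
            if (offset == OFFSET_NOT_SET) {
                continue; // nothing read from this partition yet
            }
            if (offset > committedOffsets[partition]) {
                result.put(partition, offset);
            }
            // otherwise the offset is already committed: skip it
        }
        return result;
    }

    public static void main(String[] args) {
        int[] subscribed = {0, 1, 2};
        long[] current = {42L, OFFSET_NOT_SET, 7L};
        long[] committed = {40L, OFFSET_NOT_SET, 7L};
        System.out.println(offsetsToCommit(subscribed, current, committed)); // {0=42}
    }
}
```

Skipping unchanged offsets keeps each periodic run from rewriting the same values into ZooKeeper when a partition has received no new records.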



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
