flink-commits mailing list archives

From rmetz...@apache.org
Subject flink git commit: [hotfix][Kafka 0.9] Avoid committing offsets to closed consumer
Date Mon, 21 Mar 2016 13:38:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master 607892314 -> 35892ed14


[hotfix][Kafka 0.9] Avoid committing offsets to closed consumer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35892ed1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35892ed1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35892ed1

Branch: refs/heads/master
Commit: 35892ed148afdb217a61fbacea1a9cb0eacb5c48
Parents: 6078923
Author: Robert Metzger <rmetzger@apache.org>
Authored: Mon Mar 21 12:31:17 2016 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Mon Mar 21 14:38:05 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java   | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35892ed1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 62ba3c4..3b780bd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -387,6 +387,10 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 
 	@Override
 	protected void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
+		if(!running) {
+			LOG.warn("Unable to commit offsets on closed consumer");
+			return;
+		}
 		Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = convertToCommitMap(checkpointOffsets);
 		synchronized (this.consumer) {
 			this.consumer.commitSync(kafkaCheckpointOffsets);
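
The guard added in this hunk can be illustrated in isolation. The following is a minimal sketch, not the Flink implementation: `FakeConsumer`, `GuardedCommitSketch`, the `String` partition keys, and the boolean return value are all hypothetical stand-ins so the example runs without a Kafka dependency. It assumes, as the commit subject suggests, that committing to a closed consumer fails and that a `running` flag is cleared on shutdown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, self-contained illustration of the "skip commit when not running" guard.
class GuardedCommitSketch {

    // Stand-in for a client that rejects commits after close (assumption, not the Kafka API).
    static class FakeConsumer {
        private final List<Map<String, Long>> commits = new ArrayList<>();
        private boolean closed = false;

        void commitSync(Map<String, Long> offsets) {
            if (closed) {
                throw new IllegalStateException("consumer is closed");
            }
            commits.add(new HashMap<>(offsets));
        }

        void close() {
            closed = true;
        }
    }

    private final FakeConsumer consumer = new FakeConsumer();

    // volatile so a commit issued from another thread sees the shutdown promptly
    private volatile boolean running = true;

    // Returns true if the offsets were committed, false if the commit was skipped.
    boolean commitOffsets(Map<String, Long> offsets) {
        if (!running) {
            System.err.println("Unable to commit offsets on closed consumer");
            return false;
        }
        synchronized (consumer) {
            consumer.commitSync(offsets);
        }
        return true;
    }

    // Clears the flag first, then closes the client under the same lock used for commits.
    void close() {
        running = false;
        synchronized (consumer) {
            consumer.close();
        }
    }

    int committedCount() {
        synchronized (consumer) {
            return consumer.commits.size();
        }
    }

    public static void main(String[] args) {
        GuardedCommitSketch source = new GuardedCommitSketch();
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);

        System.out.println(source.commitOffsets(offsets)); // commit while running
        source.close();
        System.out.println(source.commitOffsets(offsets)); // skipped after close
    }
}
```

As in the hunk, the sketch checks the flag before taking the lock, so it narrows the window for committing to a closed consumer rather than eliminating it: a close that lands between the check and the `commitSync` call would still fail.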

