flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [hotfix] [kafka] Fix NPE in Kafka09Fetcher
Date Tue, 04 Oct 2016 13:46:06 GMT
Repository: flink
Updated Branches:
  refs/heads/master 72e6b760f -> eece0dd05


[hotfix] [kafka] Fix NPE in Kafka09Fetcher


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eece0dd0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eece0dd0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eece0dd0

Branch: refs/heads/master
Commit: eece0dd05bc38b88fcb6cbcef15add7f98eab456
Parents: 72e6b76
Author: Fabian Hueske <fhueske@apache.org>
Authored: Tue Oct 4 14:43:35 2016 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Oct 4 15:30:32 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/internal/Kafka09Fetcher.java      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eece0dd0/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 3c2cca3..ad7efa2 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -291,9 +291,9 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
 			// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
 			// This does not affect Flink's checkpoints/saved state.
-			Long offsetToCommit = offsets.get(partition.getKafkaTopicPartition()) + 1;
+			Long offsetToCommit = offsets.get(partition.getKafkaTopicPartition());
 			if (offsetToCommit != null) {
-				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
+				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit + 1));
 				partition.setCommittedOffset(offsetToCommit);
 			}
 		}


Mime
View raw message