flink-commits mailing list archives

From: tzuli...@apache.org
Subject: [5/7] flink git commit: [FLINK-6998] [kafka] Remove unfruitful null checks on provided KafkaCommitCallbacks
Date: Fri, 28 Jul 2017 13:53:35 GMT
[FLINK-6998] [kafka] Remove unfruitful null checks on provided KafkaCommitCallbacks

This closes #4187.
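
For context, the shape of the change: KafkaCommitCallback parameters are now annotated @Nonnull, the defensive null checks around onSuccess()/onException() are dropped, and call sites that previously passed null (mostly tests) now pass an explicit callback. A minimal before/after sketch with simplified stand-in types, not the actual Flink classes:

    // Illustrative sketch only; KafkaCommitCallback is simplified here.
    public class NullCheckRemovalSketch {

        interface KafkaCommitCallback {
            void onSuccess();
            void onException(Throwable cause);
        }

        // Before: every use site guarded against a possibly-null callback.
        static void commitBefore(Runnable commitAction, KafkaCommitCallback callback) {
            try {
                commitAction.run();
                if (callback != null) {
                    callback.onSuccess();
                }
            } catch (Exception e) {
                if (callback != null) {
                    callback.onException(e);
                }
            }
        }

        // After: the callback is contractually non-null (@Nonnull in the real code),
        // so the guards disappear and every caller must supply an explicit callback.
        static void commitAfter(Runnable commitAction, KafkaCommitCallback callback) {
            try {
                commitAction.run();
                callback.onSuccess();
            } catch (Exception e) {
                callback.onException(e);
            }
        }
    }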


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ded2d86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ded2d86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ded2d86

Branch: refs/heads/master
Commit: 1ded2d867d9c30f01395494adec3cbaa629bbb5a
Parents: 8828648
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Fri Jul 28 18:59:11 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Fri Jul 28 21:52:30 2017 +0800

----------------------------------------------------------------------
 .../kafka/internal/Kafka010FetcherTest.java     | 17 +++++----
 .../kafka/internals/Kafka08Fetcher.java         | 15 ++++----
 .../kafka/internal/Kafka09Fetcher.java          |  7 +++-
 .../kafka/internal/KafkaConsumerThread.java     | 39 +++++++-------------
 .../kafka/internal/Kafka09FetcherTest.java      | 17 +++++----
 .../kafka/internal/KafkaConsumerThreadTest.java |  7 ++--
 .../kafka/internals/AbstractFetcher.java        |  8 +++-
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 12 +++---
 .../kafka/internals/AbstractFetcherTest.java    |  5 ++-
 9 files changed, 66 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
index 22e183d..f895e2f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -157,7 +158,7 @@ public class Kafka010FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, mock(KafkaCommitCallback.class));
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -285,7 +286,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -302,7 +303,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -338,9 +339,9 @@ public class Kafka010FetcherTest {
 		final byte[] payload = new byte[] {1, 2, 3, 4};
 
 		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+				new ConsumerRecord<>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<>(topic, partition, 17, payload, payload));
 
 		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
 		data.put(new TopicPartition(topic, partition), records);
@@ -446,11 +447,11 @@ public class Kafka010FetcherTest {
 		@Override
 		public void close() {}
 
-		public void waitTillHasBlocker() throws InterruptedException {
+		void waitTillHasBlocker() throws InterruptedException {
 			inBlocking.await();
 		}
 
-		public boolean isStillBlocking() {
+		boolean isStillBlocking() {
 			return lock.isLocked();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 6e2a1d4..7359e91 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -346,21 +348,20 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
+	public void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
+
 		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
 		if (zkHandler != null) {
 			try {
 				// the ZK handler takes care of incrementing the offsets by 1 before committing
 				zkHandler.prepareAndCommitOffsets(offsets);
-				if (commitCallback != null) {
-					commitCallback.onSuccess();
-				}
+				commitCallback.onSuccess();
 			}
 			catch (Exception e) {
 				if (running) {
-					if (commitCallback != null) {
-						commitCallback.onException(e);
-					}
+					commitCallback.onException(e);
 					throw e;
 				} else {
 					return;

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index db7d63b..cef70fe 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -210,7 +212,10 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	}
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
+	public void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
+
 		@SuppressWarnings("unchecked")
 		List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 6ff82d7..fc5f359 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -97,8 +99,8 @@ public class KafkaConsumerThread extends Thread {
 	private boolean hasAssignedPartitions;
 
 	/**
-	 * Flag to indicate whether an external operation ({@link #setOffsetsToCommit(Map)} or {@link #shutdown()})
-	 * had attempted to wakeup the consumer while it was isolated for partition reassignment.
+	 * Flag to indicate whether an external operation ({@link #setOffsetsToCommit(Map, KafkaCommitCallback)}
+	 * or {@link #shutdown()}) had attempted to wakeup the consumer while it was isolated for partition reassignment.
 	 */
 	private volatile boolean hasBufferedWakeup;
 
@@ -109,7 +111,8 @@ public class KafkaConsumerThread extends Thread {
 	private volatile boolean commitInProgress;
 
 	/** User callback to be invoked when commits completed. */
-	private volatile KafkaCommitCallback callerCommitCallback;
+	private volatile KafkaCommitCallback offsetCommitCallback;
+
 	public KafkaConsumerThread(
 			Logger log,
 			Handover handover,
@@ -314,30 +317,19 @@ public class KafkaConsumerThread extends Thread {
 	 * superseded by newer ones.
 	 *
 	 * @param offsetsToCommit The offsets to commit
+	 * @param commitCallback callback when Kafka commit completes
 	 */
-	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
-		setOffsetsToCommit(offsetsToCommit, null);
-	}
+	void setOffsetsToCommit(
+			Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
+			@Nonnull KafkaCommitCallback commitCallback) {
 
-	/**
-	 * Tells this thread to commit a set of offsets. This method does not block, the committing
-	 * operation will happen asynchronously.
-	 *
-	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
-	 * the frequency with which this method is called, then some commits may be skipped due to being
-	 * superseded by newer ones.
-	 *
-	 * @param offsetsToCommit The offsets to commit
-	 * @param commitCallback callback when kafka commit completes
-	 */
-	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, KafkaCommitCallback commitCallback) {
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
 		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
 			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
 					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
 					"This does not compromise Flink's checkpoint integrity.");
 		}
-		this.callerCommitCallback = commitCallback;
+		this.offsetCommitCallback = commitCallback;
 
 		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
 		handover.wakeupProducer();
@@ -501,12 +493,9 @@ public class KafkaConsumerThread extends Thread {
 
 			if (ex != null) {
 				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
-				if (callerCommitCallback != null) {
-					callerCommitCallback.onException(ex);
-				}
-			}
-			else if (callerCommitCallback != null) {
-				callerCommitCallback.onSuccess();
+				offsetCommitCallback.onException(ex);
+			} else {
+				offsetCommitCallback.onSuccess();
 			}
 		}
 	}
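
Aside: the superseding behavior described in the setOffsetsToCommit javadoc (only one commit may be pending; a newer request replaces an older one that was never picked up) comes down to an atomic getAndSet. A standalone sketch with simplified types, not the actual Flink code:

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;

    public class SupersedingCommitSketch {

        // At most one commit request is pending; a newer request atomically replaces an older one.
        private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();

        void setOffsetsToCommit(Map<String, Long> offsets) {
            if (nextOffsetsToCommit.getAndSet(offsets) != null) {
                // A non-null previous value means the older request was never claimed: it is skipped.
                System.out.println("Skipping previous offsets; newer complete offsets are available.");
            }
        }

        // The consumer thread claims the latest pending request, or null if there is none.
        Map<String, Long> pollOffsetsToCommit() {
            return nextOffsetsToCommit.getAndSet(null);
        }
    }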

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
index f9ec204..33ec17e 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -157,7 +158,7 @@ public class Kafka09FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, mock(KafkaCommitCallback.class));
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -284,7 +285,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -301,7 +302,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -337,9 +338,9 @@ public class Kafka09FetcherTest {
 		final byte[] payload = new byte[] {1, 2, 3, 4};
 
 		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+				new ConsumerRecord<>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<>(topic, partition, 17, payload, payload));
 
 		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
 		data.put(new TopicPartition(topic, partition), records);
@@ -445,11 +446,11 @@ public class Kafka09FetcherTest {
 		@Override
 		public void close() {}
 
-		public void waitTillHasBlocker() throws InterruptedException {
+		void waitTillHasBlocker() throws InterruptedException {
 			inBlocking.await();
 		}
 
-		public boolean isStillBlocking() {
+		boolean isStillBlocking() {
 			return lock.isLocked();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 3a2f84a..2368091 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
@@ -492,7 +493,7 @@ public class KafkaConsumerThreadTest {
 		// pause just before the reassignment so we can inject the wakeup
 		testThread.waitPartitionReassignmentInvoked();
 
-		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
+		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>(), mock(KafkaCommitCallback.class));
 		verify(mockConsumer, times(1)).wakeup();
 
 		testThread.startPartitionReassignment();
@@ -578,7 +579,7 @@ public class KafkaConsumerThreadTest {
 		// pause just before the reassignment so we can inject the wakeup
 		testThread.waitPartitionReassignmentInvoked();
 
-		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
+		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>(), mock(KafkaCommitCallback.class));
 
 		// make sure the consumer was actually woken up
 		verify(mockConsumer, times(1)).wakeup();
@@ -664,7 +665,7 @@ public class KafkaConsumerThreadTest {
 		// wait until the reassignment has started
 		midAssignmentLatch.await();
 
-		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
+		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>(), mock(KafkaCommitCallback.class));
 
 		// the wakeup in the setOffsetsToCommit() call should have been buffered, and not called on the consumer
 		verify(mockConsumer, never()).wakeup();

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 7314e14..11f97b2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -237,10 +239,12 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * committing them, so that committed offsets to Kafka represent "the next record to process".
 	 *
 	 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
-	 * @param commitCallback The callback that the user can implement to trigger custom actions when a commit request completes.
+	 * @param commitCallback The callback that the user should trigger when a commit request completes or fails.
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception;
+	public abstract void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception;
 
 	/**
 	 * Creates the Kafka version specific representation of the given
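
Aside: the "increment offsets by 1" requirement in the javadoc above reflects Kafka's convention that a committed offset names the next record to read rather than the last record processed. A trivial standalone illustration with assumed values:

    public class OffsetConventionSketch {
        public static void main(String[] args) {
            // Kafka convention: a committed offset names the next record to read,
            // not the last record processed, hence the "+ 1" before committing.
            long lastProcessedOffset = 41L;
            long offsetToCommit = lastProcessedOffset + 1;
            System.out.println("a restart would resume from offset " + offsetToCommit); // 42
        }
    }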

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 34158fd..59ce666 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -469,7 +469,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		// checkpoint 3
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -486,10 +486,10 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		listState = new TestingListState<>();
@@ -504,15 +504,15 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// commit only the second last
 		consumer.notifyCheckpointComplete(598);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		// access invalid checkpoint
 		consumer.notifyCheckpointComplete(590);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		// and the last
 		consumer.notifyCheckpointComplete(599);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 8699247..1063102 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.HashMap;
@@ -361,7 +362,9 @@ public class AbstractFetcherTest {
 		}
 
 		@Override
-		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback callback) throws Exception {
+		public void commitInternalOffsetsToKafka(
+				Map<KafkaTopicPartition, Long> offsets,
+				@Nonnull KafkaCommitCallback callback) throws Exception {
 			throw new UnsupportedOperationException();
 		}
 	}

