flink-commits mailing list archives

From: ches...@apache.org
Subject: [10/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-kafka*
Date: Sun, 28 May 2017 06:17:40 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 1c87542..82294d7 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -33,7 +33,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +43,7 @@ import java.util.Properties;
 
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
- * 
+ *
  * @param <T> The type of elements produced by the fetcher.
  */
 public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
@@ -53,16 +52,16 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 
 	// ------------------------------------------------------------------------
 
-	/** The schema to convert between Kafka's byte messages, and Flink's objects */
+	/** The schema to convert between Kafka's byte messages, and Flink's objects. */
 	private final KeyedDeserializationSchema<T> deserializer;
 
-	/** The handover of data and exceptions between the consumer thread and the task thread */
+	/** The handover of data and exceptions between the consumer thread and the task thread. */
 	private final Handover handover;
 
-	/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher */
+	/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
 	private final KafkaConsumerThread consumerThread;
 
-	/** Flag to mark the main work loop as alive */
+	/** Flag to mark the main work loop as alive. */
 	private volatile boolean running = true;
 
 	// ------------------------------------------------------------------------
@@ -80,8 +79,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
-			boolean useMetrics) throws Exception
-	{
+			boolean useMetrics) throws Exception {
 		super(
 				sourceContext,
 				assignedPartitionsWithInitialOffsets,
@@ -97,7 +95,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 
 		final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
 		addOffsetStateGauge(kafkaMetricGroup);
-		
+
 		this.consumerThread = new KafkaConsumerThread(
 				LOG,
 				handover,

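The Kafka09Fetcher fields above describe the two-thread design used here: a dedicated KafkaConsumerThread polls the broker and hands record batches to the fetcher through a Handover, which the task thread drains, deserializes and emits. A much simplified sketch of such a hand-over point follows (illustrative names and semantics only; the real Handover in this package additionally carries exceptions and has richer wakeup/close behaviour):

final class SimpleHandover<T> {

	private final Object lock = new Object();

	/** At most one batch is pending between the two threads. */
	private T next;

	/** Set by wakeup() to break a blocked produce()/pollNext() call. */
	private boolean wakeup;

	/** Consumer thread: hand over one batch, blocking while the previous one is still pending. */
	public void produce(T element) throws InterruptedException {
		synchronized (lock) {
			while (next != null && !wakeup) {
				lock.wait();
			}
			if (wakeup) {
				wakeup = false;
				throw new InterruptedException("handover was woken up");
			}
			next = element;
			lock.notifyAll();
		}
	}

	/** Task thread: take the next batch, blocking until one is available. */
	public T pollNext() throws InterruptedException {
		synchronized (lock) {
			while (next == null && !wakeup) {
				lock.wait();
			}
			if (wakeup) {
				wakeup = false;
				throw new InterruptedException("handover was woken up");
			}
			T result = next;
			next = null;
			lock.notifyAll();
			return result;
		}
	}

	/** Shutdown path: wake up whichever side is currently blocked. */
	public void wakeup() {
		synchronized (lock) {
			wakeup = true;
			lock.notifyAll();
		}
	}
}
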
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
index 37ba34c..c0b9441 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -25,11 +25,11 @@ import java.util.List;
 
 /**
  * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
- * 
- * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ *
+ * <p>This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
  * for example changing {@code assign(List)} to {@code assign(Collection)}.
- * 
- * Because of that, we need to have two versions whose compiled code goes against different method signatures.
+ *
+ * <p>Because of that, we need to have two versions whose compiled code goes against different method signatures.
  * Even though the source of subclasses may look identical, the byte code will be different, because they
  * are compiled against different dependencies.
  */

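The javadoc above is the whole reason the call bridge exists: the 0.9 and 0.10 connector modules each compile a source-identical bridge against a different assign(...) signature. A condensed sketch of the shape of that indirection (a sketch only, not the actual class body):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.List;

public class ConsumerCallBridgeSketch {

	// In the 0.9 module this call is compiled against assign(List<TopicPartition>);
	// a source-identical subclass in the 0.10 module is compiled against
	// assign(Collection<TopicPartition>), so the byte code differs per module.
	public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
		consumer.assign(topicPartitions);
	}
}
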
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index cbe1551..0c5482a 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -30,7 +31,6 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -45,54 +45,53 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * The thread the runs the {@link KafkaConsumer}, connecting to the brokers and polling records.
  * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will
  * deserialize and emit the records.
- * 
+ *
  * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down.
  * The Kafka consumer code was found to not always handle interrupts well, and to even
  * deadlock in certain situations.
- * 
+ *
  * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
  * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection
  * to the KafkaConsumer calls that change signature.
  */
 public class KafkaConsumerThread extends Thread {
 
-	/** Logger for this consumer */
+	/** Logger for this consumer. */
 	private final Logger log;
 
-	/** The handover of data and exceptions between the consumer thread and the task thread */
+	/** The handover of data and exceptions between the consumer thread and the task thread. */
 	private final Handover handover;
 
-	/** The next offsets that the main thread should commit */
+	/** The next offsets that the main thread should commit. */
 	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
 
-	/** The configuration for the Kafka consumer */
+	/** The configuration for the Kafka consumer. */
 	private final Properties kafkaProperties;
 
-	/** The partitions that this consumer reads from */ 
+	/** The partitions that this consumer reads from. */
 	private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitionStates;
 
 	/** We get this from the outside to publish metrics. **/
 	private final MetricGroup kafkaMetricGroup;
 
-	/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */
+	/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken. */
 	private final KafkaConsumerCallBridge consumerCallBridge;
 
-	/** The maximum number of milliseconds to wait for a fetch batch */
+	/** The maximum number of milliseconds to wait for a fetch batch. */
 	private final long pollTimeout;
 
-	/** Flag whether to add Kafka's metrics to the Flink metrics */
+	/** Flag whether to add Kafka's metrics to the Flink metrics. */
 	private final boolean useMetrics;
 
-	/** Reference to the Kafka consumer, once it is created */
+	/** Reference to the Kafka consumer, once it is created. */
 	private volatile KafkaConsumer<byte[], byte[]> consumer;
 
-	/** Flag to mark the main work loop as alive */
+	/** Flag to mark the main work loop as alive. */
 	private volatile boolean running;
 
-	/** Flag tracking whether the latest commit request has completed */
+	/** Flag tracking whether the latest commit request has completed. */
 	private volatile boolean commitInProgress;
 
-
 	public KafkaConsumerThread(
 			Logger log,
 			Handover handover,
@@ -271,7 +270,7 @@ public class KafkaConsumerThread extends Thread {
 		// this wakes up the consumer if it is blocked handing over records
 		handover.wakeupProducer();
 
-		// this wakes up the consumer if it is blocked in a kafka poll 
+		// this wakes up the consumer if it is blocked in a kafka poll
 		if (consumer != null) {
 			consumer.wakeup();
 		}
@@ -280,11 +279,11 @@ public class KafkaConsumerThread extends Thread {
 	/**
 	 * Tells this thread to commit a set of offsets. This method does not block, the committing
 	 * operation will happen asynchronously.
-	 * 
+	 *
 	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
 	 * the frequency with which this method is called, then some commits may be skipped due to being
 	 * superseded  by newer ones.
-	 * 
+	 *
 	 * @param offsetsToCommit The offsets to commit
 	 */
 	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {

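The setOffsetsToCommit() contract above ("only one commit operation may be pending; older requests are superseded") together with the nextOffsetsToCommit field suggests a simple non-blocking hand-off between the checkpointing thread and the consumer thread. A minimal sketch of that pattern (illustrative names):

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

final class CommitRequestSlot {

	/** Holds the newest, not yet executed commit request. */
	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit =
			new AtomicReference<>();

	/** Checkpoint thread: publish the latest offsets; never blocks. */
	void request(Map<TopicPartition, OffsetAndMetadata> offsets) {
		// atomically replaces any still-pending request, which is thereby skipped
		nextOffsetsToCommit.set(offsets);
	}

	/** Consumer thread (between polls): take a pending request, or null if there is none. */
	Map<TopicPartition, OffsetAndMetadata> poll() {
		return nextOffsetsToCommit.getAndSet(null);
	}
}
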
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
index 6bdfb48..6eef174 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
@@ -26,4 +26,3 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 
-

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
index eff8264..5e3c42c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka09AvroTableSource}.
+ */
 public class Kafka09AvroTableSourceTest extends KafkaTableSourceTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 6e13db2..f55c264 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -39,10 +39,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
-
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -63,7 +61,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.powermock.api.mockito.PowerMockito.doAnswer;
@@ -91,10 +88,10 @@ public class Kafka09FetcherTest {
 
 		// ----- the mock consumer with blocking poll calls ----
 		final MultiShotLatch blockerLatch = new MultiShotLatch();
-		
+
 		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
 		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-			
+
 			@Override
 			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
 				sync.trigger();
@@ -157,7 +154,7 @@ public class Kafka09FetcherTest {
 		sync.await();
 
 		// ----- trigger the offset commit -----
-		
+
 		final AtomicReference<Throwable> commitError = new AtomicReference<>();
 		final Thread committer = new Thread("committer runner") {
 			@Override
@@ -192,11 +189,11 @@ public class Kafka09FetcherTest {
 
 	@Test
 	public void ensureOffsetsGetCommitted() throws Exception {
-		
+
 		// test data
 		final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
 		final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
-		
+
 		final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
 		testCommitData1.put(testPartition1, 11L);
 		testCommitData1.put(testPartition2, 18L);
@@ -207,7 +204,6 @@ public class Kafka09FetcherTest {
 
 		final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
 
-
 		// ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
 
 		final MultiShotLatch blockerLatch = new MultiShotLatch();
@@ -234,7 +230,7 @@ public class Kafka09FetcherTest {
 			@Override
 			public Void answer(InvocationOnMock invocation) {
 				@SuppressWarnings("unchecked")
-				Map<TopicPartition, OffsetAndMetadata> offsets = 
+				Map<TopicPartition, OffsetAndMetadata> offsets =
 						(Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
 
 				OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
@@ -242,7 +238,7 @@ public class Kafka09FetcherTest {
 				commitStore.add(offsets);
 				callback.onComplete(offsets, null);
 
-				return null; 
+				return null;
 			}
 		}).when(mockConsumer).commitAsync(
 				Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
@@ -322,7 +318,7 @@ public class Kafka09FetcherTest {
 				assertEquals(27L, entry.getValue().offset());
 			}
 		}
-		
+
 		// ----- test done, wait till the fetcher is done for a clean shutdown -----
 		fetcher.cancel();
 		fetcherRunner.join();
@@ -387,7 +383,6 @@ public class Kafka09FetcherTest {
 				0L,
 				false);
 
-
 		// ----- run the fetcher -----
 
 		final AtomicReference<Throwable> error = new AtomicReference<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index ca9965c..de4d010 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -19,6 +19,9 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
+/**
+ * IT cases for Kafka 0.9 .
+ */
 public class Kafka09ITCase extends KafkaConsumerTestBase {
 
 	// ------------------------------------------------------------------------
@@ -35,7 +38,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runSimpleConcurrentProducerConsumerTopology();
 	}
 
-
 	@Test(timeout = 60000)
 	public void testKeyValueSupport() throws Exception {
 		runKeyValueTest();
@@ -58,7 +60,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runFailOnDeployTest();
 	}
 
-
 	// --- source to partition mappings and exactly once ---
 
 	@Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 3afb5e4..c8fb4cd 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -15,15 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
+/**
+ * Tests for the {@link Kafka09JsonTableSink}.
+ */
 public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
index 35cd9ce..ec70386 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.types.Row;
 
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka09JsonTableSource}.
+ */
 public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index ae4f5b2..fe8a1a5 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-
 import org.junit.Test;
 
+/**
+ * IT cases for the {@link FlinkKafkaProducer09}.
+ */
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
index 16a13c0..d41cd91 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -18,16 +18,15 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.test.util.SecureTestEnvironment;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
-/*
- * Kafka Secure Connection (kerberos) IT test case
+/**
+ * Kafka Secure Connection (kerberos) IT test case.
  */
 public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
 
@@ -51,7 +50,6 @@ public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
 		SecureTestEnvironment.cleanup();
 	}
 
-
 	//timeout interval is large since in Travis, ZK connection timeout occurs frequently
 	//The timeout for the test case is 2 times timeout of ZK connection
 	@Test(timeout = 600000)

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index e9a4947..6b6c43f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
@@ -31,13 +31,10 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
-
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -53,17 +50,20 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
+/**
+ * Tests for the {@link KafkaProducer}.
+ */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(FlinkKafkaProducerBase.class)
 public class KafkaProducerTest extends TestLogger {
-	
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testPropagateExceptions() {
 		try {
 			// mock kafka producer
 			KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
-			
+
 			// partition setup
 			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
 				// returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
@@ -79,13 +79,13 @@ public class KafkaProducerTest extends TestLogger {
 						return null;
 					}
 				});
-			
+
 			// make sure the FlinkKafkaProducer instantiates our mock producer
 			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-			
+
 			// (1) producer that propagates errors
 			FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>)null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>) null);
 
 			OneInputStreamOperatorTestHarness<String, Object> testHarness =
 					new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
@@ -106,7 +106,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (2) producer that only logs errors
 
 			FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>)null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>) null);
 			producerLogging.setLogFailuresOnly(true);
 
 			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

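For reference, the two failure-handling modes exercised by this test map onto the producer's user-facing setters. A usage sketch, assuming illustrative topic and broker values:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class ProducerFailureModesExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> stream = env.fromElements("a", "b", "c");

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");

		FlinkKafkaProducer09<String> producer =
				new FlinkKafkaProducer09<>("example_topic", new SimpleStringSchema(), props);

		// default: asynchronous send failures are propagated and fail the job;
		// setting this to true makes the producer only log them instead
		producer.setLogFailuresOnly(false);

		// flush pending records on checkpoints for at-least-once behaviour
		producer.setFlushOnCheckpoint(true);

		stream.addSink(producer);
		env.execute("producer failure modes example");
	}
}
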
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 84fdbf8..fc38e24 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -15,9 +15,17 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
 import kafka.common.KafkaException;
@@ -29,21 +37,12 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
 
 import java.io.File;
 import java.net.BindException;
@@ -54,12 +53,14 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import scala.collection.Seq;
+
 import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * An implementation of the KafkaServerProvider for Kafka 0.9
+ * An implementation of the KafkaServerProvider for Kafka 0.9 .
  */
 public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
@@ -166,7 +167,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
 
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
-		if(secureMode) {
+		if (secureMode) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
 			numKafkaServers = 1;
 			zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
@@ -205,7 +206,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
 				SocketServer socketServer = brokers.get(i).socketServer();
-				if(secureMode) {
+				if (secureMode) {
 					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
 				} else {
 					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
@@ -298,7 +299,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L;
 		do {
 			try {
-				if(secureMode) {
+				if (secureMode) {
 					//increase wait time since in Travis ZK timeout occurs frequently
 					int wait = Integer.parseInt(zkTimeout) / 100;
 					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
@@ -317,7 +318,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 			// create a new ZK utils connection
 			ZkUtils checkZKConn = getZkUtils();
-			if(AdminUtils.topicExists(checkZKConn, topic)) {
+			if (AdminUtils.topicExists(checkZKConn, topic)) {
 				LOG.info("topic {} has been created successfully", topic);
 				checkZKConn.close();
 				return;
@@ -347,7 +348,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	/**
-	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed).
 	 */
 	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
 		Properties kafkaProperties = new Properties();
@@ -363,7 +364,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		// for CI stability, increase zookeeper session timeout
 		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
 		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
-		if(additionalServerProperties != null) {
+		if (additionalServerProperties != null) {
 			kafkaProperties.putAll(additionalServerProperties);
 		}
 
@@ -374,7 +375,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
 
 			//to support secure kafka cluster
-			if(secureMode) {
+			if (secureMode) {
 				LOG.info("Adding Kafka secure configurations");
 				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
 				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
@@ -405,7 +406,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	public Properties getSecureProperties() {
 		Properties prop = new Properties();
-		if(secureMode) {
+		if (secureMode) {
 			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
 			prop.put("security.protocol", "SASL_PLAINTEXT");
 			prop.put("sasl.kerberos.service.name", "kafka");
@@ -413,7 +414,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			//add special timeout for Travis
 			prop.setProperty("zookeeper.session.timeout.ms", zkTimeout);
 			prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
-			prop.setProperty("metadata.fetch.timeout.ms","120000");
+			prop.setProperty("metadata.fetch.timeout.ms", "120000");
 		}
 		return prop;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
index e95b51b..5bd4aff 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -34,7 +34,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests for the {@link Handover} between Kafka Consumer Thread and the fetcher's main thread. 
+ * Tests for the {@link Handover} between Kafka Consumer Thread and the fetcher's main thread.
  */
 public class HandoverTest {
 
@@ -219,7 +219,7 @@ public class HandoverTest {
 
 		// empty the handover
 		assertNotNull(handover.pollNext());
-		
+
 		// producing into an empty handover should work
 		try {
 			handover.produce(createTestRecords());
@@ -292,7 +292,7 @@ public class HandoverTest {
 
 	// ------------------------------------------------------------------------
 
-	private static abstract class CheckedThread extends Thread {
+	private abstract static class CheckedThread extends Thread {
 
 		private volatile Throwable error;
 
@@ -317,7 +317,7 @@ public class HandoverTest {
 
 		public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException {
 			final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
-			
+
 			while (!isBlockedOrWaiting() && (System.nanoTime() < deadline)) {
 				Thread.sleep(1);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
index 4ac1773..bc93a2d 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -29,4 +29,4 @@ log4j.logger.org.apache.zookeeper=OFF, testlogger
 log4j.logger.state.change.logger=OFF, testlogger
 log4j.logger.kafka=OFF, testlogger
 
-log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file
+log4j.logger.org.apache.directory=OFF, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index 2cc94b0..fc0045e 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -106,7 +106,7 @@ under the License.
 		</dependency>
 
 		<!-- test dependencies -->
-		
+
 		<!-- force using the latest zkclient -->
 		<dependency>
 			<groupId>com.101tec</groupId>
@@ -187,7 +187,6 @@ under the License.
 			</dependency>
 		</dependencies>
 	</dependencyManagement>
-	
 
 	<build>
 		<plugins>
@@ -233,5 +232,5 @@ under the License.
 			</plugin>
 		</plugins>
 	</build>
-	
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 87bedce..18748d0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -46,6 +45,8 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
+import org.apache.commons.collections.map.LinkedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,13 +61,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Base class of all Flink Kafka Consumer data sources.
  * This implements the common behavior across all Kafka versions.
- * 
+ *
  * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
  * {@link AbstractFetcher}.
- * 
+ *
  * @param <T> The type of records produced by this data source
  */
-public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
 		CheckpointListener,
 		ResultTypeQueryable<T>,
 		CheckpointedFunction,
@@ -75,11 +76,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	private static final long serialVersionUID = -6272159445203409112L;
 
 	protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
-	
-	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
+
+	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
 	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
 
-	/** Boolean configuration key to disable metrics tracking **/
+	/** Boolean configuration key to disable metrics tracking. **/
 	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
 
 	// ------------------------------------------------------------------------
@@ -87,20 +88,20 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	// ------------------------------------------------------------------------
 
 	private final List<String> topics;
-	
-	/** The schema to convert between Kafka's byte messages, and Flink's objects */
+
+	/** The schema to convert between Kafka's byte messages, and Flink's objects. */
 	protected final KeyedDeserializationSchema<T> deserializer;
 
-	/** The set of topic partitions that the source will read, with their initial offsets to start reading from */
+	/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
 	private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
-	
+
 	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
 	 * to exploit per-partition timestamp characteristics.
 	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
 	private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
-	
+
 	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
-	 * to exploit per-partition timestamp characteristics. 
+	 * to exploit per-partition timestamp characteristics.
 	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
 	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
 
@@ -119,26 +120,26 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 */
 	private OffsetCommitMode offsetCommitMode;
 
-	/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */
+	/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
 	private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
 
-	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS} */
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
 	protected Map<KafkaTopicPartition, Long> specificStartupOffsets;
 
 	// ------------------------------------------------------------------------
-	//  runtime state (used individually by each parallel subtask) 
+	//  runtime state (used individually by each parallel subtask)
 	// ------------------------------------------------------------------------
-	
-	/** Data for pending but uncommitted offsets */
+
+	/** Data for pending but uncommitted offsets. */
 	private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
 
-	/** The fetcher implements the connections to the Kafka brokers */
+	/** The fetcher implements the connections to the Kafka brokers. */
 	private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
-	
-	/** The offsets to restore to, if the consumer restores state from a checkpoint */
+
+	/** The offsets to restore to, if the consumer restores state from a checkpoint. */
 	private transient volatile HashMap<KafkaTopicPartition, Long> restoredState;
-	
-	/** Flag indicating whether the consumer is still running **/
+
+	/** Flag indicating whether the consumer is still running. **/
 	private volatile boolean running = true;
 
 	// ------------------------------------------------------------------------
@@ -158,30 +159,30 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	// ------------------------------------------------------------------------
 	//  Configuration
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
 	 * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
 	 * in the same way as in the Flink runtime, when streams are merged.
-	 * 
+	 *
 	 * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
 	 * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
 	 * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
 	 * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
 	 * parallel source subtask reads more that one partition.
-	 * 
+	 *
 	 * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
 	 * partition, allows users to let them exploit the per-partition characteristics.
-	 * 
+	 *
 	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
 	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
-	 * 
+	 *
 	 * @param assigner The timestamp assigner / watermark generator to use.
 	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
 		checkNotNull(assigner);
-		
+
 		if (this.periodicWatermarkAssigner != null) {
 			throw new IllegalStateException("A periodic watermark emitter has already been set.");
 		}
@@ -216,7 +217,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 */
 	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
 		checkNotNull(assigner);
-		
+
 		if (this.punctuatedWatermarkAssigner != null) {
 			throw new IllegalStateException("A punctuated watermark emitter has already been set.");
 		}
@@ -232,7 +233,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	/**
 	 * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
 	 *
-	 * This setting will only have effect if checkpointing is enabled for the job.
+	 * <p>This setting will only have effect if checkpointing is enabled for the job.
 	 * If checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
 	 * property settings will be
 	 *
@@ -247,7 +248,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * Specifies the consumer to start reading from the earliest offset for all partitions.
 	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
 	 *
-	 * This method does not effect where partitions are read from when the consumer is restored
+	 * <p>This method does not effect where partitions are read from when the consumer is restored
 	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
 	 * savepoint, only the offsets in the restored state will be used.
 	 *
@@ -263,7 +264,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * Specifies the consumer to start reading from the latest offset for all partitions.
 	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
 	 *
-	 * This method does not effect where partitions are read from when the consumer is restored
+	 * <p>This method does not effect where partitions are read from when the consumer is restored
 	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
 	 * savepoint, only the offsets in the restored state will be used.
 	 *
@@ -281,7 +282,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
 	 * set in the configuration properties will be used for the partition.
 	 *
-	 * This method does not effect where partitions are read from when the consumer is restored
+	 * <p>This method does not effect where partitions are read from when the consumer is restored
 	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
 	 * savepoint, only the offsets in the restored state will be used.
 	 *
@@ -298,16 +299,16 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * The specified offset should be the offset of the next record that will be read from partitions.
 	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
 	 *
-	 * If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not subscribed by the
+	 * <p>If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not subscribed by the
 	 * consumer, the entry will be ignored. If the consumer subscribes to a partition that does not exist in the provided
 	 * map of offsets, the consumer will fallback to the default group offset behaviour (see
 	 * {@link FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
 	 *
-	 * If the specified offset for a partition is invalid, or the behaviour for that partition is defaulted to group
+	 * <p>If the specified offset for a partition is invalid, or the behaviour for that partition is defaulted to group
 	 * offsets but still no group offset could be found for it, then the "auto.offset.reset" behaviour set in the
 	 * configuration properties will be used for the partition
 	 *
-	 * This method does not effect where partitions are read from when the consumer is restored
+	 * <p>This method does not effect where partitions are read from when the consumer is restored
 	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
 	 * savepoint, only the offsets in the restored state will be used.
 	 *
@@ -444,7 +445,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			if (!running) {
 				return;
 			}
-			
+
 			// (3) run the fetcher' main work method
 			fetcher.runFetchLoop();
 		}
@@ -476,7 +477,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	public void cancel() {
 		// set ourselves as not running
 		running = false;
-		
+
 		// abort the fetcher, if there is one
 		if (kafkaFetcher != null) {
 			kafkaFetcher.cancel();
@@ -494,7 +495,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			super.close();
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Checkpoint and restore
 	// ------------------------------------------------------------------------
@@ -635,19 +636,19 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	// ------------------------------------------------------------------------
 	//  Kafka Consumer specific methods
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates the fetcher that connect to the Kafka brokers, pulls data, deserialized the
 	 * data, and emits it into the data streams.
-	 * 
+	 *
 	 * @param sourceContext The source context to emit data to.
 	 * @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should handle, with their start offsets.
 	 * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
 	 * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
 	 * @param runtimeContext The task's runtime context.
-	 * 
+	 *
 	 * @return The instantiated fetcher
-	 * 
+	 *
 	 * @throws Exception The method should forward exceptions
 	 */
 	protected abstract AbstractFetcher<T, ?> createFetcher(
@@ -661,11 +662,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	protected abstract List<KafkaTopicPartition> getKafkaPartitions(List<String> topics);
 
 	protected abstract boolean getIsAutoCommitEnabled();
-	
+
 	// ------------------------------------------------------------------------
-	//  ResultTypeQueryable methods 
+	//  ResultTypeQueryable methods
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public TypeInformation<T> getProducedType() {
 		return deserializer.getProducedType();
@@ -726,7 +727,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 	/**
 	 * Logs the partition information in INFO level.
-	 * 
+	 *
 	 * @param logger The logger to log to.
 	 * @param partitionInfos List of subscribed partitions
 	 */
@@ -743,11 +744,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		}
 		StringBuilder sb = new StringBuilder(
 				"Consumer is going to read the following topics (with number of partitions): ");
-		
+
 		for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
 			sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
 		}
-		
+
 		logger.info(sb.toString());
 	}
 

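The startup-mode and per-partition watermark documentation above corresponds to user-facing calls on the consumer. A usage sketch, assuming illustrative topic, broker and offset values:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConsumerStartupModesExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "example-group");

		FlinkKafkaConsumer09<String> consumer =
				new FlinkKafkaConsumer09<>("example_topic", new SimpleStringSchema(), props);

		// start from explicitly given offsets; partitions missing from the map
		// fall back to the group-offset behaviour, as described in the javadoc
		Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
		specificOffsets.put(new KafkaTopicPartition("example_topic", 0), 23L);
		consumer.setStartFromSpecificOffsets(specificOffsets);

		// alternatives: consumer.setStartFromEarliest(), consumer.setStartFromLatest(),
		// consumer.setStartFromGroupOffsets();
		// a per-partition watermark assigner can additionally be set via
		// consumer.assignTimestampsAndWatermarks(...)

		DataStream<String> stream = env.addSource(consumer);
		stream.print();
		env.execute("consumer startup modes example");
	}
}
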
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 46d7d47..76a2f84 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -17,14 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
@@ -41,6 +33,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegat
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -53,13 +46,20 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.util.Objects.requireNonNull;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
+import static java.util.Objects.requireNonNull;
 
 /**
  * Flink Sink to produce data into a Kafka topic.
  *
- * Please note that this producer provides at-least-once reliability guarantees when
+ * <p>Please note that this producer provides at-least-once reliability guarantees when
  * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
  * Otherwise, the producer doesn't provide any reliability guarantees.
  *
@@ -98,7 +98,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
 
 	/**
-	 * Partitions of each topic
+	 * Partitions of each topic.
 	 */
 	protected final Map<String, int[]> topicPartitionsMap;
 
@@ -114,16 +114,16 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 
 	// -------------------------------- Runtime fields ------------------------------------------
 
-	/** KafkaProducer instance */
+	/** KafkaProducer instance. */
 	protected transient KafkaProducer<byte[], byte[]> producer;
 
-	/** The callback than handles error propagation or logging callbacks */
+	/** The callback that handles error propagation or logging callbacks. */
 	protected transient Callback callback;
 
-	/** Errors encountered in the async producer are stored here */
+	/** Errors encountered in the async producer are stored here. */
 	protected transient volatile Exception asyncException;
 
-	/** Lock for accessing the pending records */
+	/** Lock for accessing the pending records. */
 	protected final SerializableObject pendingRecordsLock = new SerializableObject();
 
 	/** Number of unacknowledged records. */
@@ -196,9 +196,10 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	}
 
 	/**
-	 * Used for testing only
+	 * Used for testing only.
 	 */
-	protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) {
+	@VisibleForTesting
+	protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
 		return new KafkaProducer<>(props);
 	}
 
@@ -213,8 +214,8 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 
 		RuntimeContext ctx = getRuntimeContext();
 
-		if(null != flinkKafkaPartitioner) {
-			if(flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
+		if (null != flinkKafkaPartitioner) {
+			if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
 				((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
 						getPartitionsByTopic(this.defaultTopicId, this.producer));
 			}
@@ -239,7 +240,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 			}
 		}
 
-		if (flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) {
+		if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
 			LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
 			flushOnCheckpoint = false;
 		}
@@ -287,7 +288,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		}
 
 		int[] partitions = this.topicPartitionsMap.get(targetTopic);
-		if(null == partitions) {
+		if (null == partitions) {
 			partitions = getPartitionsByTopic(targetTopic, producer);
 			this.topicPartitionsMap.put(targetTopic, partitions);
 		}
@@ -310,7 +311,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		producer.send(record, callback);
 	}
 
-
 	@Override
 	public void close() throws Exception {
 		if (producer != null) {

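The class Javadoc above ties the at-least-once guarantee to two settings: checkpointing must be enabled on the environment, and setFlushOnCheckpoint(true) must be called on the producer; otherwise open() logs a warning and disables flushing, as the hunk shows. A minimal usage sketch, assuming the 0.9 producer class (FlinkKafkaProducer09) and SimpleStringSchema, neither of which appears in this diff:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

// Sketch only: wires the two settings the Javadoc ties to at-least-once behaviour.
public class AtLeastOnceProducerSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5000); // without checkpointing, flushing on checkpoint is disabled

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");

		FlinkKafkaProducer09<String> producer =
				new FlinkKafkaProducer09<>("my-topic", new SimpleStringSchema(), props);
		producer.setFlushOnCheckpoint(true); // flush pending records on every checkpoint

		DataStream<String> stream = env.fromElements("a", "b", "c");
		stream.addSink(producer);

		env.execute("at-least-once producer sketch");
	}
}
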
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index b88fb83..5c9a629 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -18,13 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.List;
-import java.util.Properties;
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.AvroTypeInfo;
@@ -35,6 +28,15 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+
+import java.util.List;
+import java.util.Properties;
+
 /**
  * A version-agnostic Kafka Avro {@link StreamTableSource}.
  *

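The import hunks in this file (and in FlinkKafkaProducerBase above) all apply the same checkstyle grouping: org.apache.flink packages first, then other third-party packages, then java/javax, with static imports last. A compact illustration of the resulting layout, using classes that appear in these diffs:

import org.apache.flink.api.common.typeinfo.TypeInformation;  // 1. Flink imports

import org.apache.avro.Schema;                                // 2. other third-party imports
import org.apache.kafka.common.TopicPartition;

import java.util.List;                                        // 3. java / javax imports
import java.util.Properties;

import static java.util.Objects.requireNonNull;               // 4. static imports last
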
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index 41bb329..51fd952 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -25,12 +26,12 @@ import org.apache.flink.types.Row;
 import java.util.Properties;
 
 /**
- * Base class for {@link KafkaTableSink} that serializes data in JSON format
+ * Base class for {@link KafkaTableSink} that serializes data in JSON format.
  */
 public abstract class KafkaJsonTableSink extends KafkaTableSink {
-	
+
 	/**
-	 * Creates KafkaJsonTableSink
+	 * Creates KafkaJsonTableSink.
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 460f948..1c8e0a0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -19,12 +19,12 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
 
 import java.util.Properties;
-import org.apache.flink.types.Row;
 
 /**
  * A version-agnostic Kafka JSON {@link StreamTableSource}.

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 1c38816..a94936c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -15,15 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Properties;
@@ -44,8 +45,8 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	protected TypeInformation[] fieldTypes;
 
 	/**
-	 * Creates KafkaTableSink
-	 * 
+	 * Creates KafkaTableSink.
+	 *
 	 * @param topic                 Kafka topic to write to.
 	 * @param properties            Properties for the Kafka consumer.
 	 * @param partitioner           Partitioner to select Kafka partition for each item

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 029aa45..8969f90 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -27,6 +26,8 @@ import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Properties;
+
 /**
  * A version-agnostic Kafka {@link StreamTableSource}.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
index 8bb75b4..0642e7e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.config;
  * The offset commit mode represents the behaviour of how offsets are externally committed
  * back to Kafka brokers / Zookeeper.
  *
- * The exact value of this is determined at runtime in the consumer subtasks.
+ * <p>The exact value of this is determined at runtime in the consumer subtasks.
  */
 public enum OffsetCommitMode {
 

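The Javadoc above stresses that the effective commit behaviour is only determined at runtime in the consumer subtasks, which have exactly two inputs for that decision: whether checkpointing is enabled and whether Kafka's auto-commit is enabled (see getIsAutoCommitEnabled() in the consumer base class earlier in this commit). A hedged sketch of such a decision, using a local enum because only the ON_CHECKPOINTS constant is visible elsewhere in this diff; the other mode names are assumptions:

// Hypothetical sketch of resolving a commit behaviour at runtime from the two
// inputs the consumer subtask has available.
final class CommitModeSketch {

	enum Mode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

	static Mode resolve(boolean checkpointingEnabled, boolean autoCommitEnabled) {
		if (checkpointingEnabled) {
			// offsets go back to Kafka brokers / ZooKeeper only when a checkpoint completes
			return Mode.ON_CHECKPOINTS;
		}
		// otherwise rely on the Kafka client's periodic auto-commit, if it is enabled at all
		return autoCommitEnabled ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
	}
}
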
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index 8fc2fe0..81c4138 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka.config;
 
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
@@ -23,13 +24,13 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
  */
 public enum StartupMode {
 
-	/** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default) */
+	/** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
 	GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),
 
-	/** Start from the earliest offset possible */
+	/** Start from the earliest offset possible. */
 	EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
 
-	/** Start from the latest offset */
+	/** Start from the latest offset. */
 	LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
 
 	/**
@@ -39,7 +40,7 @@ public enum StartupMode {
 	 */
 	SPECIFIC_OFFSETS(Long.MIN_VALUE);
 
-	/** The sentinel offset value corresponding to this startup mode */
+	/** The sentinel offset value corresponding to this startup mode. */
 	private long stateSentinel;
 
 	StartupMode(long stateSentinel) {

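The enum above only carries the sentinel offset values; on the consumer these startup modes are typically chosen through setter methods. A usage sketch, assuming the 0.9 consumer class and the setStartFrom* setters (neither appears in this hunk):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

// Sketch only: each setter below is assumed to map to one StartupMode constant.
public class StartupModeUsageSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "demo-group");

		FlinkKafkaConsumer09<String> consumer =
				new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);

		consumer.setStartFromGroupOffsets(); // StartupMode.GROUP_OFFSETS (the default)
		// consumer.setStartFromEarliest();  // StartupMode.EARLIEST
		// consumer.setStartFromLatest();    // StartupMode.LATEST

		env.addSource(consumer).print();
		env.execute("startup mode sketch");
	}
}
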
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 0b311a9..cfd7c3b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -38,43 +38,43 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Base class for all fetchers, which implement the connections to Kafka brokers and
  * pull records from Kafka partitions.
- * 
+ *
  * <p>This fetcher base class implements the logic around emitting records and tracking offsets,
- * as well as around the optional timestamp assignment and watermark generation. 
- * 
+ * as well as around the optional timestamp assignment and watermark generation.
+ *
  * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into
  *            the Flink data streams.
  * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
  */
 public abstract class AbstractFetcher<T, KPH> {
-	
+
 	protected static final int NO_TIMESTAMPS_WATERMARKS = 0;
 	protected static final int PERIODIC_WATERMARKS = 1;
 	protected static final int PUNCTUATED_WATERMARKS = 2;
-	
+
 	// ------------------------------------------------------------------------
-	
-	/** The source context to emit records and watermarks to */
+
+	/** The source context to emit records and watermarks to. */
 	protected final SourceContext<T> sourceContext;
 
 	/** The lock that guarantees that record emission and state updates are atomic,
-	 * from the view of taking a checkpoint */
+	 * from the view of taking a checkpoint. */
 	protected final Object checkpointLock;
 
-	/** All partitions (and their state) that this fetcher is subscribed to */
+	/** All partitions (and their state) that this fetcher is subscribed to. */
 	private final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates;
 
-	/** The mode describing whether the fetcher also generates timestamps and watermarks */
+	/** The mode describing whether the fetcher also generates timestamps and watermarks. */
 	protected final int timestampWatermarkMode;
 
-	/** Flag whether to register metrics for the fetcher */
+	/** Flag whether to register metrics for the fetcher. */
 	protected final boolean useMetrics;
 
-	/** Only relevant for punctuated watermarks: The current cross partition watermark */
+	/** Only relevant for punctuated watermarks: The current cross partition watermark. */
 	private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
 
 	// ------------------------------------------------------------------------
-	
+
 	protected AbstractFetcher(
 			SourceContext<T> sourceContext,
 			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
@@ -83,14 +83,13 @@ public abstract class AbstractFetcher<T, KPH> {
 			ProcessingTimeService processingTimeProvider,
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
-			boolean useMetrics) throws Exception
-	{
+			boolean useMetrics) throws Exception {
 		this.sourceContext = checkNotNull(sourceContext);
 		this.checkpointLock = sourceContext.getCheckpointLock();
 		this.useMetrics = useMetrics;
-		
+
 		// figure out what watermark mode we will be using
-		
+
 		if (watermarksPeriodic == null) {
 			if (watermarksPunctuated == null) {
 				// simple case, no watermarks involved
@@ -106,7 +105,7 @@ public abstract class AbstractFetcher<T, KPH> {
 			}
 		}
 
-		// create our partition state according to the timestamp/watermark mode 
+		// create our partition state according to the timestamp/watermark mode
 		this.subscribedPartitionStates = initializeSubscribedPartitionStates(
 				assignedPartitionsWithInitialOffsets,
 				timestampWatermarkMode,
@@ -119,13 +118,13 @@ public abstract class AbstractFetcher<T, KPH> {
 				throw new IllegalArgumentException("The fetcher was assigned partitions with undefined initial offsets.");
 			}
 		}
-		
+
 		// if we have periodic watermarks, kick off the interval scheduler
 		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
+			KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts =
 					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) subscribedPartitionStates;
-			
-			PeriodicWatermarkEmitter periodicEmitter = 
+
+			PeriodicWatermarkEmitter periodicEmitter =
 					new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
 			periodicEmitter.start();
 		}
@@ -149,17 +148,17 @@ public abstract class AbstractFetcher<T, KPH> {
 	// ------------------------------------------------------------------------
 
 	public abstract void runFetchLoop() throws Exception;
-	
+
 	public abstract void cancel();
 
 	// ------------------------------------------------------------------------
 	//  Kafka version specifics
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates the Kafka version specific representation of the given
 	 * topic partition.
-	 * 
+	 *
 	 * @param partition The Flink representation of the Kafka topic partition.
 	 * @return The specific Kafka representation of the Kafka topic partition.
 	 */
@@ -170,7 +169,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * older Kafka versions). This method is only ever called when the offset commit mode of
 	 * the consumer is {@link OffsetCommitMode#ON_CHECKPOINTS}.
 	 *
-	 * The given offsets are the internal checkpointed offsets, representing
+	 * <p>The given offsets are the internal checkpointed offsets, representing
 	 * the last processed record of each partition. Version-specific implementations of this method
 	 * need to hold the contract that the given offsets must be incremented by 1 before
 	 * committing them, so that committed offsets to Kafka represent "the next record to process".
@@ -179,16 +178,16 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * @throws Exception This method forwards exceptions.
 	 */
 	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
-	
+
 	// ------------------------------------------------------------------------
 	//  snapshot and restore the state
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Takes a snapshot of the partition offsets.
-	 * 
+	 *
 	 * <p>Important: This method must be called under the checkpoint lock.
-	 * 
+	 *
 	 * @return A map from partition to current offset.
 	 */
 	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
@@ -208,10 +207,10 @@ public abstract class AbstractFetcher<T, KPH> {
 
 	/**
 	 * Emits a record without attaching an existing timestamp to it.
-	 * 
+	 *
 	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
 	 * That makes the fast path efficient, the extended paths are called as separate methods.
-	 * 
+	 *
 	 * @param record The record to emit
 	 * @param partitionState The state of the Kafka partition from which the record was fetched
 	 * @param offset The offset of the record
@@ -282,8 +281,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * also a periodic watermark generator.
 	 */
 	protected void emitRecordWithTimestampAndPeriodicWatermark(
-			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
-	{
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) {
 		@SuppressWarnings("unchecked")
 		final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
 				(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
@@ -298,7 +296,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		}
 
 		// emit the record with timestamp, using the usual checkpoint lock to guarantee
-		// atomicity of record emission and offset state update 
+		// atomicity of record emission and offset state update
 		synchronized (checkpointLock) {
 			sourceContext.collectWithTimestamp(record, timestamp);
 			partitionState.setOffset(offset);
@@ -310,8 +308,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * also a punctuated watermark generator.
 	 */
 	protected void emitRecordWithTimestampAndPunctuatedWatermark(
-			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
-	{
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) {
 		@SuppressWarnings("unchecked")
 		final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
 				(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
@@ -322,7 +319,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
 
 		// emit the record with timestamp, using the usual checkpoint lock to guarantee
-		// atomicity of record emission and offset state update 
+		// atomicity of record emission and offset state update
 		synchronized (checkpointLock) {
 			sourceContext.collectWithTimestamp(record, timestamp);
 			partitionState.setOffset(offset);
@@ -346,7 +343,7 @@ public abstract class AbstractFetcher<T, KPH> {
 				@SuppressWarnings("unchecked")
 				final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
 						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
-				
+
 				newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
 			}
 
@@ -375,11 +372,9 @@ public abstract class AbstractFetcher<T, KPH> {
 			int timestampWatermarkMode,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			ClassLoader userCodeClassLoader)
-		throws IOException, ClassNotFoundException
-	{
+			ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
 		switch (timestampWatermarkMode) {
-			
+
 			case NO_TIMESTAMPS_WATERMARKS: {
 				@SuppressWarnings("unchecked")
 				KafkaTopicPartitionState<KPH>[] partitions =
@@ -410,7 +405,7 @@ public abstract class AbstractFetcher<T, KPH> {
 
 					AssignerWithPeriodicWatermarks<T> assignerInstance =
 							watermarksPeriodic.deserializeValue(userCodeClassLoader);
-					
+
 					partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
 							partition.getKey(), kafkaHandle, assignerInstance);
 					partitions[pos].setOffset(partition.getValue());
@@ -452,7 +447,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	// ------------------------- Metrics ----------------------------------
 
 	/**
-	 * Add current and committed offsets to metric group
+	 * Add current and committed offsets to metric group.
 	 *
 	 * @param metricGroup The metric group to use
 	 */
@@ -467,7 +462,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	}
 
 	/**
-	 * Gauge types
+	 * Gauge types.
 	 */
 	private enum OffsetGaugeType {
 		CURRENT_OFFSET,
@@ -500,7 +495,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		}
 	}
  	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * The periodic watermark emitter. In its given interval, it checks all partitions for
 	 * the current event time watermark, and possibly emits the next watermark.
@@ -508,23 +503,22 @@ public abstract class AbstractFetcher<T, KPH> {
 	private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
 
 		private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
-		
+
 		private final SourceContext<?> emitter;
-		
+
 		private final ProcessingTimeService timerService;
 
 		private final long interval;
-		
+
 		private long lastWatermarkTimestamp;
-		
+
 		//-------------------------------------------------
 
 		PeriodicWatermarkEmitter(
 				KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
 				SourceContext<?> emitter,
 				ProcessingTimeService timerService,
-				long autoWatermarkInterval)
-		{
+				long autoWatermarkInterval) {
 			this.allPartitions = checkNotNull(allPartitions);
 			this.emitter = checkNotNull(emitter);
 			this.timerService = checkNotNull(timerService);
@@ -533,17 +527,17 @@ public abstract class AbstractFetcher<T, KPH> {
 		}
 
 		//-------------------------------------------------
-		
+
 		public void start() {
 			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
 		}
-		
+
 		@Override
 		public void onProcessingTime(long timestamp) throws Exception {
 
 			long minAcrossAll = Long.MAX_VALUE;
 			for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
-				
+
 				// we access the current watermark for the periodic assigners under the state
 				// lock, to prevent concurrent modification to any internal variables
 				final long curr;
@@ -551,16 +545,16 @@ public abstract class AbstractFetcher<T, KPH> {
 				synchronized (state) {
 					curr = state.getCurrentWatermarkTimestamp();
 				}
-				
+
 				minAcrossAll = Math.min(minAcrossAll, curr);
 			}
-			
+
 			// emit next watermark, if there is one
 			if (minAcrossAll > lastWatermarkTimestamp) {
 				lastWatermarkTimestamp = minAcrossAll;
 				emitter.emitWatermark(new Watermark(minAcrossAll));
 			}
-			
+
 			// schedule the next watermark
 			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
 		}


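The commitInternalOffsetsToKafka Javadoc above fixes a contract for version-specific fetchers: the checkpointed offsets mark the last processed record of each partition, so implementations must add 1 before committing, making the committed offset mean "the next record to process". A minimal sketch of just that adjustment (the helper class is hypothetical; OffsetAndMetadata and TopicPartition are the Kafka client types):

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper showing the "+1 before committing" rule described above.
final class OffsetCommitAdjustmentSketch {

	static Map<TopicPartition, OffsetAndMetadata> toKafkaCommits(Map<TopicPartition, Long> lastProcessedOffsets) {
		Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>(lastProcessedOffsets.size());
		for (Map.Entry<TopicPartition, Long> e : lastProcessedOffsets.entrySet()) {
			// committed offset = last processed offset + 1, i.e. "the next record to process"
			commits.put(e.getKey(), new OffsetAndMetadata(e.getValue() + 1));
		}
		return commits;
	}
}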