flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [07/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-elasticsearch*
Date Sun, 28 May 2017 06:17:37 GMT
[FLINK-6711] Activate strict checkstyle for flink-elasticsearch*


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c20b396f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c20b396f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c20b396f

Branch: refs/heads/master
Commit: c20b396f5ea09a54f52e980c70e30888a7a2859c
Parents: 88189f2
Author: zentol <chesnay@apache.org>
Authored: Wed May 24 23:20:29 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Sun May 28 00:11:12 2017 +0200

----------------------------------------------------------------------
 .../ActionRequestFailureHandler.java            |  9 +++----
 .../ElasticsearchApiCallBridge.java             |  3 ++-
 .../elasticsearch/ElasticsearchSinkBase.java    | 28 +++++++++++++-------
 .../ElasticsearchSinkFunction.java              |  7 +++--
 .../elasticsearch/util/NoOpFailureHandler.java  |  1 +
 .../RetryRejectedExecutionFailureHandler.java   |  1 +
 .../ElasticsearchSinkBaseTest.java              | 23 ++++++++--------
 .../ElasticsearchSinkTestBase.java              | 13 ++++-----
 .../EmbeddedElasticsearchNodeEnvironment.java   |  2 +-
 .../testutils/SourceSinkDataTestKit.java        |  1 +
 .../flink-connector-elasticsearch/pom.xml       |  2 +-
 .../Elasticsearch1ApiCallBridge.java            |  2 ++
 .../elasticsearch/ElasticsearchSink.java        | 16 +++++------
 .../elasticsearch/IndexRequestBuilder.java      |  8 +++---
 .../elasticsearch/ElasticsearchSinkITCase.java  |  7 ++++-
 .../examples/ElasticsearchSinkExample.java      |  4 +--
 .../flink-connector-elasticsearch2/pom.xml      |  2 +-
 .../Elasticsearch2ApiCallBridge.java            |  4 ++-
 .../elasticsearch2/ElasticsearchSink.java       | 14 +++++-----
 .../ElasticsearchSinkFunction.java              |  7 +++--
 .../elasticsearch2/RequestIndexer.java          |  1 +
 ...mbeddedElasticsearchNodeEnvironmentImpl.java |  1 +
 .../elasticsearch2/ElasticsearchSinkITCase.java |  5 ++++
 .../examples/ElasticsearchSinkExample.java      |  2 ++
 .../Elasticsearch5ApiCallBridge.java            |  4 ++-
 .../elasticsearch5/ElasticsearchSink.java       | 14 +++++-----
 ...mbeddedElasticsearchNodeEnvironmentImpl.java |  1 +
 .../elasticsearch5/ElasticsearchSinkITCase.java |  5 ++++
 .../examples/ElasticsearchSinkExample.java      |  2 ++
 29 files changed, 110 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
index abbdd72..3ca1417 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
@@ -26,8 +26,7 @@ import java.io.Serializable;
  * {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing malformed documents, or
  * simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
  *
- * <p>
- * Example:
+ * <p>Example:
  *
  * <pre>{@code
  *
@@ -50,12 +49,10 @@ import java.io.Serializable;
  *
  * }</pre>
  *
- * <p>
- * The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
+ * <p>The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
  * with malformed documents, without failing the sink. For all other failures, the sink will fail.
  *
- * <p>
- * Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the exact type
+ * <p>Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the exact type
  * could not be retrieved through the older version Java client APIs (thus, the types will be general {@link Exception}s
  * and only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index b482432..ce98dfb 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -23,6 +23,7 @@ import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.Client;
 
 import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Map;
 
@@ -31,7 +32,7 @@ import java.util.Map;
  * This includes calls to create Elasticsearch clients, handle failed item responses, etc. Any incompatible Elasticsearch
  * Java APIs should be bridged using this interface.
  *
- * Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
+ * <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
  * is allowed, the call bridge will hold reference to the created embedded node. Each instance of the sink will hold
  * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index f6944b3..2ab5a90 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
@@ -49,14 +50,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Base class for all Flink Elasticsearch Sinks.
  *
- * <p>
- * This class implements the common behaviour across Elasticsearch versions, such as
+ * <p>This class implements the common behaviour across Elasticsearch versions, such as
  * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
  * sending the requests to the cluster, as well as passing input records to the user provided
  * {@link ElasticsearchSinkFunction} for processing.
  *
- * <p>
- * The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
+ * <p>The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
  * a {@link ElasticsearchApiCallBridge}, which is provided to the constructor of this class. This call bridge is used,
  * for example, to create a Elasticsearch {@link Client}, handle failed item responses, etc.
  *
@@ -80,11 +79,21 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
 	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
 
+	/**
+	 * Used to control whether the retry delay should increase exponentially or remain constant.
+	 */
 	public enum FlushBackoffType {
 		CONSTANT,
 		EXPONENTIAL
 	}
 
+	/**
+	 * Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to resource constraints
+	 * (i.e. the client's internal thread pool is full), the backoff policy decides how long the bulk processor will
+	 * wait before the operation is retried internally.
+	 *
+	 * <p>This is a proxy for version specific backoff policies.
+	 */
 	public class BulkFlushBackoffPolicy implements Serializable {
 
 		private static final long serialVersionUID = -6022851996101826049L;
@@ -149,14 +158,14 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	//  Internals for the Flink Elasticsearch Sink
 	// ------------------------------------------------------------------------
 
-	/** Call bridge for different version-specfic */
+	/** Call bridge for the different version-specific Elasticsearch API calls. */
 	private final ElasticsearchApiCallBridge callBridge;
 
 	/**
 	 * Number of pending action requests not yet acknowledged by Elasticsearch.
 	 * This value is maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
 	 *
-	 * This is incremented whenever the user adds (or re-adds through the {@link ActionRequestFailureHandler}) requests
+	 * <p>This is incremented whenever the user adds (or re-adds through the {@link ActionRequestFailureHandler}) requests
 	 * to the {@link RequestIndexer}. It is decremented for each completed request of a bulk request, in
 	 * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, BulkResponse)} and
 	 * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, Throwable)}.
@@ -174,7 +183,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	 * the user considered it should fail the sink via the
 	 * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
 	 *
-	 * Errors will be checked and rethrown before processing each input element, and when the sink is closed.
+	 * <p>Errors will be checked and rethrown before processing each input element, and when the sink is closed.
 	 */
 	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
 
@@ -260,7 +269,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	 * Disable flushing on checkpoint. When disabled, the sink will not wait for all
 	 * pending action requests to be acknowledged by Elasticsearch on checkpoints.
 	 *
-	 * NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT
+	 * <p>NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT
 	 * provide any strong guarantees for at-least-once delivery of action requests.
 	 */
 	public void disableFlushOnCheckpoint() {
@@ -320,8 +329,9 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	/**
 	 * Build the {@link BulkProcessor}.
 	 *
-	 * Note: this is exposed for testing purposes.
+	 * <p>Note: this is exposed for testing purposes.
 	 */
+	@VisibleForTesting
 	protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
 		checkNotNull(listener);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
index 1e20a0a..8248204 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
+
 import org.elasticsearch.action.ActionRequest;
 
 import java.io.Serializable;
@@ -27,11 +28,9 @@ import java.io.Serializable;
 /**
  * Creates multiple {@link ActionRequest ActionRequests} from an element in a stream.
  *
- * <p>
- * This is used by sinks to prepare elements for sending them to Elasticsearch.
+ * <p>This is used by sinks to prepare elements for sending them to Elasticsearch.
  *
- * <p>
- * Example:
+ * <p>Example:
  *
  * <pre>{@code
  *					private static class TestElasticSearchSinkFunction implements

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
index b19ea08..dffee20 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.elasticsearch.util;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
 import org.elasticsearch.action.ActionRequest;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
index fabdcbc..9380959 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.elasticsearch.util;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index b9df5c6..5e59785 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -41,16 +42,16 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Suite of tests for {@link ElasticsearchSinkBase}.
@@ -123,9 +124,9 @@ public class ElasticsearchSinkBaseTest {
 
 	/**
 	 * Tests that any item failure in the listener callbacks due to flushing on an immediately following checkpoint
-	 * is rethrown; we set a timeout because the test will not finish if the logic is broken
+	 * is rethrown; we set a timeout because the test will not finish if the logic is broken.
 	 */
-	@Test(timeout=5000)
+	@Test(timeout = 5000)
 	public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
 		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
 			new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
@@ -250,7 +251,7 @@ public class ElasticsearchSinkBaseTest {
 	 * Tests that any bulk failure in the listener callbacks due to flushing on an immediately following checkpoint
 	 * is rethrown; we set a timeout because the test will not finish if the logic is broken.
 	 */
-	@Test(timeout=5000)
+	@Test(timeout = 5000)
 	public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
 		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
 			new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
@@ -307,9 +308,9 @@ public class ElasticsearchSinkBaseTest {
 
 	/**
 	 * Tests that the sink correctly waits for pending requests (including re-added requests) on checkpoints;
-	 * we set a timeout because the test will not finish if the logic is broken
+	 * we set a timeout because the test will not finish if the logic is broken.
 	 */
-	@Test(timeout=5000)
+	@Test(timeout = 5000)
 	public void testAtLeastOnceSink() throws Throwable {
 		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
 				new HashMap<String, String>(),
@@ -365,9 +366,9 @@ public class ElasticsearchSinkBaseTest {
 	/**
 	 * This test is meant to assure that testAtLeastOnceSink is valid by testing that if flushing is disabled,
 	 * the snapshot method does indeed finishes without waiting for pending requests;
-	 * we set a timeout because the test will not finish if the logic is broken
+	 * we set a timeout because the test will not finish if the logic is broken.
 	 */
-	@Test(timeout=5000)
+	@Test(timeout = 5000)
 	public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Exception {
 		final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
 			new HashMap<String, String>(), new SimpleSinkFunction<String>(), new DummyRetryFailureHandler());
@@ -409,7 +410,7 @@ public class ElasticsearchSinkBaseTest {
 
 		/**
 		 * This method is used to mimic a scheduled bulk request; we need to do this
-		 * manually because we are mocking the BulkProcessor
+		 * manually because we are mocking the BulkProcessor.
 		 */
 		public void manualBulkRequestWithAllPendingRequests() {
 			flushLatch.trigger(); // let the flush
@@ -429,7 +430,7 @@ public class ElasticsearchSinkBaseTest {
 		 * Set the list of mock failures to use for the next bulk of item responses. A {@code null}
 		 * means that the response is successful, failed otherwise.
 		 *
-		 * The list is used with corresponding order to the requests in the bulk, i.e. the first
+		 * <p>The list is used with corresponding order to the requests in the bulk, i.e. the first
 		 * request uses the response at index 0, the second requests uses the response at index 1, etc.
 		 */
 		public void setMockItemFailuresListForNextBulkItemResponses(List<? extends Throwable> mockItemFailuresList) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index 2f9e4c1..297bc5d 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.junit.AfterClass;
@@ -45,7 +46,7 @@ import static org.junit.Assert.fail;
  */
 public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgramsTestBase {
 
-	protected final static String CLUSTER_NAME = "test-cluster";
+	protected static final String CLUSTER_NAME = "test-cluster";
 
 	protected static EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv;
 
@@ -116,7 +117,7 @@ public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgram
 
 		try {
 			createElasticsearchSink(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
-		} catch(IllegalArgumentException expectedException) {
+		} catch (IllegalArgumentException expectedException) {
 			// test passes
 			return;
 		}
@@ -137,7 +138,7 @@ public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgram
 				userConfig,
 				Collections.<InetSocketAddress>emptyList(),
 				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
-		} catch(IllegalArgumentException expectedException) {
+		} catch (IllegalArgumentException expectedException) {
 			// test passes
 			return;
 		}
@@ -162,7 +163,7 @@ public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgram
 
 		try {
 			env.execute("Elasticsearch Transport Client Test");
-		} catch(JobExecutionException expectedException) {
+		} catch (JobExecutionException expectedException) {
 			assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
 			return;
 		}
@@ -170,7 +171,7 @@ public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgram
 		fail();
 	}
 
-	/** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses */
+	/** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses. */
 	protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
 																			List<InetSocketAddress> transportAddresses,
 																			ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
@@ -178,7 +179,7 @@ public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgram
 	/**
 	 * Creates a version-specific Elasticsearch sink to connect to a local embedded Elasticsearch node.
 	 *
-	 * This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)}
+	 * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)}
 	 * because the Elasticsearch Java API to do so is incompatible across different versions.
 	 */
 	protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
index f59eb03..ea6e7a3 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
@@ -24,7 +24,7 @@ import java.io.File;
 /**
  * The {@link EmbeddedElasticsearchNodeEnvironment} is used in integration tests to manage Elasticsearch embedded nodes.
  *
- * NOTE: In order for {@link ElasticsearchSinkTestBase} to dynamically load version-specific implementations
+ * <p>NOTE: In order for {@link ElasticsearchSinkTestBase} to dynamically load version-specific implementations
  *       for the tests, concrete implementations must be named {@code EmbeddedElasticsearchNodeEnvironmentImpl}. It must
  *       also be located under the same package. The intentional package-private accessibility of this interface
  *       enforces that.

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
index 55a48fa..4e3d3e2 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/pom.xml b/flink-connectors/flink-connector-elasticsearch/pom.xml
index 93e4eb6..7785a77 100644
--- a/flink-connectors/flink-connector-elasticsearch/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch/pom.xml
@@ -65,7 +65,7 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 8a59da9..5659ee6 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.util.Preconditions;
+
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.Client;
@@ -29,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index 2298986..bc5ac84 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.index.IndexRequest;
@@ -32,24 +33,20 @@ import java.util.Map;
  * Elasticsearch 1.x sink that requests multiple {@link ActionRequest ActionRequests}
  * against a cluster for each incoming element.
  *
- * <p>
- * When using the first constructor {@link #ElasticsearchSink(java.util.Map, ElasticsearchSinkFunction)}
+ * <p>When using the first constructor {@link #ElasticsearchSink(java.util.Map, ElasticsearchSinkFunction)}
  * the sink will create a local {@link Node} for communicating with the Elasticsearch cluster. When using the second
  * constructor {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a
  * {@link TransportClient} will be used instead.
  *
- * <p>
- * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
+ * <p><b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
  * can be connected to. When using the local {@code Node} for communicating, the sink will block and wait for a cluster
  * to come online.
  *
- * <p>
- * The {@link Map} passed to the constructor is used to create the {@link Node} or {@link TransportClient}. The config
+ * <p>The {@link Map} passed to the constructor is used to create the {@link Node} or {@link TransportClient}. The config
  * keys can be found in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is
  * {@code cluster.name}, which should be set to the name of the cluster that the sink should emit to.
  *
- * <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
  * <ul>
@@ -59,8 +56,7 @@ import java.util.Map;
  *   settings in milliseconds
  * </ul>
  *
- * <p>
- * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
  * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
  * {@link ElasticsearchSinkFunction} for an example.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
index 18aa11e..1a93fa3 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
@@ -15,10 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
+
 import org.elasticsearch.action.index.IndexRequest;
 
 import java.io.Serializable;
@@ -26,14 +28,12 @@ import java.io.Serializable;
 /**
  * Function that creates an {@link IndexRequest} from an element in a Stream.
  *
- * <p>
- * This is used by {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink}
+ * <p>This is used by {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink}
  * to prepare elements for sending them to Elasticsearch. See
  * <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index_.html">Index API</a>
  * for information about how to format data for adding it to an Elasticsearch index.
  *
- * <p>
- * Example:
+ * <p>Example:
  *
  * <pre>{@code
  *     private static class MyIndexRequestBuilder implements IndexRequestBuilder<String> {

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
index 3a7b113..ecbebd7 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -15,15 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.elasticsearch;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+
+import com.google.common.collect.Lists;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
@@ -36,6 +38,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * IT Cases for the {@link ElasticsearchSink}.
+ */
 public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
index d697c3c..f181032 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
@@ -42,7 +43,7 @@ import java.util.Map;
 public class ElasticsearchSinkExample {
 
 	public static void main(String[] args) throws Exception {
-		
+
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
@@ -67,7 +68,6 @@ public class ElasticsearchSinkExample {
 			}
 		}));
 
-
 		env.execute("Elasticsearch Sink Example");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/pom.xml b/flink-connectors/flink-connector-elasticsearch2/pom.xml
index 7e21b8f..1f342bc 100644
--- a/flink-connectors/flink-connector-elasticsearch2/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch2/pom.xml
@@ -43,7 +43,7 @@ under the License.
 	<dependencies>
 
 		<!-- core dependencies -->
- 
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index e85daf5..66b676c 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -21,6 +21,7 @@ import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallB
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
@@ -33,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -49,7 +51,7 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
 	/**
 	 * User-provided transport addresses.
 	 *
-	 * We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 2.x.
+	 * <p>We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 2.x.
 	 */
 	private final List<InetSocketAddress> transportAddresses;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index 6d771d4..0c991a6 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -14,11 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.transport.TransportClient;
@@ -31,17 +33,14 @@ import java.util.Map;
  * Elasticsearch 2.x sink that requests multiple {@link ActionRequest ActionRequests}
  * against a cluster for each incoming element.
  *
- * <p>
- * The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster.
+ * <p>The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster.
  * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
  *
- * <p>
- * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
+ * <p>The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
  * in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
  * which should be set to the name of the cluster that the sink should emit to.
  *
- * <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
  * <ul>
@@ -51,8 +50,7 @@ import java.util.Map;
  *   settings in milliseconds
  * </ul>
  *
- * <p>
- * You also have to provide an {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction}.
+ * <p>You also have to provide an {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction}.
  * This is used to create multiple {@link ActionRequest ActionRequests} for each incoming element. See the class level
  * documentation of {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} for an example.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
index c474390..74a1446 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
 import org.apache.flink.api.common.functions.Function;
@@ -24,11 +25,9 @@ import java.io.Serializable;
 /**
  * Method that creates multiple {@link org.elasticsearch.action.ActionRequest}s from an element in a Stream.
  *
- * <p>
- * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch.
+ * <p>This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch.
  *
- * <p>
- * Example:
+ * <p>Example:
  *
  * <pre>{@code
  *					private static class TestElasticSearchSinkFunction implements

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
index b2b3de4..ecaf984 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
 import org.elasticsearch.action.ActionRequest;

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
index ddf3bd6..db4cd8c 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkITCase;
+
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.node.Node;

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
index 93ac6c8..7ded893 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
@@ -14,11 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -27,6 +29,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
 public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
index 8c50847..c963927 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.elasticsearch2.examples;
 
 import org.apache.flink.api.common.functions.MapFunction;
@@ -21,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Requests;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index c7d81f5..ffb572d 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -21,6 +21,7 @@ import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallB
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
@@ -36,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -52,7 +54,7 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
 	/**
 	 * User-provided transport addresses.
 	 *
-	 * We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 5.x.
+	 * <p>We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 5.x.
 	 */
 	private final List<InetSocketAddress> transportAddresses;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 61023c2..0f1cc91 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -14,12 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.transport.TransportClient;
@@ -32,17 +34,14 @@ import java.util.Map;
  * Elasticsearch 5.x sink that requests multiple {@link ActionRequest ActionRequests}
  * against a cluster for each incoming element.
  *
- * <p>
- * The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster.
+ * <p>The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster.
  * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
  *
- * <p>
- * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
+ * <p>The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
  * in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
  * which should be set to the name of the cluster that the sink should emit to.
  *
- * <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
  * <ul>
@@ -52,8 +51,7 @@ import java.util.Map;
  *   settings in milliseconds
  * </ul>
  *
- * <p>
- * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
  * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
  * {@link ElasticsearchSinkFunction} for an example.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
index f3d8897..16e292d 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSinkITCase;
+
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
index 3ebda52..ad7c664 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -28,6 +30,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
 public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/c20b396f/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
index 4135283..22c1053 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.elasticsearch5.examples;
 
 import org.apache.flink.api.common.functions.MapFunction;
@@ -23,6 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Requests;
 


Mime
View raw message