flink-commits mailing list archives

From fhue...@apache.org
Subject [02/37] flink git commit: [FLINK-4577] [kinesis] Transparent reshard handling for FlinkKinesisConsumer
Date Thu, 06 Apr 2017 07:28:15 GMT
[FLINK-4577] [kinesis] Transparent reshard handling for FlinkKinesisConsumer

This closes #3458.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19322401
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19322401
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19322401

Branch: refs/heads/table-retraction
Commit: 1932240179189a88273f52d3d93f277e7bf604de
Parents: a119a30
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Thu Mar 2 18:56:39 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Fri Mar 31 12:33:48 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  | 36 +++++----
 .../kinesis/internals/KinesisDataFetcher.java   | 77 ++++----------------
 2 files changed, 34 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19322401/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index ef1afca..1fcc529 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -23,6 +23,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+* This will be replaced by the TOC
+{:toc}
+
 The Kinesis connector provides access to [Amazon AWS Kinesis Streams](http://aws.amazon.com/kinesis/streams/).
 
 To use the connector, add the following Maven dependency to your project:
@@ -53,14 +56,14 @@ mvn clean install -Pinclude-kinesis -DskipTests
 The streaming connectors are not part of the binary distribution. See how to link with them for cluster
 execution [here]({{site.baseurl}}/dev/linking.html).
 
-### Using the Amazon Kinesis Streams Service
+## Using the Amazon Kinesis Streams Service
 Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
 to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
 
-### Kinesis Consumer
+## Kinesis Consumer
 
 The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
+streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is
 responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
 change as shards are closed and created by Kinesis.
 
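For orientation, a minimal consumer setup sketch; the stream name, region, and credential values below are placeholders, not from this commit:

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// each parallel subtask of the source fetches from the Kinesis shards assigned to it
DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
	"kinesis-stream", new SimpleStringSchema(), consumerConfig));
```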
@@ -107,13 +110,16 @@ to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from
 
 Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
 
-**NOTE:** Currently, resharding can not be handled transparently (i.e., without failing and restarting jobs) if there are idle consumer
-subtasks, which occur when the total number of shards is lower than the configured consumer parallelism. The job must be
-configured to enable checkpointing, so that the new shards due to resharding can be correctly picked up and consumed by the
-Kinesis consumer after the job is restored. This is a temporary limitation that will be resolved in future versions.
-Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for more detail.
+Note that the configured parallelism of the Flink Kinesis Consumer source
+can be completely independent of the total number of shards in the Kinesis streams.
+When the number of shards is larger than the parallelism of the consumer,
+each consumer subtask can subscribe to multiple shards; otherwise,
+if the number of shards is smaller than the parallelism of the consumer,
+some consumer subtasks will simply be idle and wait until they are assigned
+new shards (i.e., when the streams are resharded to increase the
+number of shards for higher provisioned Kinesis service throughput).
 
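As a sketch of that independence, reusing `env` and `consumerConfig` from the sketch above (parallelism and shard count here are made-up values):

```java
// hypothetical: a stream with 4 shards consumed at parallelism 8;
// four subtasks stay idle until a reshard assigns them new shards
DataStream<String> stream = env
	.addSource(new FlinkKinesisConsumer<>("kinesis-stream", new SimpleStringSchema(), consumerConfig))
	.setParallelism(8);
```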
-#### Configuring Starting Position
+### Configuring Starting Position
 
 The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
 one of the following values in the provided configuration properties (the naming of the options identically follows [the namings used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
@@ -127,7 +133,7 @@ properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIME
     If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
     (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern).
 
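A small sketch of configuring the starting position (the timestamp value is illustrative):

```java
Properties consumerConfig = new Properties();
// start from the oldest available record:
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
// or start at a point in time instead:
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-04-04T19:58:46.480-00:00");
```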
-#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
+### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
 
 With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
 periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the
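Enabling checkpointing is a single call on the environment; the interval below is an arbitrary example:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // snapshot the consumer's shard progress every 5 seconds
```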
@@ -157,7 +163,7 @@ Also note that Flink can only restart the topology if enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
-#### Event Time for Consumed Records
+### Event Time for Consumed Records
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -201,7 +207,7 @@ kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner)
 </div>
 </div>
 
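One possible shape for such an assigner, as a sketch: `BoundedOutOfOrdernessTimestampExtractor` is real Flink API, but the ten-second bound and the record layout parsed below are hypothetical:

```java
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

kinesis = kinesis.assignTimestampsAndWatermarks(
	new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
		@Override
		public long extractTimestamp(String element) {
			// hypothetical record layout: "<epochMillis>,<payload>"
			return Long.parseLong(element.split(",", 2)[0]);
		}
	});
```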
-#### Threading Model
+### Threading Model
 
 The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.
 
@@ -214,7 +220,7 @@ For data consumption, a single thread will be created to consume each discovered
 shard it is responsible of consuming is closed as a result of stream resharding. In other words, there will always be
 one thread per open shard.
 
-#### Internally Used Kinesis APIs
+### Internally Used Kinesis APIs
 
 The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs
 for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
@@ -248,7 +254,7 @@ adjusts the maximum number of records each consuming thread tries to fetch from
 the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the
 consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
 
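A sketch of those two knobs on the `consumerConfig` from earlier (the numbers are illustrative, not recommendations):

```java
consumerConfig.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "100");
consumerConfig.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");
```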
-### Kinesis Producer
+## Kinesis Producer
 
 The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
 Flink's checkpointing and doesn't provide exactly-once processing guarantees.
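A minimal producer sketch; the stream name, region, credentials, and sample elements are placeholders:

```java
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

FlinkKinesisProducer<String> kinesis =
	new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("kinesis-stream");
kinesis.setDefaultPartition("0");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("msg-1", "msg-2").addSink(kinesis);
```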
@@ -305,7 +311,7 @@ Otherwise, the returned stream name is used.
 Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.
 
 
-### Using Non-AWS Kinesis Endpoints for Testing
+## Using Non-AWS Kinesis Endpoints for Testing
 
 It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as
 [Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink

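A sketch of pointing the consumer at a local endpoint such as Kinesalite (the URL is illustrative):

```java
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
```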
http://git-wip-us.apache.org/repos/asf/flink/blob/19322401/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index a06fdca..46847b3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
@@ -325,35 +324,10 @@ public class KinesisDataFetcher<T> {
 				ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
 
-		// FLINK-4341:
-		// For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark
-		// for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise
-		// the downstream watermarks would not advance, leading to unbounded accumulating state.
-		//
-		// The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard
-		// is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks
-		// will be messed up.
-		//
-		// There are 2 cases were we need to either emit a max value watermark, or deliberately fail hard:
-		//  (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
-		//      value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
-		//      due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer
-		//      was told to start from TRIM_HORIZON, or 3) there was initially no shards for this subtask to read on startup.
-		//  (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
-		//      a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
-		//      will be subscribed by this subtask after restore as initial shards on startup.
-		//
-		// TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager
-		// Please see FLINK-4341 for more detail
-
-		boolean emittedMaxValueWatermark = false;
-
 		if (this.numberOfActiveShards.get() == 0) {
-			// FLINK-4341 workaround case (a) - please see the above for details on this case
-			LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
+			LOG.info("Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...",
 				indexOfThisConsumerSubtask);
-			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
-			emittedMaxValueWatermark = true;
+			sourceContext.markAsTemporarilyIdle();
 		}
 
 		while (running) {
@@ -363,41 +337,6 @@ public class KinesisDataFetcher<T> {
 			}
 			List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
 
-			// -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
-			// Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
-			// a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
-			// may not correctly reflect the discover result in the below case determination. This may lead to incorrect
-			// case determination on the current discovery attempt, but can still be correctly handled on future attempts.
-			//
-			// Although this can be resolved by wrapping the current shard discovery attempt with the below
-			// case determination within a synchronized block on the checkpoint lock for atomicity, there will be
-			// considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore,
-			// since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as
-			// we can still eventually handle max value watermark emitting / deliberately failing on successive
-			// discovery attempts.
-
-			if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
-				// FLINK-4341 workaround case (a) - please see the above for details on this case
-				LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
-					indexOfThisConsumerSubtask);
-				sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
-				emittedMaxValueWatermark = true;
-			} else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) {
-				// FLINK-4341 workaround case (b) - please see the above for details on this case
-				//
-				// Note that in the case where on resharding this subtask ceased to read all of it's previous shards
-				// but new shards is also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
-				// will be false; this allows the fetcher to continue reading the new shards without failing on such cases.
-				// However, due to the race condition mentioned above, we might still fall into case (a) first, and
-				// then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value
-				// watermark emitting still remains to be correct.
-
-				LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
-						" up watermarks; the new shards will be subscribed by this subtask after restore ...",
-					indexOfThisConsumerSubtask, newShardsDueToResharding.size());
-				throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
-			}
-
 			for (KinesisStreamShard shard : newShardsDueToResharding) {
 				// since there may be delay in discovering a new shard, all new shards due to
 				// resharding should be read starting from the earliest record possible
@@ -605,9 +544,19 @@ public class KinesisDataFetcher<T> {
 			// if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
 			// we've finished reading the shard and should determine it to be non-active
 			if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
-				this.numberOfActiveShards.decrementAndGet();
 				LOG.info("Subtask {} has reached the end of subscribed shard: {}",
 					indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
+
+				// check if we need to mark the source as idle;
+				// note that on resharding, if registerNewSubscribedShardState was invoked for newly discovered shards
+				// AFTER the old shards had reached the end, the subtask's status will be automatically toggled back to
+				// be active immediately afterwards as soon as we collect records from the new shards
+				if (this.numberOfActiveShards.decrementAndGet() == 0) {
+					LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...",
+						indexOfThisConsumerSubtask);
+
+					sourceContext.markAsTemporarilyIdle();
+				}
 			}
 		}
 	}

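For context on the API this change relies on: `SourceContext#markAsTemporarilyIdle()` signals that a source subtask will temporarily emit no records, so downstream watermarks can advance without waiting on it, and collecting a record toggles the subtask back to active. A generic sketch of that pattern in a simplified, hypothetical source (not the fetcher's actual code):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class IdleAwareSource implements SourceFunction<String> {
	private volatile boolean running = true;

	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		while (running) {
			String record = pollNextRecord(); // hypothetical fetch; null when no work is available
			if (record == null) {
				// nothing to read right now: let downstream watermarks advance without us
				ctx.markAsTemporarilyIdle();
				Thread.sleep(100); // avoid busy-spinning while idle
			} else {
				// collecting a record automatically marks the subtask as active again
				ctx.collect(record);
			}
		}
	}

	@Override
	public void cancel() {
		running = false;
	}

	private String pollNextRecord() {
		return null; // placeholder for an actual fetch
	}
}
```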
