flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
Date Mon, 27 Jun 2016 13:51:52 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15351030#comment-15351030 ]

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r68579220
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
---
    @@ -17,156 +17,489 @@
     
     package org.apache.flink.streaming.connectors.kinesis.internals;
     
    +import org.apache.flink.api.common.functions.RuntimeContext;
     import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    -import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
    +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
     import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
     import org.apache.flink.util.InstantiationUtil;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.io.IOException;
    -import java.util.Map;
    -import java.util.HashMap;
    +
    +import java.util.LinkedList;
     import java.util.List;
    -import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.Map;
     import java.util.Properties;
    +
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.atomic.AtomicLong;
     import java.util.concurrent.atomic.AtomicReference;
     
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
    - * The fetcher spawns a single thread for connection to each shard.
    + * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards.
Each parallel subtask instantiates
    + * and runs a single fetcher throughout the subtask's lifetime. The fetcher runs several
threads to accomplish
    + * the following:
    + * <ul>
    + *     <li>1. continuously poll Kinesis to discover shards that the subtask should
subscribe to. The subscribed subset
    + *     		  of shards, including future new shards, is non-overlapping across subtasks
(no two subtasks will be
    + *     		  subscribed to the same shard) and determinate across subtask restores (the
subtask will always subscribe
    + *     		  to the same subset of shards even after restoring)</li>
    + *     <li>2. decide where in each discovered shard should the fetcher start subscribing
to</li>
    + *     <li>3. subscribe to shards by creating a single thread for each shard</li>
    + * </ul>
    + *
    + * <p>The fetcher manages two states: 1) pending shards for subscription, and 2)
last processed sequence numbers of
    + * each subscribed shard. All operations on the states in multiple threads should only
be done using the handler methods
    + * provided in this class.
      */
    -public class KinesisDataFetcher {
    +public class KinesisDataFetcher<T> {
     
     	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
     
    -	/** Config properties for the Flink Kinesis Consumer */
    +	// ------------------------------------------------------------------------
    +	//  Consumer-wide settings
    +	// ------------------------------------------------------------------------
    +
    +	/** Configuration properties for the Flink Kinesis Consumer */
     	private final Properties configProps;
     
    -	/** The name of the consumer task that this fetcher was instantiated */
    -	private final String taskName;
    +	/** The list of Kinesis streams that the consumer is subscribing to */
    +	private final List<String> streams;
    +
    +	/**
    +	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
    +	 * Note that since this might not be thread-safe, multiple threads in the fetcher using
this must
    +	 * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
    +	 */
    +	private final KinesisDeserializationSchema<T> deserializationSchema;
    +
    +	// ------------------------------------------------------------------------
    +	//  Subtask-specific settings
    +	// ------------------------------------------------------------------------
    +
    +	/** Runtime context of the subtask that this fetcher was created in */
    +	private final RuntimeContext runtimeContext;
    +
    +	// ------------------------------------------------------------------------
    +	//  Executor services to run created threads
    +	// ------------------------------------------------------------------------
    +
    +	/** Executor service to run the {@link ShardDiscoverer} and {@link ShardSubscriber}
*/
    +	private final ExecutorService shardDiscovererAndSubscriberExecutor;
    +
    +	/** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
    +	private final ExecutorService shardConsumersExecutor;
     
    -	/** Information of the shards that this fetcher handles, along with the sequence numbers
that they should start from */
    -	private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
    +	// ------------------------------------------------------------------------
    +	//  Managed state, accessed and updated across multiple threads
    +	// ------------------------------------------------------------------------
     
    -	/** Reference to the thread that executed run() */
    -	private volatile Thread mainThread;
    +	/**
    +	 * Blocking queue for newly discovered shards, with their states, that this fetcher
should consume.
    +	 * The {@link ShardDiscoverer} will add shards with initial position as state to this
queue as shards are discovered,
    +	 * while the {@link ShardSubscriber} polls this queue to start subscribing to the new
discovered shards.
    +	 */
    +	private final BlockingQueue<KinesisStreamShardState> pendingShards;
    +
    +	/**
    +	 * The shards, along with their last processed sequence numbers, that this fetcher is
subscribed to. The shard
    +	 * subscriber will add to this list as it polls pending shards. Shard consumer threads
update the last processed
    +	 * sequence number of subscribed shards as they fetch and process records.
    +	 *
    +	 * <p>Note that since multiple threads will be performing operations on this list,
all operations must be wrapped in
    +	 * synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this
purpose, all threads must use
    +	 * the following thread-safe methods this class provides to operate on this list:
    +	 * <ul>
    +	 *     <li>{@link KinesisDataFetcher#addAndStartConsumingNewSubscribedShard(KinesisStreamShardState)}</li>
    +	 *     <li>{@link KinesisDataFetcher#updateState(int, String)}</li>
    +	 *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(Object, int, String)}</li>
    +	 * </ul>
    +	 */
    +	private final List<KinesisStreamShardState> subscribedShardsState;
    +
    +	private final SourceFunction.SourceContext<T> sourceContext;
    +
    +	/** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
    +	private final Object checkpointLock;
     
    -	/** Reference to the first error thrown by any of the spawned shard connection threads
*/
    +	/** This flag is set to true if the fetcher is provided a non-null and non-empty restored
state */
    +	private final boolean isRestoredFromCheckpoint;
    +
    +	/** Reference to the first error thrown by any of the created threads */
     	private final AtomicReference<Throwable> error;
     
    +	/**
    +	 *  Lock used by atomic operations to startup / shutdown the fetcher, preventing indeterminate
behaviour of
    +	 *  creating and shutting down resources. Also, {@link Object#wait()} is called on this
lock after the startup
    +	 *  process completes in {@link KinesisDataFetcher#runFetcher()}. We wake from the wait
only when
    +	 *  {@link KinesisDataFetcher#shutdownFetcher()} has been called to execute the shutdown
process.
    +	 */
    +	private final Object fetcherShutdownLock = new Object();
    +
     	private volatile boolean running = true;
     
     	/**
    -	 * Creates a new Kinesis Data Fetcher for the specified set of shards
    +	 * Creates a Kinesis Data Fetcher.
     	 *
    -	 * @param assignedShards the shards that this fetcher will pull data from
    -	 * @param configProps the configuration properties of this Flink Kinesis Consumer
    -	 * @param taskName the task name of this consumer task
    +	 * @param streams the streams to subscribe to
    +	 * @param sourceContext context of the source function
    +	 * @param runtimeContext this subtask's runtime context
    +	 * @param configProps the consumer configuration properties
    +	 * @param restoredState state of subscribed shards that the fetcher should restore to
    +	 * @param deserializationSchema deserialization schema
     	 */
    -	public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties
configProps, String taskName) {
    +	public KinesisDataFetcher(List<String> streams,
    +							SourceFunction.SourceContext<T> sourceContext,
    +							RuntimeContext runtimeContext,
    +							Properties configProps,
    +							Map<KinesisStreamShard, String> restoredState,
    +							KinesisDeserializationSchema<T> deserializationSchema) {
    +		this(streams,
    +			sourceContext,
    +			sourceContext.getCheckpointLock(),
    +			runtimeContext,configProps,
    +			restoredState,
    +			deserializationSchema,
    +			new AtomicReference<Throwable>(),
    +			new LinkedBlockingQueue<KinesisStreamShardState>(),
    +			new LinkedList<KinesisStreamShardState>());
    +	}
    +
    +	/** This constructor is exposed for testing purposes */
    +	protected KinesisDataFetcher(List<String> streams,
    +								SourceFunction.SourceContext<T> sourceContext,
    +								Object checkpointLock,
    +								RuntimeContext runtimeContext,
    +								Properties configProps,
    +								Map<KinesisStreamShard, String> restoredState,
    +								KinesisDeserializationSchema<T> deserializationSchema,
    +								AtomicReference<Throwable> error,
    +								LinkedBlockingQueue<KinesisStreamShardState> pendingShardsQueue,
    +								LinkedList<KinesisStreamShardState> subscribedShardsState) {
    +		this.streams = checkNotNull(streams);
     		this.configProps = checkNotNull(configProps);
    -		this.assignedShardsWithStartingSequenceNum = new HashMap<>();
    -		for (KinesisStreamShard shard : assignedShards) {
    -			assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
    +		this.sourceContext = checkNotNull(sourceContext);
    +		this.checkpointLock = checkNotNull(checkpointLock);
    +		this.runtimeContext = checkNotNull(runtimeContext);
    +		this.deserializationSchema = checkNotNull(deserializationSchema);
    +
    +		this.error = error;
    +		this.pendingShards = pendingShardsQueue;
    +		this.subscribedShardsState = subscribedShardsState;
    +
    +		this.shardDiscovererAndSubscriberExecutor =
    +			createShardDiscovererAndSubscriberThreadPool(runtimeContext.getTaskNameWithSubtasks());
    +		this.shardConsumersExecutor =
    +			createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
    +
    +		this.isRestoredFromCheckpoint = (restoredState != null && restoredState.entrySet().size()
!= 0);
    +
    +		// if there is state to restore from last checkpoint, we seed them as initially discovered
shards
    +		if (isRestoredFromCheckpoint) {
    +			seedPendingShardsWithRestoredState(restoredState, this.pendingShards);
     		}
    -		this.taskName = taskName;
    -		this.error = new AtomicReference<>();
     	}
     
     	/**
    -	 * Advance a shard's starting sequence number to a specified value
    +	 * Starts the fetcher. After starting the fetcher, it can only
    +	 * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
     	 *
    -	 * @param streamShard the shard to perform the advance on
    -	 * @param sequenceNum the sequence number to advance to
    +	 * @throws Exception the first error or exception thrown by the fetcher or any of the
threads created by the fetcher.
     	 */
    -	public void advanceSequenceNumberTo(KinesisStreamShard streamShard, String sequenceNum)
{
    -		if (!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
    -			throw new IllegalArgumentException("Can't advance sequence number on a shard we are
not going to read.");
    +	public void runFetcher() throws Exception {
    +
    +		// atomic operation to startup the fetcher and create the shard discoverer and subscriber.
    +		synchronized (fetcherShutdownLock) {
    +
    +			// this flag will be false if the shutdown procedure was
    +			// executed first; if so, we return without doing anything.
    +			if (!running) {
    +				return;
    +			}
    +
    +			if (LOG.isInfoEnabled()) {
    +				LOG.info("Subtask {} is starting the shard discoverer ...", runtimeContext.getIndexOfThisSubtask());
    +			}
    +			shardDiscovererAndSubscriberExecutor.submit(new ShardDiscoverer<>(this));
    +
    +			if (LOG.isInfoEnabled()) {
    +				LOG.info("Subtask {} is starting the shard subscriber ...", runtimeContext.getIndexOfThisSubtask());
    +			}
    +			shardDiscovererAndSubscriberExecutor.submit(new ShardSubscriber<>(this));
    --- End diff --
    
    Perhaps we can simply integrate the functionality of this thread into `ShardDiscoverer`?
    Once a new shard that we should be subscribing to is discovered, we open up a new
    `ShardConsumer` for it. This way we also won't need the `pendingShards` queue.
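
    A rough sketch of what the merged approach could look like, to make the suggestion concrete. This is not code from the pull request: `MergedShardDiscovererSketch`, `shardLister`, `consumerFactory` and `discoverAndSubscribeOnce()` are hypothetical names, shards are reduced to plain string IDs, and the Kinesis call is replaced by a supplier. The only point is that the discovery pass itself submits a consumer for every shard it has not seen before, so no intermediate queue is needed.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch: shard discovery and subscription merged into one component. */
public class MergedShardDiscovererSketch {

    /** Stand-in for whatever call lists the shards this subtask is responsible for (assumption). */
    private final Supplier<List<String>> shardLister;

    /** Stand-in for creating a shard consumer for a given shard ID (assumption). */
    private final Function<String, Runnable> consumerFactory;

    private final ExecutorService shardConsumersExecutor;

    /** Shards that already have a consumer; replaces the pending-shards queue. */
    private final Set<String> subscribedShardIds = new HashSet<>();

    public MergedShardDiscovererSketch(Supplier<List<String>> shardLister,
                                       Function<String, Runnable> consumerFactory,
                                       ExecutorService shardConsumersExecutor) {
        this.shardLister = shardLister;
        this.consumerFactory = consumerFactory;
        this.shardConsumersExecutor = shardConsumersExecutor;
    }

    /**
     * Runs one discovery pass and directly starts a consumer for every shard
     * that has not been seen before. Returns the number of consumers started.
     */
    public synchronized int discoverAndSubscribeOnce() {
        int started = 0;
        for (String shardId : shardLister.get()) {
            // Set.add returns false for shards that already have a consumer
            if (subscribedShardIds.add(shardId)) {
                shardConsumersExecutor.submit(consumerFactory.apply(shardId));
                started++;
            }
        }
        return started;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        ConcurrentLinkedQueue<String> startedConsumers = new ConcurrentLinkedQueue<>();

        MergedShardDiscovererSketch discoverer = new MergedShardDiscovererSketch(
            () -> Arrays.asList("shard-0", "shard-1"),
            shardId -> () -> startedConsumers.add(shardId),
            executor);

        int firstPass = discoverer.discoverAndSubscribeOnce();
        int secondPass = discoverer.discoverAndSubscribeOnce();

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);

        System.out.println("first pass started " + firstPass
            + ", second pass started " + secondPass
            + ", consumers run: " + startedConsumers.size());
    }
}
```

    In the real fetcher the pass would presumably run in a loop on the discoverer thread, and restored state would have to seed the subscribed set rather than a queue; both are left out of this sketch.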


> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis users can
choose to "merge" and "split" shards at any time for adjustable stream throughput capacity.
This article explains this quite clearly: https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic version of
the Kinesis consumer (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task
mapping is done in a simple round-robin-like distribution which can be locally determined
at each Flink consumer task (Flink Kafka consumer does this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer tasks coordinate
which shards they are currently handling, and allow a task to ask the coordinator for a
shard reassignment when it finds a closed shard at runtime (Kinesis closes a shard
when it is merged or split).
> We need a centralized coordinator state store which is visible to all Flink consumer
tasks. Each task can use this state store to locally determine which shards it can be reassigned.
Amazon KCL uses a DynamoDB table for the coordination, but as described in https://issues.apache.org/jira/browse/FLINK-3211,
we unfortunately can't use KCL for the implementation of the consumer if we want to leverage
Flink's checkpointing mechanics. For our own implementation, ZooKeeper can be used for this
state store, but that means the user would have to set up ZooKeeper for the consumer to work.
> Since this feature introduces extensive work, it is opened as a separate sub-task from
the basic implementation https://issues.apache.org/jira/browse/FLINK-3229.
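
For readers unfamiliar with the static mapping the description refers to, a minimal sketch of a round-robin-like assignment that each task can compute locally. It is an illustration only, not the FLINK-3229 implementation: the class name, the method `shardsForSubtask` and the use of plain string shard IDs are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of a static, locally computed shard-to-task mapping. */
public class StaticShardAssignmentSketch {

    /**
     * Returns the shards for one subtask: shard i goes to subtask (i % parallelism).
     * Every subtask evaluates this on the same ordered shard list, so no
     * coordination between subtasks is needed.
     */
    public static List<String> shardsForSubtask(List<String> allShards, int parallelism, int subtaskIndex) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < allShards.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                assigned.add(allShards.get(i));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shard-0", "shard-1", "shard-2", "shard-3", "shard-4");
        System.out.println(shardsForSubtask(shards, 2, 0)); // prints [shard-0, shard-2, shard-4]
        System.out.println(shardsForSubtask(shards, 2, 1)); // prints [shard-1, shard-3]
    }
}
```

The mapping is fixed by the shard list seen at startup, which is why shards closed or created by a later merge or split are not handled without some further mechanism.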



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
