Date: Fri, 1 Jul 2016 11:24:11 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

    [ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358818#comment-15358818 ]

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2131#discussion_r69285572

--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -17,157 +17,553 @@
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
+ * <ul>
+ *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *            of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *            subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ *            to the same subset of shards even after restoring)</li>
+ *     <li>2. decide where in each discovered shard should the fetcher start subscribing to</li>
+ *     <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * <p>The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery),
+ * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
+ * by multiple threads, these operations should only be done using the handler methods provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher {
 
     private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-    /** Config properties for the Flink Kinesis Consumer */
+    // ------------------------------------------------------------------------
+    // Consumer-wide settings
+    // ------------------------------------------------------------------------
+
+    /** Configuration properties for the Flink Kinesis Consumer */
     private final Properties configProps;
 
-    /** The name of the consumer task that this fetcher was instantiated */
-    private final String taskName;
+    /** The list of Kinesis streams that the consumer is subscribing to */
+    private final List streams;
+
+    /**
+     * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+     * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must
+     * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
+     */
+    private final KinesisDeserializationSchema deserializationSchema;
+
+    // ------------------------------------------------------------------------
+    // Subtask-specific settings
+    // ------------------------------------------------------------------------
+
+    /** Runtime context of the subtask that this fetcher was created in */
+    private final RuntimeContext runtimeContext;
+
+    private final int totalNumberOfConsumerSubtasks;
+
+    private final int indexOfThisConsumerSubtask;
+
+    /**
+     * This flag should be set by {@link FlinkKinesisConsumer} using
+     * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}
+     */
+    private boolean isRestoredFromFailure;
+
+    // ------------------------------------------------------------------------
+    // Executor services to run created threads
+    // ------------------------------------------------------------------------
 
-    /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */
-    private HashMap assignedShardsWithStartingSequenceNum;
+    /** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
+    private final ExecutorService shardConsumersExecutor;
 
-    /** Reference to the thread that executed run() */
-    private volatile Thread mainThread;
+    // ------------------------------------------------------------------------
+    // Managed state, accessed and updated across multiple threads
+    // ------------------------------------------------------------------------
 
-    /** Reference to the first error thrown by any of the spawned shard connection threads */
+    /** The last discovered shard ids of each subscribed stream, updated as the fetcher discovers new shards in.
+     * Note: this state will be updated if new shards are found when {@link KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
+     */
+    private final Map subscribedStreamsToLastDiscoveredShardIds;
+
+    /**
+     * The shards, along with their last processed sequence numbers, that this fetcher is subscribed to. The fetcher
+     * will add new subscribed shard states to this list as it discovers new shards. {@link ShardConsumer} threads update
+     * the last processed sequence number of subscribed shards as they fetch and process records.
+     *
+     * <p>Note that since multiple {@link ShardConsumer} threads will be performing operations on this list, all operations
+     * must be wrapped in synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this purpose,
+     * all threads must use the following thread-safe methods this class provides to operate on this list:
+     * <ul>
+     *     <li>{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
+     *     <li>{@link KinesisDataFetcher#updateState(int, SequenceNumber)}</li>
+     *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, int, SequenceNumber)}</li>
+     * </ul>
+     */
+    private final List subscribedShardsState;
+
+    private final SourceFunction.SourceContext sourceContext;
+
+    /** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
+    private final Object checkpointLock;
+
+    /** Reference to the first error thrown by any of the {@link ShardConsumer} threads */
     private final AtomicReference error;
 
+    /** The Kinesis proxy that the fetcher will be using to discover new shards */
+    private final KinesisProxyInterface kinesis;
+
+    /** Thread that executed runFetcher() */
+    private Thread mainThread;
+
     private volatile boolean running = true;
 
     /**
-     * Creates a new Kinesis Data Fetcher for the specified set of shards
+     * Creates a Kinesis Data Fetcher.
      *
-     * @param assignedShards the shards that this fetcher will pull data from
-     * @param configProps the configuration properties of this Flink Kinesis Consumer
-     * @param taskName the task name of this consumer task
+     * @param streams the streams to subscribe to
+     * @param sourceContext context of the source function
+     * @param runtimeContext this subtask's runtime context
+     * @param configProps the consumer configuration properties
+     * @param deserializationSchema deserialization schema
      */
-    public KinesisDataFetcher(List assignedShards, Properties configProps, String taskName) {
+    public KinesisDataFetcher(List streams,
+                              SourceFunction.SourceContext sourceContext,
+                              RuntimeContext runtimeContext,
+                              Properties configProps,
+                              KinesisDeserializationSchema deserializationSchema) {
+        this(streams,
+            sourceContext,
+            sourceContext.getCheckpointLock(),
+            runtimeContext,
+            configProps,
+            deserializationSchema,
+            new AtomicReference(),
+            new LinkedList(),
+            createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
+            KinesisProxy.create(configProps));
+    }
+
+    /** This constructor is exposed for testing purposes */
+    protected KinesisDataFetcher(List streams,
+                                 SourceFunction.SourceContext sourceContext,
+                                 Object checkpointLock,
+                                 RuntimeContext runtimeContext,
+                                 Properties configProps,
+                                 KinesisDeserializationSchema deserializationSchema,
+                                 AtomicReference error,
+                                 LinkedList subscribedShardsState,
+                                 HashMap subscribedStreamsToLastDiscoveredShardIds,
+                                 KinesisProxyInterface kinesis) {
+        this.streams = checkNotNull(streams);
         this.configProps = checkNotNull(configProps);
-        this.assignedShardsWithStartingSequenceNum = new HashMap<>();
-        for (KinesisStreamShard shard : assignedShards) {
-            assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.get());
-        }
-        this.taskName = taskName;
-        this.error = new AtomicReference<>();
+        this.sourceContext = checkNotNull(sourceContext);
+        this.checkpointLock = checkNotNull(checkpointLock);
+        this.runtimeContext = checkNotNull(runtimeContext);
+        this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+        this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
+        this.deserializationSchema = checkNotNull(deserializationSchema);
+        this.kinesis = checkNotNull(kinesis);
+
+        this.error = checkNotNull(error);
+        this.subscribedShardsState = checkNotNull(subscribedShardsState);
+        this.subscribedStreamsToLastDiscoveredShardIds = checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
+
+        this.shardConsumersExecutor =
+            createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
     }
 
     /**
-     * Advance a shard's starting sequence number to a specified value
+     * Starts the fetcher. After starting the fetcher, it can only
+     * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
      *
-     * @param streamShard the shard to perform the advance on
-     * @param sequenceNum the sequence number to advance to
+     * @throws Exception the first error or exception thrown by the fetcher or any of the threads created by the fetcher.
      */
-    public void advanceSequenceNumberTo(KinesisStreamShard streamShard, SequenceNumber sequenceNum) {
-        if (!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
-            throw new IllegalArgumentException("Can't advance sequence number on a shard we are not going to read.");
-        }
-        assignedShardsWithStartingSequenceNum.put(streamShard, sequenceNum);
-    }
-
-    public void run(SourceFunction.SourceContext sourceContext,
-                    KinesisDeserializationSchema deserializationSchema,
-                    HashMap lastSequenceNums) throws Exception {
+    public void runFetcher() throws Exception {
 
-        if (assignedShardsWithStartingSequenceNum == null || assignedShardsWithStartingSequenceNum.size() == 0) {
-            throw new IllegalArgumentException("No shards set to read for this fetcher");
+        // check that we are running before proceeding
+        if (!running) {
+            return;
         }
 
         this.mainThread = Thread.currentThread();
 
-        LOG.info("Reading from shards " + assignedShardsWithStartingSequenceNum);
+        // ------------------------------------------------------------------------
+        // Procedures before starting the infinite while loop:
+        // ------------------------------------------------------------------------
+
+        // 1. query for any new shards that may have been created while the Kinesis consumer was not running,
+        //    and register them to the subscribedShardState list.
+        List newShardsCreatedWhileNotRunning = discoverNewShardsToSubscribe();
+        for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
+            // the starting state for new shards created while the consumer wasn't running depends on whether or not
+            // we are starting fresh (not restoring from a checkpoint); when we are starting fresh, this simply means
+            // all existing shards of streams we are subscribing to are new shards; when we are restoring from checkpoint,
+            // any new shards due to Kinesis resharding from the time of the checkpoint will be considered new shards.
+            SentinelSequenceNumber startingStateForNewShard = (isRestoredFromFailure)
+                ? SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
+                : KinesisConfigUtil.getInitialPositionAsSentinelSequenceNumber(configProps);
 
-        // create a thread for each individual shard
-        ArrayList> consumerThreads = new ArrayList<>(assignedShardsWithStartingSequenceNum.size());
-        for (Map.Entry assignedShard : assignedShardsWithStartingSequenceNum.entrySet()) {
-            ShardConsumerThread thread = new ShardConsumerThread<>(this, configProps, assignedShard.getKey(),
-                assignedShard.getValue(), sourceContext, InstantiationUtil.clone(deserializationSchema), lastSequenceNums);
-            thread.setName(String.format("ShardConsumer - %s - %s/%s",
-                taskName, assignedShard.getKey().getStreamName() ,assignedShard.getKey().getShardId()));
-            thread.setDaemon(true);
-            consumerThreads.add(thread);
+            if (LOG.isInfoEnabled()) {
+                String logFormat = (isRestoredFromFailure)
+                    ? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}"
+                    : "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't"
+                        + "running due to failure, starting state set as sequence number {}";
+
+                LOG.info(logFormat, runtimeContext.getIndexOfThisSubtask(), shard.toString(), startingStateForNewShard.get());
+            }
+            registerNewSubscribedShardState(new KinesisStreamShardState(shard, startingStateForNewShard.get()));
         }
 
-        // check that we are viable for running for the last time before starting threads
-        if (!running) {
-            return;
+        // 2. check that there is at least one shard in the subscribed streams to consume from (can be done by
+        //    checking if at least one value in subscribedStreamsToLastDiscoveredShardIds is not null)
+        boolean hasShards = false;
+        StringBuilder streamsWithNoShardsFound = new StringBuilder();
+        for (Map.Entry streamToLastDiscoveredShardEntry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
+            if (streamToLastDiscoveredShardEntry.getValue() != null) {
+                hasShards = true;
+            } else {
+                streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(", ");
+            }
         }
 
-        for (ShardConsumerThread shardConsumer : consumerThreads) {
-            LOG.info("Starting thread {}", shardConsumer.getName());
-            shardConsumer.start();
+        if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
+            LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}",
+                indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString());
         }
 
-        // wait until all consumer threads are done, or until the fetcher is aborted, or until
-        // an error occurred in one of the consumer threads
-        try {
-            boolean consumersStillRunning = true;
-            while (running && error.get() == null && consumersStillRunning) {
-                try {
-                    // wait for the consumer threads. if an error occurs, we are interrupted
-                    for (ShardConsumerThread consumerThread : consumerThreads) {
-                        consumerThread.join();
-                    }
+        if (!hasShards) {
+            throw new RuntimeException("No shards can be found for all subscribed streams: " + streams);
+        }
 
-                    // check if there are consumer threads still running
-                    consumersStillRunning = false;
-                    for (ShardConsumerThread consumerThread : consumerThreads) {
-                        consumersStillRunning = consumersStillRunning | consumerThread.isAlive();
-                    }
-                } catch (InterruptedException e) {
-                    // ignore
-                }
+        // 3. start consuming any shard state we already have in the subscribedShardState up to this point; the
+        //    subscribedShardState may already be seeded with values due to step 1., or explicitly added by the
+        //    consumer using a restored state checkpoint
+        for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
+            KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}" +
--- End diff --

I think the + at the end of the string is wrong.

> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis users can choose to "merge" and "split" shards at any time for adjustable stream throughput capacity. This article explains this quite clearly: https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic version of the Kinesis consumer (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task mapping is done in a simple round-robin-like distribution which can be locally determined at each Flink consumer task (the Flink Kafka consumer does this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer tasks coordinate which shards they are currently handling, and allow a task to ask the coordinator for a shard reassignment when it finds a closed shard at runtime (Kinesis closes shards when they are merged or split).
> We need a centralized coordinator state store which is visible to all Flink consumer tasks. Tasks can use this state store to locally determine which shards they can be reassigned. Amazon KCL uses a DynamoDB table for the coordination, but as described in https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use KCL for the implementation of the consumer if we want to leverage Flink's checkpointing mechanics. For our own implementation, ZooKeeper could be used for this state store, but that would require users to set up ZK.
> Since this feature requires extensive work, it is opened as a separate sub-task from the basic implementation https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
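The review comment above is about string concatenation in a log statement; the quoted excerpt ends at the commented line, so what follows that trailing `+` is not shown. A related, fully visible detail in the same diff is the second `logFormat` branch, which joins `"...while the consumer wasn't"` and `"running due to failure, ..."` with `+`. Java concatenates string literals verbatim, so the logged text would read `wasn'trunning`. The sketch below is only an illustration, not code from the pull request: it reproduces that literal with and without the separating space, and `render` is a hypothetical, dependency-free stand-in for SLF4J-style `{}` substitution (it is not SLF4J's implementation).

```java
// Illustrates that '+' joins Java string literals verbatim:
// any separating space must be written explicitly in one of the literals.
final class LogFormatConcatDemo {

    // Format string as it appears in the diff: no space between "wasn't" and "running".
    static final String AS_IN_DIFF =
            "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't"
                    + "running due to failure, starting state set as sequence number {}";

    // Same format with the separating space added at the end of the first literal.
    static final String WITH_SPACE =
            "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't "
                    + "running due to failure, starting state set as sequence number {}";

    // Hypothetical helper: a minimal stand-in for SLF4J-style "{}" substitution so the demo
    // has no dependencies. It is not SLF4J's implementation (no escaping, no array handling).
    static String render(String format, Object... args) {
        StringBuilder out = new StringBuilder();
        int start = 0;
        int argIndex = 0;
        int brace;
        // Replace placeholders left to right until arguments or placeholders run out.
        while (argIndex < args.length && (brace = format.indexOf("{}", start)) >= 0) {
            out.append(format, start, brace).append(args[argIndex++]);
            start = brace + 2;
        }
        // Copy whatever follows the last substituted placeholder unchanged.
        out.append(format, start, format.length());
        return out.toString();
    }

    public static void main(String[] args) {
        // The arguments are made-up placeholders, chosen only for illustration.
        System.out.println(render(AS_IN_DIFF, 0, "shardId-000000000000", "SENTINEL_EARLIEST_SEQUENCE_NUM"));
        System.out.println(render(WITH_SPACE, 0, "shardId-000000000000", "SENTINEL_EARLIEST_SEQUENCE_NUM"));
    }
}
```

Whether the space goes at the end of the first literal or the start of the second is a matter of convention; what matters is that exactly one of them carries it.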
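The class Javadoc in the diff requires the subscribed subset of shards to be non-overlapping across subtasks and determinate across restores, and the issue description notes that the basic consumer computes its shard-to-task mapping locally at each task. A minimal sketch of one way to get both properties without a coordinator is to derive a subtask index purely from the shard's identity and the parallelism. It assumes a shard is identified by its stream name and shard id; the `Shard` class, `assignedSubtask`, `shardsFor`, and the hash-modulo rule are illustrative assumptions, not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Sketch: deterministic, non-overlapping shard-to-subtask assignment computed locally.
// Assumption: a shard is identified by (streamName, shardId); illustrative only, not Flink's code.
final class ShardAssignmentSketch {

    // Hypothetical stand-in for a shard handle such as KinesisStreamShard.
    static final class Shard {
        final String streamName;
        final String shardId;

        Shard(String streamName, String shardId) {
            this.streamName = Objects.requireNonNull(streamName);
            this.shardId = Objects.requireNonNull(shardId);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Shard)) {
                return false;
            }
            Shard other = (Shard) o;
            return streamName.equals(other.streamName) && shardId.equals(other.shardId);
        }

        // Built only from String.hashCode(), whose algorithm the Java platform specifies,
        // so the value is identical on every JVM run and therefore across restores.
        @Override
        public int hashCode() {
            return Objects.hash(streamName, shardId);
        }

        @Override
        public String toString() {
            return streamName + "/" + shardId;
        }
    }

    // Maps every shard to exactly one subtask index in [0, totalSubtasks).
    // floorMod keeps the result non-negative even when hashCode() is negative.
    static int assignedSubtask(Shard shard, int totalSubtasks) {
        if (totalSubtasks <= 0) {
            throw new IllegalArgumentException("totalSubtasks must be positive");
        }
        return Math.floorMod(shard.hashCode(), totalSubtasks);
    }

    // Filters the discovered shards down to those the given subtask should subscribe to.
    static List<Shard> shardsFor(List<Shard> discovered, int subtaskIndex, int totalSubtasks) {
        List<Shard> mine = new ArrayList<>();
        for (Shard shard : discovered) {
            if (assignedSubtask(shard, totalSubtasks) == subtaskIndex) {
                mine.add(shard);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // "example-stream" and the shard ids are made-up inputs for illustration.
        List<Shard> discovered = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            discovered.add(new Shard("example-stream", String.format("shardId-%012d", i)));
        }
        int parallelism = 3;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> " + shardsFor(discovered, subtask, parallelism));
        }
    }
}
```

Because the index depends only on `String.hashCode()` values and on the parallelism, a restored subtask recomputes the same subset, and a shard created later by a split or merge is picked up by exactly one subtask when each subtask applies the same rule during discovery. Two caveats: changing the parallelism changes the mapping, and hashing balances shards across subtasks only statistically.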