Date: Fri, 24 Jun 2016 13:43:16 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Mailing-List: contact issues-help@flink.apache.org; run by ezmlm
Subject: [jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

    [ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348280#comment-15348280 ]

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r68399454

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -122,29 +173,36 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
         }

         /**
    -     * Get the list of shards associated with multiple Kinesis streams
    +     * Get the complete shard list of multiple Kinesis streams.
          *
    -     * @param streamNames the list of Kinesis streams
    -     * @return a list of {@link KinesisStreamShard}s
    +     * @param streamNames Kinesis streams to retrieve the shard list for
    +     * @return shard list result
          */
    -    public List getShardList(List streamNames) {
    -        List shardList = new ArrayList<>();
    +    public GetShardListResult getShardList(List streamNames) throws InterruptedException {
    --- End diff --

    Missing `@Override` annotation

> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis users can choose to "merge" and "split" shards at any time for adjustable stream throughput capacity. This article explains this quite clearly: https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic version of the Kinesis consumer (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task mapping is done in a simple round-robin-like distribution which can be locally determined at each Flink consumer task (Flink Kafka consumer does this too).
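
As an illustration of the static mapping the issue describes, here is a minimal sketch of how a round-robin-like assignment could be computed locally by each consumer task. It is not the connector's actual code: the class name `StaticShardAssignment`, the method `shardsForSubtask`, and the use of plain shard-ID strings are all hypothetical, and the sketch assumes every task sees the same shard list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the Flink Kinesis connector.
class StaticShardAssignment {

    /**
     * Returns the shards the given subtask would read, assuming every
     * subtask calls this with the same set of shard IDs.
     */
    static List<String> shardsForSubtask(List<String> allShardIds, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        // Sort a copy so each subtask derives the same order on its own,
        // without talking to any other subtask.
        List<String> sorted = new ArrayList<>(allShardIds);
        Collections.sort(sorted);

        // Round-robin: shard i goes to subtask (i mod parallelism).
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                assigned.add(sorted.get(i));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList(
                "shardId-000000000000", "shardId-000000000001",
                "shardId-000000000002", "shardId-000000000003");
        int parallelism = 2;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> "
                    + shardsForSubtask(shards, subtask, parallelism));
        }
    }
}
```

Because the assignment is derived once from a snapshot of the shard list, a shard created later by a split or merge is never picked up by any task, which is the gap this sub-task is meant to close.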
> To handle Kinesis resharding, we will need some way to let the Flink consumer tasks coordinate which shards they are currently handling, and to allow a task to ask the coordinator for a shard reassignment when it finds a closed shard at runtime (Kinesis closes a shard when it is merged or split).
> We need a centralized coordinator state store that is visible to all Flink consumer tasks. Each task can use this state store to locally determine which shards can be reassigned to it. Amazon KCL uses a DynamoDB table for the coordination, but as described in https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use KCL for the implementation of the consumer if we want to leverage Flink's checkpointing mechanics. For our own implementation, ZooKeeper can be used for this state store, but that would require the user to set up ZooKeeper.
> Since this feature requires extensive work, it is opened as a separate sub-task from the basic implementation https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)