From: "Tzu-Li (Gordon) Tai (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Fri, 8 Jul 2016 01:51:11 +0000 (UTC)
Subject: [jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

    [ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367086#comment-15367086 ]

Tzu-Li (Gordon) Tai commented on FLINK-4022:
--------------------------------------------

[~rmetzger] I can start
working on this now (but I don't think I'll be able to finish it before the RCs for 1.1). Before I get to it, I'd like to collect anything I should be aware of for the implementation. I think the approach will mainly be the same as the shard discovery in the Kinesis connector. Is there any other Kafka API you know of that we could use for this? Or are there any implications of constantly fetching partition info from Kafka, like the Kinesis connector does?

> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
>                 Key: FLINK-4022
>                 URL: https://issues.apache.org/jira/browse/FLINK-4022
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer automatically reads from "topic-n1", "topic-n2", ... and so on as they are added to Kafka.
> I propose to implement this feature as follows:
> Since the overall list of partitions to read will change after job submission, the main change required for this feature will be dynamic partition assignment to subtasks while the Kafka consumer is running. This will mainly be accomplished using the Kafka 0.9.x API `KafkaConsumer#subscribe(java.util.regex.Pattern, ConsumerRebalanceListener)`. The KafkaConsumer in each subtask will be added to the same consumer group when instantiated, relying on Kafka to dynamically reassign partitions to them whenever a rebalance happens. The registered `ConsumerRebalanceListener` is a callback that is called right before and after rebalancing happens.
> We'll use this callback to let each subtask commit the last offsets of the partitions it is currently responsible for to an external store (or Kafka) before a rebalance; after the rebalance, once a subtask knows the new partitions it will be reading from, it reads from the external store to get the last offsets for those partitions (partitions without offset entries in the store are new partitions that caused the rebalance).
> The tricky part will be restoring Flink checkpoints when the partition assignment is dynamic. Snapshotting will remain the same - subtasks snapshot the offsets of the partitions they currently hold. Restoring will be a bit different in that subtasks might not be assigned partitions matching the snapshot they are restored with (since we're letting Kafka dynamically assign partitions). There will need to be a coordination process where, if a restore state exists, all subtasks first commit the offsets they receive (as a result of the restore) to the external store, and then all subtasks look up the last offsets for the partitions they are holding.
> However, if the globally merged restore state feature mentioned by [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is available, then the restore will be simple again, as each subtask has full access to the previous global state and no coordination is required.
> I think changing to dynamic partition assignment is also good in the long run for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1.
> Dynamic partition assignment depending on KafkaConsumer#subscribe
> - Remove partition list querying from the constructor
> - Remove static partition assignment to subtasks in run()
> - Instead of using KafkaConsumer#assign() in the fetchers to manually assign static partitions, use subscribe() registered with the callback implementation explained above.
> 2. Restoring from checkpointed state
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold before continuing (unless we are able to have merged restore states).
> 3. For the previous consumer functionality (consuming from a fixed list of topics), KafkaConsumer#subscribe() has a corresponding overload for a fixed list of topics. We can simply decide which subscribe() overload to use depending on whether a regex Pattern or a list of topics is supplied.
> 4. If a subtask initially has no assigned partitions, we shouldn't emit a MAX_VALUE watermark, since it may be assigned partitions after a rebalance. Instead, unassigned subtasks should also run a fetcher instance and take part in the process pool for the consumer group of the subscribed topics.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
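The commit-before-rebalance / lookup-after-rebalance offset handoff described in the quoted proposal can be sketched in plain Java with no Kafka dependency. This is only an illustrative sketch: `OffsetStore` and the string partition keys are hypothetical stand-ins for the external store and Kafka's TopicPartition; in a real implementation the `commit`/`lookup` calls would be made from `ConsumerRebalanceListener#onPartitionsRevoked` and `#onPartitionsAssigned`.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the external offset store (ZooKeeper, Kafka, ...). */
class OffsetStore {
    private final Map<String, Long> lastOffsets = new ConcurrentHashMap<>();

    /** Before a rebalance (onPartitionsRevoked): each subtask commits the
     *  last offsets of the partitions it is currently responsible for. */
    void commit(Map<String, Long> currentOffsets) {
        lastOffsets.putAll(currentOffsets);
    }

    /** After a rebalance (onPartitionsAssigned): look up last offsets for the
     *  newly assigned partitions. Partitions with no entry are new partitions
     *  that caused the rebalance; here they simply start from offset 0. */
    Map<String, Long> lookup(Collection<String> assignedPartitions) {
        Map<String, Long> result = new HashMap<>();
        for (String partition : assignedPartitions) {
            result.put(partition, lastOffsets.getOrDefault(partition, 0L));
        }
        return result;
    }
}

class RebalanceSketch {
    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        // A subtask held partition topic-n1-0 at offset 42 before the rebalance.
        store.commit(Map.of("topic-n1-0", 42L));
        // Afterwards it is assigned topic-n1-0 plus the newly created topic-n2-0.
        Map<String, Long> offsets = store.lookup(List.of("topic-n1-0", "topic-n2-0"));
        System.out.println(offsets.get("topic-n1-0")); // 42
        System.out.println(offsets.get("topic-n2-0")); // 0 (new partition)
    }
}
```

The same shape also covers the restore-coordination case in the proposal: committing restored offsets first, then looking them up per assigned partition, works regardless of which subtask the offsets were snapshotted on.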