Date: Thu, 27 Apr 2017 07:16:04 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15986135#comment-15986135 ]

ASF GitHub Bot commented on FLINK-6288:
---------------------------------------

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766

One thing to be careful with, though: since we now query Kafka for partition metadata within the `invoke` method, the query must be handled robustly. It must not block the whole stream at the Kafka sink and thereby make checkpoint times unexpectedly long. Most notably, we need to consider the corner cases where Kafka isn't cooperating nicely:

1. How do we handle an arbitrarily long response time when fetching the partition metadata?
2. How do we handle the case where the returned partition list is incomplete because some Kafka brokers are temporarily unavailable?
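To make questions 1 and 2 concrete, here is a minimal sketch, not the PR's actual code, of a metadata lookup that never blocks the caller longer than a fixed timeout and falls back to the last known answer.

- `BoundedPartitionLookup`, its `fetcher` function and `timeoutMillis` are hypothetical names.
- `fetcher` stands in for whatever Kafka metadata call the sink would really use.
- The incomplete-answer check assumes a topic's partition count only grows. That holds for Kafka as long as a topic is not deleted and recreated.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink): fetches a topic's partition ids with a
 * hard time bound so the sink thread never waits longer than timeoutMillis.
 */
class BoundedPartitionLookup implements AutoCloseable {
    private final Function<String, int[]> fetcher; // stand-in for the real Kafka metadata call
    private final long timeoutMillis;
    private final Map<String, int[]> lastKnown = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "partition-metadata-fetch");
        t.setDaemon(true); // never keeps the JVM alive on its own
        return t;
    });

    BoundedPartitionLookup(Function<String, int[]> fetcher, long timeoutMillis) {
        this.fetcher = fetcher;
        this.timeoutMillis = timeoutMillis;
    }

    int[] partitionsFor(String topic) {
        int[] cached = lastKnown.get(topic);
        Future<int[]> future = executor.submit(() -> fetcher.apply(topic));
        try {
            int[] fetched = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            // Question 2: partitions can be added but not removed, so an answer
            // smaller than the cached one is treated as incomplete and ignored.
            if (cached != null && fetched.length < cached.length) {
                return cached;
            }
            lastKnown.put(topic, fetched);
            return fetched;
        } catch (TimeoutException | ExecutionException e) {
            // Question 1: give up after timeoutMillis instead of stalling the stream.
            future.cancel(true);
            return fallback(topic, cached, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return fallback(topic, cached, e);
        }
    }

    private static int[] fallback(String topic, int[] cached, Exception cause) {
        if (cached != null) {
            return cached; // reuse the last good answer
        }
        throw new IllegalStateException("No partition metadata available for topic " + topic, cause);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

With a single fetch thread, a fetch that ignores interruption keeps that thread busy, so later lookups queue behind it. They still give up after `timeoutMillis`, which is the property that protects checkpoint times. Whether to reuse stale metadata or fail when no cached answer exists is a design decision the PR would still have to make.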
For 2., I can also foresee a separate "partitions update thread" that continuously refreshes the `Map` cache at a fixed interval. This could also evolve into a `FlinkKafkaPartitioner` whose `partition` method is invoked with a dynamically changing `int[] partitions`. Perhaps we shouldn't include that in this PR, as it's orthogonal to the API change. Just some food for thought :)

> FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the default topic, but the custom {{Partitioner}} interface does not follow this semantic.
> The partitioner's {{partition}} method is always invoked with the number of partitions in the default topic, not the number of partitions of the current {{targetTopic}}.
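The semantics the issue asks for could look roughly like the sketch below. The partitioner is invoked with the partitions of the record's target topic rather than those of the default topic. This is combined with the separate "partitions update thread" floated in the comment.

- `TopicAwarePartitioner` is a simplified, hypothetical stand-in. It is not Flink's `FlinkKafkaPartitioner` signature.
- `RefreshingPartitionCache` and its `fetcher` are likewise illustrative names. `fetcher` stands in for a Kafka metadata call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Simplified stand-in for a topic-aware partitioner; NOT Flink's actual FlinkKafkaPartitioner. */
interface TopicAwarePartitioner<T> {
    /** Picks one entry of partitions, which are the partitions of targetTopic. */
    int partition(T record, String targetTopic, int[] partitions);
}

/** Example partitioner: hashes the record onto the target topic's partitions. */
class HashPartitioner<T> implements TopicAwarePartitioner<T> {
    @Override
    public int partition(T record, String targetTopic, int[] partitions) {
        // floorMod keeps the index non-negative even for negative hash codes
        return partitions[Math.floorMod(record.hashCode(), partitions.length)];
    }
}

/** Hypothetical per-topic cache kept fresh by a background "partitions update thread". */
class RefreshingPartitionCache implements AutoCloseable {
    private final Function<String, int[]> fetcher; // stand-in for a Kafka metadata call
    private final Map<String, int[]> partitionsByTopic = new ConcurrentHashMap<>();
    private final ScheduledExecutorService updater = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "partitions-update-thread");
        t.setDaemon(true);
        return t;
    });

    RefreshingPartitionCache(Function<String, int[]> fetcher, long refreshIntervalMillis) {
        this.fetcher = fetcher;
        updater.scheduleWithFixedDelay(
            this::refreshAll, refreshIntervalMillis, refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Per-record lookup: only a topic's first use fetches synchronously; later calls hit the cache. */
    int[] partitionsFor(String targetTopic) {
        return partitionsByTopic.computeIfAbsent(targetTopic, fetcher);
    }

    private void refreshAll() {
        for (String topic : partitionsByTopic.keySet()) {
            try {
                int[] fresh = fetcher.apply(topic);
                // Accept the refresh only if it does not shrink the known partition set.
                partitionsByTopic.merge(topic, fresh,
                    (current, candidate) -> candidate.length >= current.length ? candidate : current);
            } catch (RuntimeException e) {
                // Keep the previous entry; an uncaught exception would cancel all future runs.
            }
        }
    }

    @Override
    public void close() {
        updater.shutdownNow();
    }
}
```

At the sink, each record would then be routed with `partitioner.partition(record, targetTopic, cache.partitionsFor(targetTopic))`. `scheduleWithFixedDelay` is used rather than `scheduleAtFixedRate` so that a slow refresh cannot trigger back-to-back runs. Only topics that have already received records are refreshed.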