Date: Sun, 13 Nov 2016 21:53:59 +0000 (UTC)
From: "Jun Yao (JIRA)"
To: dev@kafka.apache.org
Reply-To: dev@kafka.apache.org
Mailing-List: contact dev-help@kafka.apache.org; run by ezmlm
Subject: [jira] [Commented] (KAFKA-4402) Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
archived-at: Sun, 13 Nov 2016 21:54:01 -0000


    [ https://issues.apache.org/jira/browse/KAFKA-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15662168#comment-15662168 ]

Jun Yao commented on KAFKA-4402:
--------------------------------

Hi Ewen,

I updated the description. I am looking at the same producer code as you are; the issue is rather that the counter is not per topic. I added a unit test to my PR to validate this. Without the fix, the result is not balanced.

https://github.com/apache/kafka/pull/2128/files#diff-f30df3b3b79e9be0de6c94dcce90a56e

Meanwhile, I also ran a local test from the producer side to validate this:

bin/kafka-topics.sh --create --topic mtest0 --zookeeper localhost:2181 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerPartitionTest {

    private Producer<String, String> producer;

    public static void main(String[] args) {
        KafkaProducerPartitionTest kafkaProducerPartitionTest = new KafkaProducerPartitionTest();
        try {
            kafkaProducerPartitionTest.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run() throws InterruptedException {
        initProducer();

        // Records per partition of loopTopic, filled in by the send callback.
        Map<Integer, Integer> partitionCount = new HashMap<>();
        String loopTopic = "mtest0";
        // 300 sends to loopTopic plus 60 interleaved sends to "test".
        CountDownLatch latch = new CountDownLatch(360);

        Callback c = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // metadata can be null when the send failed, so guard before reading it.
                if (metadata != null && loopTopic.equalsIgnoreCase(metadata.topic())) {
                    partitionCount.put(metadata.partition(),
                            partitionCount.getOrDefault(metadata.partition(), 0) + 1);
                }
                latch.countDown();
            }
        };

        for (int i = 0; i < 300; ++i) {
            // No key and no partition, so the default partitioner picks the partition.
            producer.send(new ProducerRecord<>(loopTopic, "" + i), c);
            if (i % 5 == 0) {
                // Every fifth iteration also sends to the single-partition topic.
                producer.send(new ProducerRecord<>("test", "a"), c);
            }
        }

        latch.await();
        producer.close();
        System.out.println("partitionCount=" + partitionCount);
    }

    public void initProducer() {
        try {
            Properties props = new Properties();
            props.load(getClass().getClassLoader().getResourceAsStream("kafka-config.properties"));
            producer = new KafkaProducer<>(props);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Without the fix, it prints

partitionCount={0=60, 1=120, 2=120}

After the fix, it prints

partitionCount={0=100, 1=100, 2=100}


> Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4402
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4402
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jun Yao
>            Priority: Minor
>
> According to this code comment, the Kafka client Producer's DefaultPartitioner does round robin if "no partition or key is present":
> https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L34
> The code also appears to attempt round robin: it maintains a counter, increments it on every call, and uses the counter value to decide which partition to route to.
> However, the counter is global and shared by all topics, so the result is not actually round robin per topic, and it can cause unbalanced routing among a topic's partitions.
> Although we can pass a custom implementation of the interface "org.apache.kafka.clients.producer.Partitioner", it would still be good to make the default implementation truly round robin, as the comment says.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
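
For anyone who wants to see the shared-counter effect without a running broker, here is a minimal sketch that replays the same send pattern (300 records to a 3-partition topic, with one record to a 1-partition topic on every fifth iteration). CounterPartitioner and simulate are hypothetical names made up for this illustration and are not Kafka APIs. The per-topic variant is only my reading of "the counter is not per topic"; the actual change in the PR may differ. The starting counter value of 2 is an assumption picked so the skew lands on the same partition as in the output reported above; other starting values move the skew to a different partition.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class RoundRobinSketch {

    // Hypothetical stand-in for a counter-based partitioner; not Kafka's DefaultPartitioner.
    static final class CounterPartitioner {
        private final boolean perTopic;
        private final int start;
        private int sharedCounter;
        private final Map<String, Integer> topicCounters = new HashMap<>();

        CounterPartitioner(boolean perTopic, int start) {
            this.perTopic = perTopic;
            this.start = start;
            this.sharedCounter = start;
        }

        // Returns counter % numPartitions, then advances the counter that was used.
        int partition(String topic, int numPartitions) {
            int value;
            if (perTopic) {
                value = topicCounters.getOrDefault(topic, start);
                topicCounters.put(topic, value + 1);
            } else {
                value = sharedCounter++;
            }
            return Math.floorMod(value, numPartitions);
        }
    }

    // Replays the send loop and returns how many "mtest0" records hit each partition.
    static int[] simulate(boolean perTopic, int start) {
        CounterPartitioner partitioner = new CounterPartitioner(perTopic, start);
        int[] counts = new int[3];
        for (int i = 0; i < 300; i++) {
            counts[partitioner.partition("mtest0", 3)]++;
            if (i % 5 == 0) {
                // Interleaved send to the single-partition topic; with a shared
                // counter this consumes a counter value that "mtest0" never sees.
                partitioner.partition("test", 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("shared counter:    " + Arrays.toString(simulate(false, 2)));
        System.out.println("per-topic counter: " + Arrays.toString(simulate(true, 2)));
    }
}
```

With a shared counter, each block of five loop iterations consumes six counter values, and the value taken by the "test" topic is always congruent to the same residue mod 3, so one partition of "mtest0" receives half as many records as the other two. With one counter per topic, "mtest0" sees 300 consecutive values and every partition gets 100.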