kafka-dev mailing list archives

From "Jun Yao (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4402) Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
Date Sun, 13 Nov 2016 21:53:59 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15662168#comment-15662168 ]

Jun Yao commented on KAFKA-4402:
--------------------------------

Hi Ewen,
I updated the description. I am looking at the same producer code as you are.
The issue is rather that the counter is shared across all topics instead of being kept per topic.

I added a unit test in my PR to validate this; without the fix, the result is not balanced.

https://github.com/apache/kafka/pull/2128/files#diff-f30df3b3b79e9be0de6c94dcce90a56e

Meanwhile, I also ran a local test from the producer side to validate this:
bin/kafka-topics.sh --create --topic mtest0 --zookeeper localhost:2181 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1


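import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerPartitionTest {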
    private Producer<String, String> producer;

    public static void main(String[] args) {
        KafkaProducerPartitionTest kafkaProducerPartitionTest = new KafkaProducerPartitionTest();
        try {
            kafkaProducerPartitionTest.run();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run() throws InterruptedException {
        initProducer();

        Map<Integer, Integer> partitionCount = new HashMap<>();

        String loopTopic = "mtest0";



        CountDownLatch latch = new CountDownLatch(360);
        Callback c = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(loopTopic.equalsIgnoreCase(metadata.topic())){
                    partitionCount.put(metadata.partition(), partitionCount.getOrDefault(metadata.partition(), 0) + 1);
                }

                latch.countDown();
            }
        };

        for(int i = 0; i < 300; ++i){
            producer.send(new ProducerRecord<String, String>(loopTopic, "" + i), c);
            if(i%5 == 0 ){
                producer.send(new ProducerRecord<String, String>("test", "a"), c);
            }
        }

        latch.await();

        System.out.println("partitionCount=" + partitionCount);
    }

    public void initProducer() {
        try {
            Properties props = new Properties();
            props.load(getClass().getClassLoader().getResourceAsStream("kafka-config.properties"));
            producer = new KafkaProducer<>(props);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Without the fix, it prints:
partitionCount={0=60, 1=120, 2=120}


After the fix, it prints:
partitionCount={0=100, 1=100, 2=100}
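
To illustrate why a shared counter skews the distribution, here is a minimal simulation of the send pattern used in the test above. It is only a sketch, not the actual DefaultPartitioner code: RoundRobinCounterSim, simulate and choose are hypothetical names, partition selection is reduced to "counter % number of partitions", and no broker is involved. The starting value 2 is chosen only so that the under-served partition matches the output above; as far as I can tell the real counter starts from a random value, so a different partition may be the one that receives fewer records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class RoundRobinCounterSim {

    // Returns how many of the 300 keyless records sent to "mtest0" land on each partition.
    // perTopicCounter = false models one counter shared by all topics;
    // perTopicCounter = true models one counter per topic.
    static Map<Integer, Integer> simulate(boolean perTopicCounter, int start) {
        Map<String, Integer> numPartitions = new HashMap<>();
        numPartitions.put("mtest0", 3); // same partition counts as the topics created above
        numPartitions.put("test", 1);

        Map<String, Integer> counters = new HashMap<>();
        Map<Integer, Integer> mtest0Counts = new TreeMap<>();

        for (int i = 0; i < 300; i++) {
            int p = choose("mtest0", perTopicCounter, start, counters, numPartitions);
            mtest0Counts.merge(p, 1, Integer::sum);
            if (i % 5 == 0) {
                // Interleaved send to the single-partition topic, as in the test above.
                choose("test", perTopicCounter, start, counters, numPartitions);
            }
        }
        return mtest0Counts;
    }

    // Simplified keyless partition choice: take the next counter value modulo the partition count.
    static int choose(String topic, boolean perTopicCounter, int start,
                      Map<String, Integer> counters, Map<String, Integer> numPartitions) {
        String counterKey = perTopicCounter ? topic : "<shared>";
        int next = counters.getOrDefault(counterKey, start);
        counters.put(counterKey, next + 1);
        return next % numPartitions.get(topic);
    }

    public static void main(String[] args) {
        System.out.println("shared counter:    " + simulate(false, 2)); // {0=60, 1=120, 2=120}
        System.out.println("per-topic counter: " + simulate(true, 2));  // {0=100, 1=100, 2=100}
    }
}
```

Every 5 sends to mtest0 come with 1 send to test, so the shared counter advances by 6 per cycle. Since 6 is a multiple of 3, the send to test consumes the counter value for the same mtest0 partition in every cycle, and that partition ends up with 60 records instead of 100.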




> Kafka Producer's DefaultPartitioner is actually not round robin as said in the code comments "If no partition or key is present choose a partition in a round-robin fashion"
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4402
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4402
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jun Yao
>            Priority: Minor
>
> According to the code comments, the Kafka client producer's DefaultPartitioner does round robin if "no partition or key is present":
> https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L34
> The code does appear to attempt round robin: it maintains a counter, increments it on every call, and uses it to decide which partition to send to.
> However, the counter is global and shared by all topics, so the result is not round robin per topic, and routing across a topic's partitions can end up unbalanced.
> Although we can pass a custom implementation of the "org.apache.kafka.clients.producer.Partitioner" interface, it would still be good to make the default implementation truly round robin, as the comment says.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
