kafka-dev mailing list archives

From "Jun Rao (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-1227) Code dump of new producer
Date Thu, 23 Jan 2014 22:51:37 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880474#comment-13880474 ]

Jun Rao commented on KAFKA-1227:

I made a pass over the producer client code. The following are my comments.

1. Selector: It seems that the selector never closes an existing socket on its own (other
than when the selector itself is closed). For example, no existing sockets are closed after
a metadata refresh. The implication is that this may increase the number of socket connections
that a client has to maintain. For example, if every client uses all brokers as the metadata
broker list, every client will maintain a socket connection to every broker, which doesn't
seem very scalable. Also, if a partition is moved to new brokers, the client will keep its
socket connections to the old brokers open. In 0.8, we close all existing sockets every time
the metadata is refreshed; a possible cleanup along those lines is sketched below.
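
For illustration, a minimal sketch of such a cleanup, using placeholder structures rather
than the actual Selector internals (the per-node channel map is an assumption, not part of
the patch):

    import java.io.IOException;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;

    // Illustrative only: after a metadata refresh, close connections to any
    // broker that is no longer part of the new cluster view.
    class ConnectionPruneSketch {
        static void pruneStale(Map<Integer, SocketChannel> connections,
                               Set<Integer> liveBrokerIds) throws IOException {
            Iterator<Map.Entry<Integer, SocketChannel>> it = connections.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Integer, SocketChannel> entry = it.next();
                if (!liveBrokerIds.contains(entry.getKey())) {
                    entry.getValue().close(); // broker no longer needed by this client
                    it.remove();
                }
            }
        }
    }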

2. Metadata: We need to think through the case where clients use a VIP in the metadata
broker list. In this patch, it seems that we only use the VIP once and then switch to the
actual broker list after the first metadata update. This means that the producer can only
issue metadata requests to brokers to which replicas are assigned. In 0.8, we always issue
metadata requests using the metadata broker list. Another thing we do in 0.8 is to close the
socket connection after each metadata request. When using a VIP, an idle socket connection
can be killed by the load balancer. If the VIP is not configured properly, it may take a
long time (e.g., 8 minutes) to detect that the socket has already been killed, which will
slow down the fetching of metadata.

3. DefaultPartitioner:
3.1 Every instance of the producer always starts with partition 0, which could create
imbalanced load if multiple producers are created at the same time.
3.2 Also, when no partition key is provided, a better default is probably to select a random
"available" partition (i.e., one whose leader node exists), instead of just a random
partition. Both ideas are sketched below.

4. Partitioner.partition(): From the cluster, we can get the partition list for a topic. Is
the passed-in numPartitions redundant?

5. Sender:
5.1 run(): It seems possible for a produce request and a metadata request to be sent to the
same node in one iteration. This will cause selector.poll() to fail, since we can't send more
than one request to the same node per poll; one way to avoid the clash is sketched after this
point.
5.2 produceRequest(): topicDatas reads oddly, since "data" is already the plural of "datum".
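
For illustration, a minimal sketch with placeholder types: pick at most one request per node
per poll, deferring any extras to the next iteration.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: "requests" pairs a destination node id with an opaque
    // request; anything beyond the first request per node is deferred rather
    // than handed to the selector in the same poll.
    class OnePerNodeSketch {
        static <R> Map<Integer, R> selectSendable(List<Map.Entry<Integer, R>> requests,
                                                  List<Map.Entry<Integer, R>> deferred) {
            Map<Integer, R> chosen = new HashMap<>();
            for (Map.Entry<Integer, R> e : requests) {
                if (chosen.containsKey(e.getKey()))
                    deferred.add(e); // node already has a request this poll
                else
                    chosen.put(e.getKey(), e.getValue());
            }
            return chosen;
        }
    }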

6. TopicPartition: How do we prevent the computed hash code from being exactly 0?
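
Presumably the concern is a lazily cached hash where 0 means "not yet computed", so a real
hash of 0 would defeat the cache; that reading is an assumption. One common fix, in sketch
form (illustrative class, not the actual one):

    // Illustrative only: cache the hash lazily, but remap a computed value of
    // 0 to 1 so that 0 can keep meaning "not yet computed".
    final class CachedHashSketch {
        private final String topic;
        private final int partition;
        private int hash; // 0 == not computed yet

        CachedHashSketch(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public int hashCode() {
            int h = hash;
            if (h == 0) {
                h = 31 * topic.hashCode() + partition;
                if (h == 0)
                    h = 1; // avoid colliding with the "not computed" sentinel
                hash = h;
            }
            return h;
        }
    }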

7. BufferExhaustedException: It's probably useful to include the requested size in the exception.
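
For illustration, a minimal sketch (names are placeholders):

    // Illustrative only: carry the requested size so callers can report how
    // large the failed allocation was.
    class BufferExhaustedSketchException extends RuntimeException {
        private final int requestedSize;

        BufferExhaustedSketchException(int requestedSize, long availableMemory) {
            super("Failed to allocate " + requestedSize + " bytes ("
                  + availableMemory + " available)");
            this.requestedSize = requestedSize;
        }

        int requestedSize() {
            return requestedSize;
        }
    }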

8. RecordAccumulator:
8.1 Should we rename free to bufferPool?
8.2 ready(): Should a partition also be considered ready if it has only one RecordBatch
whose size is exactly batchSize?

9. RecordBatch.done(): Should we unblock RecordSend after registered callbacks are called?
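
For illustration, the suggested ordering in sketch form (illustrative types, not the actual
classes): run the registered callbacks first, and only then count down the latch that
unblocks threads awaiting the RecordSend.

    import java.util.List;
    import java.util.concurrent.CountDownLatch;

    // Illustrative only.
    class DoneOrderingSketch {
        private final CountDownLatch completion = new CountDownLatch(1);
        private final List<Runnable> callbacks;

        DoneOrderingSketch(List<Runnable> callbacks) {
            this.callbacks = callbacks;
        }

        void done() {
            for (Runnable cb : callbacks)
                cb.run();            // callbacks observe the final state first
            completion.countDown();  // now unblock await()ers
        }

        void await() throws InterruptedException {
            completion.await();
        }
    }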

10. RecordSend: We should include at least the partition number and probably the topic itself.

11. Various misspellings:
11.1 ProducerRecord: chosing
11.2 KafkaProducer:
11.2.1 comments above send(): messaging waiting => messages waiting
11.2.2 {@link kafka.clients.producer.RecordSend.await() await()}: await() appears twice
11.3 RecordBatch: sufficent

12. Formatting: Should we use 4-space indentation instead of 2-space? The latter is what we
have been using in Scala.

The following can be added to the TODO list:

13. BufferPool: When we add JMX, it's probably useful to have one metric for the size of the
waiter list, and another for the available memory.
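
For illustration, a minimal sketch using the standard platform MBean server; the names and
the ObjectName are placeholders, and a real implementation would wire the fields to the pool:

    import java.lang.management.ManagementFactory;
    import javax.management.ObjectName;

    // Illustrative only: expose the two suggested metrics as a standard MBean.
    public interface BufferPoolStatsMBean {
        int getWaiterCount();       // threads blocked waiting for memory
        long getAvailableMemory();  // unallocated bytes left in the pool
    }

    class BufferPoolStats implements BufferPoolStatsMBean {
        private volatile int waiterCount;       // updated by the pool
        private volatile long availableMemory;  // updated by the pool

        public int getWaiterCount() { return waiterCount; }
        public long getAvailableMemory() { return availableMemory; }

        void register() throws Exception {
            ManagementFactory.getPlatformMBeanServer().registerMBean(
                this, new ObjectName("kafka.producer:type=BufferPoolStats"));
        }
    }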

14. Logging: It seems that there are no logging messages and we use e.printStackTrace() in
a few places. Should we use log4j?
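
For illustration, what replacing e.printStackTrace() could look like with a logging facade
such as slf4j (backed by log4j or similar); the class name and body are placeholders:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative only: route errors through a logger instead of printStackTrace().
    class SenderLoggingSketch implements Runnable {
        private static final Logger log = LoggerFactory.getLogger(SenderLoggingSketch.class);

        public void run() {
            try {
                // ... poll the selector, send requests ...
            } catch (Exception e) {
                log.error("Uncaught error in sender thread", e);
            }
        }
    }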

15. Configs: It would be useful to log every overridden value and every unused property name.
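
For illustration, a minimal sketch; the maps are placeholders for whatever the config parser
produces, and a logger (per point 14) would replace System.out:

    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;

    // Illustrative only: after parsing, report overridden and unused configs.
    class ConfigLoggingSketch {
        static void report(Properties supplied, Map<String, Object> parsed,
                           Map<String, Object> defaults, Set<String> usedNames) {
            for (Map.Entry<String, Object> e : parsed.entrySet()) {
                Object dflt = defaults.get(e.getKey());
                if (dflt != null && !dflt.equals(e.getValue()))
                    System.out.println("Overriding config " + e.getKey() + " = " + e.getValue());
            }
            for (String name : supplied.stringPropertyNames())
                if (!usedNames.contains(name))
                    System.out.println("Unused property " + name);
        }
    }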

> Code dump of new producer
> -------------------------
>                 Key: KAFKA-1227
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1227
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-1227.patch
> The plan is to take a dump of the producer code "as is" and then do a series of post-commit
> reviews to get it into shape. This bug tracks just the code dump.
