kafka-dev mailing list archives

From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-1227) Code dump of new producer
Date Thu, 23 Jan 2014 23:48:43 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880519#comment-13880519 ]

Guozhang Wang commented on KAFKA-1227:
--------------------------------------

Some more comments:

--- General

1. How do we decide where to put exception definitions? Currently we have an errors folder
in kafka.common, and some folders also have their own exceptions.

2. Shall we merge the kafka.common.protocol and kafka.common.request folders, since the request
definitions are highly dependent on the protocol class?

3. Shall we put Node, Cluster, Partition, and TopicPartition in kafka.common into one sub-folder,
for example kafka.common.metadata?

4. Shall we put the Serializer classes into the protocol folder?

5. Shall we move the kafka.clients.common.network sub-folder to kafka.common?


--- kafka.common.Cluster

1. Since nextNode uses a global round-robin counter, we need to make sure that no more than one
object accesses a single Cluster's nextNode.
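
A rough sketch of making the counter itself safe for concurrent callers (the class below is
only illustrative, not the actual Cluster code):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative only: a generic round-robin picker whose counter is atomic, so several
    // senders sharing one instance cannot corrupt the index.
    public class RoundRobinPicker<T> {
        private final List<T> items;
        private final AtomicInteger counter = new AtomicInteger(0);

        public RoundRobinPicker(List<T> items) {
            this.items = items;
        }

        public T next() {
            // mask keeps the index non-negative even after the counter wraps around
            int index = (counter.getAndIncrement() & 0x7fffffff) % items.size();
            return items.get(index);
        }
    }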

--- kafka.common.StringSerialization

1. Shall we put config names such as ENCODING_CONFIG all in a single file?

--- kafka.common.AbstractIterator

1. makeNext is not supposed to leave the iterator in any state other than DONE or READY?
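
For reference, the state contract I have in mind, modeled on the usual Guava-style iterator
(simplified, not the actual class): makeNext() either returns the next element or calls
allDone(), and FAILED is only reached if makeNext() itself throws.

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    public abstract class SimpleAbstractIterator<T> implements Iterator<T> {
        private enum State { READY, NOT_READY, DONE, FAILED }
        private State state = State.NOT_READY;
        private T next;

        // Return the next element, or call allDone() when there is nothing left.
        protected abstract T makeNext();

        protected T allDone() {
            state = State.DONE;
            return null;
        }

        @Override
        public boolean hasNext() {
            switch (state) {
                case READY:  return true;
                case DONE:   return false;
                case FAILED: throw new IllegalStateException("iterator is in FAILED state");
                default:
                    state = State.FAILED;          // stays FAILED if makeNext() throws
                    next = makeNext();
                    if (state == State.DONE)
                        return false;
                    state = State.READY;
                    return true;
            }
        }

        @Override
        public T next() {
            if (!hasNext())
                throw new NoSuchElementException();
            state = State.NOT_READY;
            return next;
        }
    }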

--- kafka.common.protocol.Schema

1. Will a difference in field order result in different schemas?
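
Just to illustrate why I am asking (this does not use the actual Schema class): if fields are
written in declaration order, the same logical fields in a different order give different byte
layouts, so order would effectively be part of the schema.

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class FieldOrderDemo {
        public static void main(String[] args) {
            ByteBuffer a = ByteBuffer.allocate(6);
            a.putShort((short) 1).putInt(42);   // layout: [id: int16][value: int32]

            ByteBuffer b = ByteBuffer.allocate(6);
            b.putInt(42).putShort((short) 1);   // layout: [value: int32][id: int16]

            System.out.println(Arrays.equals(a.array(), b.array())); // prints false
        }
    }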

--- kafka.common.protocol.ProtoUtil

1. parseMetadataResponse: after reading the function, I feel that using TopicInfo/PartitionInfo
objects for parsing might be preferable. We could put these objects in the Protocol.java file so
any protocol change would only require editing one file.
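
Something along these lines is what I have in mind (the names and fields below are only a guess
at what the metadata response carries; these classes do not exist in the patch):

    import java.util.List;

    // Hypothetical holder objects for the parsed metadata response.
    public class TopicInfo {
        public final String topic;
        public final List<PartitionInfo> partitions;

        public TopicInfo(String topic, List<PartitionInfo> partitions) {
            this.topic = topic;
            this.partitions = partitions;
        }

        public static class PartitionInfo {
            public final int partition;
            public final int leaderId;      // broker id of the leader
            public final int[] replicaIds;  // broker ids of the replicas

            public PartitionInfo(int partition, int leaderId, int[] replicaIds) {
                this.partition = partition;
                this.leaderId = leaderId;
                this.replicaIds = replicaIds;
            }
        }
    }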

--- kafka.common.record.LogEntry

1. Maybe we can rename to OffsetRecord?

--- kafka.common.record.Record

1. Do we expect MIN_HEADER_SIZE and RECORD_OVERHEAD to be different in the future? Currently
their values are the same and the way they are computed is also identical.
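
To spell out the observation (the individual field sizes below are illustrative, not copied from
the patch): both constants reduce to the same sum of header field lengths, so if they are never
expected to diverge, one constant may be enough.

    public class RecordSizesSketch {
        public static final int CRC_LENGTH        = 4;
        public static final int MAGIC_LENGTH      = 1;
        public static final int ATTRIBUTES_LENGTH = 1;
        public static final int KEY_SIZE_LENGTH   = 4;
        public static final int VALUE_SIZE_LENGTH = 4;

        public static final int MIN_HEADER_SIZE =
            CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;

        // Computed from exactly the same fields, so it always equals MIN_HEADER_SIZE today.
        public static final int RECORD_OVERHEAD =
            CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
    }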

--- kafka.common.request.RequestHeader

1. Is it better to define the "client_id" string as a static field in Protocol.java?

2. Does REQUEST/RESPONSE_HEADER also need to be versioned?
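
On both points, what I have in mind is roughly the following (simplified; the constant names and
the idea of per-version header schemas are suggestions, not existing code):

    // Keep the header field-name strings in one place, e.g. Protocol.java, and leave room
    // for versioning the header schemas themselves.
    public final class HeaderFields {
        public static final String API_KEY        = "api_key";
        public static final String API_VERSION    = "api_version";
        public static final String CORRELATION_ID = "correlation_id";
        public static final String CLIENT_ID      = "client_id";

        // If the header were versioned, each version would get its own schema constant,
        // e.g. REQUEST_HEADER_V0, REQUEST_HEADER_V1, looked up by the header version.

        private HeaderFields() {}
    }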

--- kafka.client.common.NetworkReceive

1. In the first constructor, why not also initialize the size buffer to ByteBuffer.allocate(4)
(see the sketch below)?

2. Why does NetworkReceive not extend ByteBufferReceive?
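
For point 1, the shape I would expect (the field names below are assumptions, not the actual
class): since the size prefix is always four bytes, the first constructor could allocate it
eagerly as well.

    import java.nio.ByteBuffer;

    public class SizePrefixedReceive {
        private final int source;
        private final ByteBuffer size = ByteBuffer.allocate(4);  // 4-byte length prefix
        private ByteBuffer buffer;                               // allocated once the size is read

        public SizePrefixedReceive(int source) {
            this.source = source;
        }

        public void allocatePayload(int messageSize) {
            this.buffer = ByteBuffer.allocate(messageSize);
        }
    }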

--- kafka.client.common.Selector

1. "transmissions.send.remaining() <= 0": under what condition can remaining() be < 0?

2. "if (trans != null) this.disconnected.add(trans.id);": should it be trans == null?

--- kafka.client.producer.internals.BufferPool:

1. In the freeUp() function, should we use this.free.pollLast().capacity() instead of limit()
(see the sketch below)?

2. What is the rationale for having just one poolable size?
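
For point 1, a sketch of why capacity() seems like the right amount to credit back (the field
names below are assumptions): a pooled buffer's limit() may have been shrunk by its last user,
while capacity() always reflects the memory actually returned to the pool.

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    public class FreeUpSketch {
        private final Deque<ByteBuffer> free = new ArrayDeque<ByteBuffer>();
        private long availableMemory = 0;

        // Free pooled buffers until the request can be satisfied, counting each buffer
        // by its full capacity rather than its current limit.
        private void freeUp(int size) {
            while (!this.free.isEmpty() && this.availableMemory < size)
                this.availableMemory += this.free.pollLast().capacity();  // not limit()
        }
    }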

--- kafka.clients.producer.internals.Metadata

1. After configs are added, we need to remove the hard-coded default values. So for all of
these places we could leave a TODO mark for now.

--- kafka.clients.producer.internals.ProduceRequestResult

1. Its member fields are dependent on Protocol.java, so once we change the protocol we would
probably also need to change this file.

--- kafka.clients.producer.internals.RecordAccumulator

1. Typo: “Get a list of topic-partitions which are ready to be send.”

--- kafka.clients.producer.internals.Sender

1. One corner case we may need to consider: if a partition becomes unavailable and the producer
keeps sending data to it, that partition could eventually exhaust the buffer memory, leaving the
other partitions unable to take more messages and blocked waiting.

2. When handling a disconnection, the ProduceRequestResult will set the exception, and if await()
is called this exception will be thrown and the callback will not be executed. Since this exception
is already stored in the RecordSend, I think a better approach is not to throw the exception from
await() but to let the callback function handle it. That would make the application code cleaner,
since otherwise the application needs to wrap the await() call in a try-catch (see the sketch below).

3. When closing the producer, there is another corner case: the I/O thread can keep trying to
send the remaining data and failing. We could probably add an option to drop whatever is in the
buffer and let the application's callback functions handle those records.
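
To make point 2 concrete, here is the difference from the application's point of view (RecordSend
and the callback shape below are simplified stand-ins, not the actual classes in the patch):

    // Contrasting a throwing await() with callback-delivered errors.
    interface RecordSend {
        void await();         // current behavior: throws if the send failed
        Exception error();    // the stored exception, if any
    }

    interface SendCallback {
        void onCompletion(RecordSend send, Exception exception);
    }

    class ApplicationStyles {
        // With a throwing await(), every caller has to wrap the call in try/catch.
        void withThrowingAwait(RecordSend send) {
            try {
                send.await();
            } catch (RuntimeException e) {
                // recover or log here
            }
        }

        // If await() only blocks and errors are handed to the callback, failures are
        // handled in one place and the calling code stays free of try/catch.
        SendCallback handler = new SendCallback() {
            public void onCompletion(RecordSend send, Exception exception) {
                if (exception != null) {
                    // recover or log here
                }
            }
        };

        void withCallback(RecordSend send) {
            send.await();  // just blocks for completion; errors go to the callback
        }
    }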

> Code dump of new producer
> -------------------------
>
>                 Key: KAFKA-1227
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1227
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series of post-commit
> reviews to get it into shape. This bug tracks just the code dump.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
