kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Honghai Chen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-391) Producer request and response classes should use maps
Date Tue, 04 Nov 2014 06:11:34 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14195762#comment-14195762
] 

Honghai Chen commented on KAFKA-391:
------------------------------------

This situation happen under below scenario:
one broker is leader for several partitions, for example 3,   when send one messageset which
has message for all of the 3 partitions of this broker ,      the response.status.size is
3 and  the producerRequest.data.size is 1.    then it hit this exception.   Any idea for fix?
 Do we need compare response.status.size  with messagesPerTopic.Count instead of producerRequest.data.size
?


  private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition,
ByteBufferMessageSet]) = {
    if(brokerId < 0) {
      warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
      messagesPerTopic.keys.toSeq
    } else if(messagesPerTopic.size > 0) {
      val currentCorrelationId = correlationId.getAndIncrement
      val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
        config.requestTimeoutMs, messagesPerTopic)
      var failedTopicPartitions = Seq.empty[TopicAndPartition]
      try {
        val syncProducer = producerPool.getProducer(brokerId)
        debug("Producer sending messages with correlation id %d for topics %s to broker %d
on %s:%d"
          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host,
syncProducer.config.port))
        val response = syncProducer.send(producerRequest)
        debug("Producer sent messages with correlation id %d for topics %s to broker %d on
%s:%d"
          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host,
syncProducer.config.port))
        if(response != null) {
          if (response.status.size != producerRequest.data.size)
            throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response,
producerRequest))



> Producer request and response classes should use maps
> -----------------------------------------------------
>
>                 Key: KAFKA-391
>                 URL: https://issues.apache.org/jira/browse/KAFKA-391
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>            Assignee: Joel Koshy
>            Priority: Blocker
>              Labels: optimization
>             Fix For: 0.8.0
>
>         Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, KAFKA-391-v3.patch,
KAFKA-391-v4.patch
>
>
> Producer response contains two arrays of error codes and offsets - the ordering in these
arrays correspond to the flattened ordering of the request arrays.
> It would be better to switch to maps in the request and response as this would make the
code clearer and more efficient (right now, linear scans are used in handling producer acks).
> We can probably do the same in the fetch request/response.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message