kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-1779) FetchRequest.maxWait has no effect
Date Tue, 30 Dec 2014 04:15:13 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-1779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14260751#comment-14260751
] 

Guozhang Wang commented on KAFKA-1779:
--------------------------------------

[~nehanarkhede] Is this really an issue? I thought the scenario described here is expected.

> FetchRequest.maxWait has no effect
> ----------------------------------
>
>                 Key: KAFKA-1779
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1779
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.8.2
>            Reporter: Magnus Vojbacke
>            Priority: Minor
>
> Setting the maxWait field in a kafka.api.FetchRequest  does not appear to have any effect.
Whereas my assumption is: If I send a fetch request for messages after offset X for a partition
where there are currently no messages with offsets after X, I would expect that a Fetch request
built with the maxWait option should block on the broker side for $maxWait milliseconds for
a new message to arrive.
> Currently, the request seems to return an empty result immediately. As a result, our
client is forced to manually sleep on each fetch request that returns empty results.
> On the mailing list, it was stated that this bug should be fixed in 0.8.2, but I'm still
seeing this issue on 0.8.2-beta.
> {code}
>   // Chose a suitable topic / partition / lead broker combination
>   val host = ???
>   val port = ???
>   val topic = ???
>   val partition = ???
>   val cons = new SimpleConsumer(host, port, 50000000, 50000000, "my-id")
>     val topicAndPartition = new TopicAndPartition(topic, partition)
>     println(topicAndPartition)
>     val requestInfo = Map(topicAndPartition -> new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime,
1))
>     println(requestInfo)
>     val request = new OffsetRequest(requestInfo)
>     println(request)
>     val response: OffsetResponse = cons.getOffsetsBefore(request)
>     println("code=" + response.partitionErrorAndOffsets(topicAndPartition).error)
>     println(response)
>     val offset = response.partitionErrorAndOffsets(topicAndPartition).offsets(0)
>     val req = new FetchRequestBuilder().clientId("my-id").addFetch(topic, partition,
offset, 500000).maxWait(10000)
> // The following requests appear to return within a few hundred milliseconds of 
> // eachother, but my assumption is that maxWait 10000 milliseconds should
> // make each request block for at least 10000 milliseconds before returning an
> // empty result.
>     println(System.currentTimeMillis + " " + cons.fetch(req.build()))
>     println(System.currentTimeMillis + " " + cons.fetch(req.build()))
>     println(System.currentTimeMillis + " " + cons.fetch(req.build()))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message