kafka-dev mailing list archives

From "Ismael Juma (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4405) Kafka consumer improperly send prefetch request
Date Tue, 29 Nov 2016 16:03:58 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15705691#comment-15705691 ]

Ismael Juma commented on KAFKA-4405:

[~ysysberserk], are you really seeing the behaviour you have described here? `Fetcher` should
only try to prefetch for a partition if we don't already have records for it; see the code below
(`fetchablePartitions` in particular):

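    private List<TopicPartition> fetchablePartitions() {
        List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
        // skip partitions that still have buffered records waiting to be returned
        if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
            fetchable.remove(nextInLineRecords.partition);
        for (CompletedFetch completedFetch : completedFetches)
            fetchable.remove(completedFetch.partition);
        return fetchable;
    }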

    /**
     * Create fetch requests for all nodes for which we have assigned partitions
     * that have no existing requests in flight.
     */
    private Map<Node, FetchRequest> createFetchRequests() {
        // create the fetch info
        Cluster cluster = metadata.fetch();
        Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
        for (TopicPartition partition : fetchablePartitions()) {
            Node node = cluster.leaderFor(partition);
            if (node == null) {
                metadata.requestUpdate();
            } else if (this.client.pendingRequestCount(node) == 0) {
                // if there is a leader and no in-flight requests, issue a new fetch
                LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                if (fetch == null) {
                    fetch = new LinkedHashMap<>();
                    fetchable.put(node, fetch);
                }

                long position = this.subscriptions.position(partition);
                fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
                log.trace("Added fetch request for partition {} at offset {}", partition, position);
            } else {
                log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
            }
        }

        // create the fetches
        Map<Node, FetchRequest> requests = new HashMap<>();
        for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
            Node node = entry.getKey();
            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue());
            requests.put(node, fetch);
        }
        return requests;
    }
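
To make the filtering step concrete, here is a minimal, self-contained sketch of the idea behind `fetchablePartitions`: any partition that already has buffered records is dropped from the list before fetch requests are built. This is an illustration only, not the Kafka code. The class `FetchableFilterSketch`, the method `fetchable`, and the use of plain strings for partitions are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model: partitions are plain strings, not TopicPartition objects.
class FetchableFilterSketch {

    /**
     * Returns the assigned partitions that currently have no buffered records.
     * Partitions present in `buffered` are skipped, so no new fetch is issued for them.
     */
    static List<String> fetchable(List<String> assigned, Set<String> buffered) {
        List<String> result = new ArrayList<>(assigned); // copy, so the input is not modified
        result.removeAll(buffered);                      // drop partitions that still hold records
        return result;
    }

    public static void main(String[] args) {
        List<String> assigned = Arrays.asList("topic-0", "topic-1", "topic-2");
        Set<String> buffered = new LinkedHashSet<>(Arrays.asList("topic-1"));
        System.out.println(fetchable(assigned, buffered)); // prints [topic-0, topic-2]
    }
}
```

In this model, a partition becomes fetchable again only once its buffered records have been handed out, which is the behaviour the comment above describes.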

> Kafka consumer improperly send prefetch request
> -----------------------------------------------
>                 Key: KAFKA-4405
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4405
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions:
>            Reporter: ysysberserk
> Now kafka consumer has added max.poll.records to limit the count of messages returned by poll().
> According to KIP-41, to implement max.poll.records, the prefetch request should only
> be sent when the total number of retained records is less than max.poll.records.
> But in the code of , the consumer will send a prefetch request if it has retained
> any records and never checks whether the total number of retained records is less than
> max.poll.records.
> If max.poll.records is set to a count much less than the count of messages fetched, the
> poll() loop will send many more requests than expected and will have more and more records
> fetched and stored in memory before they can be consumed.
> So before sending a prefetch request, the consumer must check whether the total number of
> retained records is less than max.poll.records.
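
The check the reporter asks for can be sketched as below. This only illustrates the proposal in the issue description; it is not how the consumer is implemented, and the class `PrefetchGuardSketch`, the method `shouldPrefetch`, and its parameters are hypothetical names chosen for the example.

```java
// Hypothetical guard modelling the reporter's proposal, not actual Kafka code.
class PrefetchGuardSketch {

    /**
     * Allows a prefetch only while the number of retained (buffered) records
     * is below the configured max.poll.records value.
     */
    static boolean shouldPrefetch(int retainedRecords, int maxPollRecords) {
        return retainedRecords < maxPollRecords;
    }

    public static void main(String[] args) {
        int maxPollRecords = 100; // assumed configuration value for the example
        System.out.println(shouldPrefetch(20, maxPollRecords));  // prints true
        System.out.println(shouldPrefetch(500, maxPollRecords)); // prints false
    }
}
```

With such a guard, a consumer holding 500 buffered records and `max.poll.records` set to 100 would not issue another fetch until enough buffered records had been returned to the application.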

This message was sent by Atlassian JIRA
