kafka-jira mailing list archives

From "Manoj Ramakrishnan (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4753) KafkaConsumer susceptible to FetchResponse starvation
Date Fri, 23 Feb 2018 18:10:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374767#comment-16374767 ]

Manoj Ramakrishnan commented on KAFKA-4753:
-------------------------------------------

Hi @Jason or others, is a resolution planned for this issue, or is there a workaround we can attempt?
We are seeing lots of restabilization errors after adding one more consumer app to our production
load.

> KafkaConsumer susceptible to FetchResponse starvation
> -----------------------------------------------------
>
>                 Key: KAFKA-4753
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4753
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Onur Karaman
>            Assignee: Onur Karaman
>            Priority: Major
>
> FetchResponse starvation here means that the KafkaConsumer repeatedly fails to fully
form FetchResponses within the request timeout from a subset of the brokers it's fetching from,
while FetchResponses from the other brokers can get fully formed and processed by the application.
> In other words, this ticket is concerned with scenarios where fetching from some brokers
hurts the progress of fetching from other brokers to the point of repeatedly hitting a request
timeout.
> Some FetchResponse starvation scenarios:
> 1. partition leadership of the consumer's assigned partitions is skewed across brokers,
causing uneven FetchResponse sizes across brokers.
> 2. the consumer seeks back on partitions on some brokers but not others, causing uneven
FetchResponse sizes across brokers.
> 3. the consumer's ability to keep up with various partitions across brokers is skewed,
causing uneven FetchResponse sizes across brokers.
> I've personally seen scenario 1 happen this past week to one of our users in prod. They
manually assigned partitions such that a few brokers led most of the partitions while other
brokers only led a single partition. When NetworkClient sends out FetchRequests to different
brokers in parallel with an uneven partition distribution, FetchResponses from brokers that
lead more partitions will contain more data than FetchResponses from brokers that lead few
partitions. This means the small FetchResponses will get fully formed quicker than the larger
FetchResponses. When the application eventually consumes a smaller, fully formed FetchResponse,
the NetworkClient will send out a new FetchRequest to the lightly-loaded broker. Its response
will again come back quickly while only marginal progress has been made on the larger FetchResponse.
Repeat this process several times and your application will have potentially processed many
smaller FetchResponses while the larger FetchResponse has made minimal progress and is forced
to time out, causing the large FetchResponse to start all over again, which causes starvation.
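
The mechanism described above can be put into rough numbers. The following is a back-of-envelope sketch, not a model of the real client: it assumes (hypothetically) that each network poll drains at most one receive buffer's worth of the large FetchResponse and that network polls are separated by a fixed application delay. The class name, method names, and the 3-second delay are illustrative assumptions.

```java
class FetchStarvationEstimate {

    /** Network polls needed to read a response, assuming at most bufferBytes are read per poll. */
    public static long pollsToForm(long responseBytes, long bufferBytes) {
        return (responseBytes + bufferBytes - 1) / bufferBytes; // ceiling division
    }

    /** True if, under the assumptions above, the response cannot be fully read before the request timeout. */
    public static boolean starves(long responseBytes, long bufferBytes,
                                  long msBetweenPolls, long requestTimeoutMs) {
        return pollsToForm(responseBytes, bufferBytes) * msBetweenPolls > requestTimeoutMs;
    }

    public static void main(String[] args) {
        long large = 2L * 1024 * 1024; // two partitions at the 1 MB max.partition.fetch.bytes default
        long timeoutMs = 40_000;       // request timeout mentioned in the ticket
        long delayMs = 3_000;          // assumed gap between network polls
        System.out.println("64 KB buffer starves: " + starves(large, 64 * 1024, delayMs, timeoutMs));
        System.out.println("1 MB buffer starves:  " + starves(large, 1024 * 1024, delayMs, timeoutMs));
    }
}
```

Under these assumptions a 64 KB buffer needs 32 polls (96 s) to form the 2 MB response, well past the 40 s timeout, while a 1 MB buffer needs 2 polls (6 s). Real behavior also depends on the kernel, TCP, and the broker, so treat this only as intuition.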
> To mitigate the problem for the short term, I've suggested to our user that they either:
> 1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB to something
like 1 MB. This is the short-term solution I suggested they go with.
> 2. reduce the "max.partition.fetch.bytes" down from the current default of 1 MB to something
like 100 KB. This solution wasn't advised as it could impact broker performance.
> 3. ask our SREs to run a partition reassignment to balance out the partition leadership
(partitions were already being led by their preferred leaders).
> 4. bump up their request timeout. It was set to open-source's former default of 40 seconds.
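
As a sketch of how mitigations 1, 2, and 4 might be applied on the client side, the snippet below sets the corresponding consumer properties by their string keys, so it compiles without the Kafka client on the classpath. The 60-second request timeout is an assumed value for illustration, not a recommendation from the ticket, and the class and method names are hypothetical.

```java
import java.util.Properties;

class StarvationMitigationProps {

    /** Returns a copy of base with the short-term client-side mitigations applied. */
    public static Properties withMitigations(Properties base, boolean shrinkFetchSize) {
        Properties props = new Properties();
        props.putAll(base);
        // Mitigation 1: raise the socket receive buffer from 64 KB to 1 MB.
        props.setProperty("receive.buffer.bytes", String.valueOf(1024 * 1024));
        // Mitigation 4: raise the request timeout (60 s is an assumed, illustrative value).
        props.setProperty("request.timeout.ms", "60000");
        if (shrinkFetchSize) {
            // Mitigation 2 (not advised in the ticket): cap the per-partition fetch size at 100 KB.
            props.setProperty("max.partition.fetch.bytes", String.valueOf(100 * 1024));
        }
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9090,localhost:9091");
        // Print the resulting configuration, one key=value pair per line.
        withMitigations(base, false).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The returned Properties could then be passed to the KafkaConsumer constructor. Mitigation 3 (rebalancing partition leadership) is a cluster-side operation and has no client property.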
> Contributing factors:
> 1. uneven FetchResponse sizes across brokers.
> 2. processing time of the polled ConsumerRecords.
> 3. "max.poll.records" increases the number of polls needed to consume a FetchResponse,
magnifying the constant-time overhead per poll.
> 4. "max.poll.records" makes KafkaConsumer.poll bypass calls to ConsumerNetworkClient.poll.
> 5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and ConsumerNetworkClient.poll
can return before the poll timeout as soon as a single channel is selected.
> 6. NetworkClient.poll is solely driven by the user thread with manual partition assignment.
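
Contributing factors 3 and 4 can be illustrated with simple arithmetic. The sketch below estimates how many KafkaConsumer.poll calls it takes to hand one partition's fetched data to the application, assuming for illustration that every record occupies exactly its payload size with no per-record overhead. The class and method names are hypothetical.

```java
class PollsPerFetchResponse {

    /** Polls needed to drain fetchedBytes of data when each poll returns at most maxPollRecords records. */
    public static long pollsToDrain(long fetchedBytes, long recordBytes, long maxPollRecords) {
        long records = (fetchedBytes + recordBytes - 1) / recordBytes; // ceiling division
        return (records + maxPollRecords - 1) / maxPollRecords;        // ceiling division
    }

    public static void main(String[] args) {
        // 1 MB fetched for one partition, 100-byte records, max.poll.records = 500.
        System.out.println(pollsToDrain(1024 * 1024, 100, 500));
    }
}
```

With these inputs the estimate is 21 polls for a single 1 MB partition fetch. Per factor 4, those polls can return already-buffered records without calling ConsumerNetworkClient.poll, so any per-poll delay in the application is multiplied before the network is serviced again.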
> So far I've only locally reproduced starvation scenario 1 and haven't even attempted
the other scenarios. Preventing the bypass of ConsumerNetworkClient.poll (contributing factor
4) mitigates the issue, but it seems starvation would still be possible.
> How to reproduce starvation scenario 1:
> 1. start up ZooKeeper
> 2. start up two brokers
> 3. create a topic t0 with two partitions led by broker 0 and create a topic t1 with a
single partition led by broker 1
> {code}
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 --replica-assignment 0,0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 --replica-assignment 1
> {code}
> 4. produce a lot of data into these topics
> {code}
> > ./bin/kafka-producer-perf-test.sh --topic t0 --num-records 20000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
> > ./bin/kafka-producer-perf-test.sh --topic t1 --num-records 10000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
> {code}
> 5. start up a consumer that consumes these 3 partitions with TRACE-level NetworkClient
logging
> {code}
> > ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.StarvedFetchResponseTest 10000 3000 65536
> {code}
> The config/tools-log4j.properties:
> {code}
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #    http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> log4j.rootLogger=WARN, stderr
> log4j.appender.stderr=org.apache.log4j.ConsoleAppender
> log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
> log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
> log4j.appender.stderr.Target=System.err
> log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE, stderr
> log4j.additivity.org.apache.kafka.clients.NetworkClient=false
> {code}
> The consumer code:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
>  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
>  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
>  * License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
>  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
>  * specific language governing permissions and limitations under the License.
>  */
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.TopicPartition;
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.Set;
> public class StarvedFetchResponseTest {
>     public static void main(String[] args) throws InterruptedException {
>         // Arguments: <poll timeout ms> <sleep between polls ms> <receive.buffer.bytes>
>         long pollTimeout = Long.parseLong(args[0]);
>         long sleepDuration = Long.parseLong(args[1]);
>         String receiveBufferSize = args[2];
>         Properties props = new Properties();
>         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090,localhost:9091");
>         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "fetch-response-starvation");
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>         props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
>         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>         props.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferSize);
>         KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
>         List<TopicPartition> partitions = new ArrayList<>();
>         for (int i = 0; i < 2; i++) {
>             partitions.add(new TopicPartition("t0", i));
>         }
>         partitions.add(new TopicPartition("t1", 0));
>         kafkaConsumer.assign(partitions);
>         kafkaConsumer.seekToBeginning(partitions);
>         while (true) {
>             ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(pollTimeout);
>             System.out.println(recordsPerTopic(records));
>             Thread.sleep(sleepDuration);
>         }
>     }
>     private static Map<TopicPartition, Integer> recordsPerTopic(ConsumerRecords<byte[], byte[]> records) {
>         Map<TopicPartition, Integer> result = new HashMap<>();
>         Set<TopicPartition> partitions = records.partitions();
>         for (TopicPartition partition : partitions) {
>             if (!result.containsKey(partition)) {
>                 result.put(partition, 0);
>             }
>             result.put(partition, result.get(partition) + records.records(partition).size());
>         }
>         return result;
>     }
> }
> {code}
> After running it for 30 minutes, around 33 FetchResponses from broker 1 were served to
the application while the many partially formed FetchResponses from broker 0 were cancelled
due to disconnects caused by the request timeout. It seems that there was only one successful FetchResponse
from broker 0 served to the application during this time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
