kafka-jira mailing list archives

From "Ismael Juma (JIRA)" <j...@apache.org>
Subject [jira] [Reopened] (KAFKA-6088) Kafka Consumer slows down when reading from highly compacted topics
Date Thu, 19 Oct 2017 06:22:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma reopened KAFKA-6088:
--------------------------------

> Kafka Consumer slows down when reading from highly compacted topics
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6088
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6088
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>             Fix For: 0.11.0.0
>
>
> Summary of the issue
> -----
> We found a performance issue with the Kafka Consumer: it becomes less efficient when there are frequent gaps in offsets (which happens when there is a lot of compaction on the topic).
> The issue is present in 0.10.2.1 and possibly prior.
> It is fixed in 0.11.0.0.
> Summary of cause
> -----
> The fetcher code assumes that there will be no gaps in message offsets. If there are, it does an additional round trip to the broker. For topics with many gaps in offsets, it is possible that most calls to {{poll()}} will generate a round trip to the broker.
> Background and details 
> -----
> We have a topic with roughly 8 million records. The topic is log compacted. It turns out that most of the initial records in the topic were never overwritten, whereas in the 2nd half of the topic we had lots of overwritten records. That means that for the first part of the topic, there are no gaps in offsets. But in the 2nd part of the topic, there are frequent gaps in the offsets (due to records being compacted away).
> We have a consumer that starts up and reads the entire topic from beginning to end. We noticed that the consumer would read through the first part of the topic very quickly. When it got to the part of the topic with frequent gaps in offsets, the consumption rate slowed down dramatically. This slowdown was consistent across multiple runs.
> What is happening is this:
> 1) A call to {{poll()}} happens. The consumer fetches 1MB of data from the broker (the default of {{max.partition.fetch.bytes}}). It then returns just 500 records to the caller (the default of {{max.poll.records}}) and keeps the rest of the data in memory for use in future calls to {{poll()}}.
> 2) Before returning the 500 records, the consumer library records the *next* offset it should return. It does so by taking the offset of the last record and adding 1 to it (the offset of the 500th message from the set, plus 1). It calls this the {{nextOffset}}.
> 3) The application finishes processing the 500 messages and makes another call to {{poll()}}. During this call, the consumer library does a sanity check: it checks that the first message of the set *it is about to return* has an offset that matches the value of {{nextOffset}}. That is, it checks whether the 501st record has an offset that is 1 greater than that of the 500th record.
> 	a. If it matches, then it returns an additional 500 records and increments {{nextOffset}} to (offset of the 1000th record, plus 1).
> 	b. If it doesn't match, then it throws away the remainder of the 1MB of data that it stored in memory in step 1 and goes back to the broker to fetch another 1MB of data, starting at offset {{nextOffset}}.
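The three steps above can be sketched as a small, self-contained Java simulation that counts how many broker fetches are needed to read a partition. This is a toy model under stated assumptions, not the actual {{Fetcher}} code: a fetch returns at most a hypothetical {{recordsPerFetch}} records (a stand-in for {{max.partition.fetch.bytes}}), and a freshly fetched batch is always accepted, which matches the 0.10.2.1 debug log in the repro, where the record at offset 3 is returned right after the refetch. The {{fixed}} flag is an assumed model of the 0.11.0.0 behaviour (comparing the position against last returned offset + 1 rather than against the next buffered record's own offset). {{FetchModel}} and its methods are hypothetical names.

```java
import java.util.Arrays;

// Toy model of the consumer-side check in steps 1-3 (not Kafka's Fetcher code).
// Assumptions: a fetch returns at most recordsPerFetch records (a stand-in for
// max.partition.fetch.bytes), and a freshly fetched batch is always accepted.
class FetchModel {

    // Counts broker fetches needed to consume every surviving offset in order.
    // fixed == false models 0.10.2.1; fixed == true is an assumed model of 0.11.0.0.
    static int countFetches(long[] offsets, int maxPollRecords, int recordsPerFetch, boolean fixed) {
        int fetches = 0;
        long position = 0;      // the consumer position, i.e. nextOffset
        int next = 0;           // index of the first buffered, not yet returned record
        int end = 0;            // buffered records are offsets[next..end)
        long bufferOffset = -1; // offset the buffered data is tagged with
        while (next < offsets.length) {
            // Step 3: buffered data is used only if its tag matches the position.
            if (next < end && bufferOffset != position) {
                end = next; // step 3b: throw the rest of the buffer away
            }
            if (next == end) {
                // Go back to the broker, starting at the first offset >= position.
                fetches++;
                next = firstIndexAtOrAfter(offsets, position);
                end = Math.min(offsets.length, next + recordsPerFetch);
                bufferOffset = position; // a fresh fetch is tagged with the requested offset
            }
            // Steps 1-2: return at most maxPollRecords and advance nextOffset.
            int take = Math.min(maxPollRecords, end - next);
            long last = offsets[next + take - 1];
            next += take;
            position = last + 1;
            if (fixed) {
                bufferOffset = last + 1; // remaining data tagged with last returned offset + 1
            } else {
                bufferOffset = next < end ? offsets[next] : -1; // tagged with next record's own offset
            }
        }
        return fetches;
    }

    // Index of the first offset >= target (offsets must be sorted).
    static int firstIndexAtOrAfter(long[] offsets, long target) {
        int i = Arrays.binarySearch(offsets, target);
        return i >= 0 ? i : -i - 1; // insertion point when target is missing
    }

    // Offsets left after compacting the repro topic: 1, 3, 5, ...
    static long[] compactedOffsets(int keys) {
        long[] offsets = new long[keys];
        for (int k = 0; k < keys; k++) offsets[k] = 2L * k + 1;
        return offsets;
    }

    // Offsets of a topic with no gaps: 0, 1, 2, ...
    static long[] contiguousOffsets(int n) {
        long[] offsets = new long[n];
        for (int i = 0; i < n; i++) offsets[i] = i;
        return offsets;
    }

    public static void main(String[] args) {
        long[] gappy = compactedOffsets(10_000);
        long[] dense = contiguousOffsets(10_000);
        System.out.println("gaps, max.poll.records=1:    " + countFetches(gappy, 1, 1000, false));
        System.out.println("gaps, max.poll.records=500:  " + countFetches(gappy, 500, 1000, false));
        System.out.println("no gaps, max.poll.records=1: " + countFetches(dense, 1, 1000, false));
        System.out.println("gaps, fixed model:           " + countFetches(gappy, 1, 1000, true));
    }
}
```

With these toy parameters the model needs 10000 fetches for the gappy partition with {{max.poll.records=1}} (one per record, code path 3b every time), 20 with {{max.poll.records=500}} (one per {{poll()}}), and 10 both for a partition without gaps and for the fixed model (one per full buffer).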
> In topics that have no gaps (such as non-compacted topics), the code will always take code path 3a.
> If the topic has gaps in offsets and the call to {{poll()}} happens to fall onto a gap,
then the code will hit code path 3b.
> If the gaps are frequent, then it will frequently hit code path 3b.
> The worst-case scenario is a large number of gaps combined with {{max.poll.records=1}}. Every gap will result in a new fetch to the broker, so you may end up processing only one message per fetch. In other words, you will end up doing a separate fetch for every single message in the partition.
> Repro
> -----
> We created a repro. It appears that the bug is present in 0.10.2.1 but was fixed in 0.11. I've attached the tarball with all the code and instructions.
> The repro is:
> 1) Create a single partition topic with log compaction turned on 
> 2) Write messages with the following keys: 1 1 2 2 3 3 4 4 5 5 ... (each message key written twice in a row)
> 3) Let compaction happen. This means that offsets 0 2 4 6 8 10 ... will be compacted away
> 4) Consume from this topic with {{max.poll.records=1}}
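Why step 3 leaves a gap after every record can be checked with a tiny model of compaction in which, for each key, only the record with the highest offset survives. This is a simplification, not the log cleaner: it ignores the active segment (which is never compacted) and when cleaning actually runs. {{CompactionModel}} and its methods are hypothetical, and the key sequence follows the producer code (0 0 1 1 2 2 ...).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of log compaction: for every key, only the record with the
// highest offset survives. Unlike the real log cleaner, it ignores the active
// segment (which is never compacted) and when cleaning actually runs.
class CompactionModel {

    // keysByOffset.get(o) is the key of the record written at offset o.
    static List<Long> survivingOffsets(List<String> keysByOffset) {
        Map<String, Long> latest = new HashMap<>();
        for (int offset = 0; offset < keysByOffset.size(); offset++) {
            latest.put(keysByOffset.get(offset), (long) offset); // later writes win
        }
        List<Long> survivors = new ArrayList<>(latest.values());
        Collections.sort(survivors);
        return survivors;
    }

    // The repro's key sequence, as in the producer code: 0 0 1 1 2 2 ...
    static List<String> reproKeys(int keys) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys; i++) {
            out.add(Integer.toString(i));
            out.add(Integer.toString(i));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(survivingOffsets(reproKeys(5))); // prints [1, 3, 5, 7, 9]
    }
}
```

The surviving offsets 1, 3, 5, ... match the offsets printed by both consumers in the repro output: every surviving record is followed by a one-offset gap, so with {{max.poll.records=1}} every {{poll()}} lands on a gap.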
> More concretely,
> Here is the producer code:
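> {code}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> // Assumed connection settings; adjust to your cluster.
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("key.serializer", StringSerializer.class.getName());
> props.put("value.serializer", StringSerializer.class.getName());
>
> Producer<String, String> producer = new KafkaProducer<String, String>(props);
> // Write every key twice in a row so that compaction removes the first copy.
> for (int i = 0; i < 1000000; i++) {
>     producer.send(new ProducerRecord<String, String>("compacted", Integer.toString(i), Integer.toString(i)));
>     producer.send(new ProducerRecord<String, String>("compacted", Integer.toString(i), Integer.toString(i)));
> }
> producer.flush();
> producer.close();
> {code}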
> When consuming with a 0.10.2.1 consumer, you can see this pattern (with Fetcher logs at DEBUG; see file consumer_0.10.2/debug.log):
> {code}
> offset = 1, key = 0, value = 0
> 22:58:51.262 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 3 since the current position is 2
> 22:58:51.263 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 3, key = 1, value = 1
> 22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 5 since the current position is 4
> 22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 5, key = 2, value = 2
> 22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 7 since the current position is 6
> 22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 7, key = 3, value = 3
> 22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 9 since the current position is 8
> 22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 9, key = 4, value = 4
> 22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 11 since the current position is 10
> 22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 11, key = 5, value = 5
> 22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 13 since the current position is 12
> 22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 13, key = 6, value = 6
> 22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 15 since the current position is 14
> 22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> {code}
> When consuming with a 0.11.0.1 consumer, you can see the following pattern (see file consumer_0.11/debug.log):
> {code}
> offset = 1, key = 0, value = 0 
> offset = 3, key = 1, value = 1 
> offset = 5, key = 2, value = 2 
> offset = 7, key = 3, value = 3 
> offset = 9, key = 4, value = 4 
> offset = 11, key = 5, value = 5 
> offset = 13, key = 6, value = 6 
> offset = 15, key = 7, value = 7 
> offset = 17, key = 8, value = 8 
> offset = 19, key = 9, value = 9 
> offset = 21, key = 10, value = 10 
> {code}
> From looking at the GitHub history, it appears it was fixed in https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0
> Specifically, this line
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0L930
> was replaced by this line:
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0R933
> Mitigation
> -----
> This problem is fixed in 0.11.0.0. If you can upgrade to 0.11.0.0, then you will not be affected by the problem.
> If you cannot upgrade to 0.11.0.0, you can reduce the impact by increasing the value of {{max.poll.records}}. This works because the check happens on each call to {{poll()}}, and increasing {{max.poll.records}} reduces the number of calls to {{poll()}}.
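For the workaround, a consumer configuration along these lines could be passed to {{new KafkaConsumer<String, String>(props)}}. This is a sketch: {{WorkaroundConfig}} and {{consumerProps}} are hypothetical names, the configuration keys are standard consumer settings, and every value (broker address, group id, the batch size of 5000) is a placeholder rather than a recommendation from this issue.

```java
import java.util.Properties;

// Hypothetical helper that builds consumer settings for the workaround.
// Only max.poll.records matters here; the other keys are the usual consumer
// settings, and all values are placeholders.
class WorkaroundConfig {

    static Properties consumerProps(String bootstrapServers, String groupId, int maxPollRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // read the topic from the beginning, as in the repro
        // Fewer, larger poll() batches mean fewer chances for the offset check to fail.
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "compacted-reader", 5000);
        System.out.println(props.getProperty("max.poll.records"));
    }
}
```

The value is a trade-off: each batch returned by {{poll()}} must be processed within {{max.poll.interval.ms}}, so a suitable {{max.poll.records}} depends on record size and per-record processing time.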



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
