kafka-jira mailing list archives

From "James Cheng (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-6088) Kafka Consumer slows down when reading from highly compacted topics
Date Thu, 19 Oct 2017 04:13:00 GMT
James Cheng created KAFKA-6088:
----------------------------------

             Summary: Kafka Consumer slows down when reading from highly compacted topics
                 Key: KAFKA-6088
                 URL: https://issues.apache.org/jira/browse/KAFKA-6088
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 0.10.2.1
            Reporter: James Cheng
             Fix For: 0.11.0.0


Summary of the issue
-----
We found a performance issue with the Kafka Consumer where it gets less efficient if you have
frequent gaps in offsets (which happens when there is lots of compaction on the topic).

The issue is present in 0.10.2.1 and possibly prior.

It is fixed in 0.11.0.0.

Summary of cause
-----
The fetcher code assumes that there will be no gaps in message offsets. If there are, it does
an additional round trip to the broker. For topics with large gaps in offsets, it is possible
that most calls to {{poll()}} will generate a round trip to the broker.

Background and details 
-----
We have a topic with roughly 8 million records. The topic is log compacted. It turns out that
most of the initial records in the topic were never overwritten, whereas in the 2nd half of
the topic we had lots of overwritten records. That means that for the first part of the topic,
there are no gaps in offsets. But in the 2nd part of the topic, there are frequent gaps in
the offsets (due to records being compacted away).

We have a consumer that starts up and reads the entire topic from beginning to end. We noticed
that the consumer would read through the first part of the topic very quickly. When it got
to the part of the topic with frequent gaps in offsets, consumption rate slowed down dramatically.
This slowdown was consistent across multiple runs.

What is happening is this:
1) A call to {{poll()}} happens. The consumer goes to the broker and fetches 1MB of data (the
default of {{max.partition.fetch.bytes}}). It then returns just 500 records to the caller
(the default of {{max.poll.records}}), and keeps the rest of the data in memory to use in
future calls to {{poll()}}.
2) Before returning the 500 records, the consumer library records the *next* offset it should
return. It does so by taking the offset of the last record and adding 1 to it (the offset
of the 500th message from the set, plus 1). It calls this the {{nextOffset}}.
3) The application finishes processing the 500 messages and makes another call to {{poll()}}.
During this call, the consumer library does a sanity check. It checks that the first
message of the set *it is about to return* has an offset that matches the value of {{nextOffset}}.
That is, it checks whether the 501st record has an offset that is 1 greater than the 500th record.
	a. If it matches, then it returns an additional 500 records, and updates {{nextOffset}}
to (offset of the 1000th record, plus 1).
	b. If it doesn't match, then it throws away the remainder of the 1MB of data that it stored
in memory in step 1, and it goes back to the broker to fetch an additional 1MB of data, starting
at the offset {{nextOffset}}.

If the topic has no gaps (a non-compacted topic), then the code will always hit the 3a code
path.
If the topic has gaps in offsets and a call to {{poll()}} happens to fall on a gap, then
the code will hit code path 3b.

If the gaps are frequent, then it will frequently hit code path 3b.

The worst case happens when you have a large number of gaps and you run
with {{max.poll.records=1}}. Every gap will result in a new fetch to the broker, so you may
end up processing only one message per fetch. Said another way, you will end up doing
a single fetch for every single message in the partition.
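
The steps above can be approximated with a small simulation. This is only a sketch of the behavior described in steps 1 to 3, not the actual Fetcher code: the class and method names are hypothetical, and {{recordsPerFetch}} is an assumed stand-in for however many records fit into one 1MB fetch.

```java
class FetcherGapSimulation {

    /** Builds `count` offsets: start, start + step, start + 2 * step, ... */
    public static long[] offsets(int count, long start, long step) {
        long[] result = new long[count];
        for (int i = 0; i < count; i++) {
            result[i] = start + i * step;
        }
        return result;
    }

    /**
     * Counts the simulated broker fetches needed to consume the whole log.
     * log:             sorted offsets that still exist in the partition
     * recordsPerFetch: records returned by one broker fetch (assumed, must be >= 1)
     * maxPollRecords:  records handed to the caller per poll() (must be >= 1)
     */
    public static int countFetches(long[] log, int recordsPerFetch, int maxPollRecords) {
        int fetches = 0;
        int next = 0;        // index of the next record to hand to the caller
        int bufEnd = 0;      // index one past the last buffered record
        long nextOffset = 0; // offset the consumer expects to return next

        while (next < log.length) {
            // Step 3: the buffered data is only used if its first offset equals nextOffset.
            boolean bufferUsable = next < bufEnd && log[next] == nextOffset;
            if (!bufferUsable) {
                // Step 3b (or an empty buffer): drop the buffer and fetch again.
                bufEnd = Math.min(next + recordsPerFetch, log.length);
                fetches++;
            }
            // Steps 1 and 2: return up to maxPollRecords and remember last offset + 1.
            int returned = Math.min(maxPollRecords, bufEnd - next);
            next += returned;
            nextOffset = log[next - 1] + 1;
        }
        return fetches;
    }

    public static void main(String[] args) {
        long[] dense = offsets(1000, 0, 1);     // 0, 1, 2, ... (no gaps)
        long[] compacted = offsets(1000, 1, 2); // 1, 3, 5, ... (a gap after every record)
        System.out.println(countFetches(dense, 100, 1));      // prints 10
        System.out.println(countFetches(compacted, 100, 1));  // prints 1000
        System.out.println(countFetches(compacted, 100, 10)); // prints 100
    }
}
```

In this model, the log without gaps needs one fetch per 100 records. The log with a gap after every record needs one fetch per call to {{poll()}}.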


Repro
-----

We created a repro. It appears that the bug is in 0.10.2.1, but was fixed in 0.11. I've attached
the tarball with all the code and instructions. 

The repro is:
1) Create a single partition topic with log compaction turned on 
2) Write messages with the following keys: 1 1 2 2 3 3 4 4 5 5 ... (each message key written
twice in a row) 
3) Let compaction happen. This would mean that offsets 0 2 4 6 8 10 ... would be compacted
away 
4) Consume from this topic with {{max.poll.records=1}}

More concretely,

Here is the producer code:
{code}
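import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// "props" is assumed to hold the usual producer settings (bootstrap servers, String serializers).
Producer<String, String> producer = new KafkaProducer<String, String>(props);

// Write every key twice in a row, so that compaction later removes the first copy of each.
for (int i = 0; i < 1000000; i++) {
    String s = Integer.toString(i);
    producer.send(new ProducerRecord<String, String>("compacted", s, s));
    producer.send(new ProducerRecord<String, String>("compacted", s, s));
}
producer.flush();
producer.close();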
{code}
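
The consumer side of the repro (step 4) is in the attached tarball rather than in this message. As a rough sketch only, the settings for such a consumer could look like the following. The configuration keys are standard consumer settings, but the class name, the group id and the bootstrap address are placeholders chosen for illustration and are not taken from the tarball.

```java
import java.util.Properties;

class ReproConsumerConfig {

    /** Builds consumer settings that read the topic from the beginning, a few records per poll(). */
    public static Properties consumerProps(String bootstrapServers, int maxPollRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "kafka-6088-repro");   // placeholder group id
        props.put("auto.offset.reset", "earliest");  // start at the beginning of the topic
        props.put("enable.auto.commit", "false");    // each run re-reads the whole topic
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Step 4 of the repro uses 1 here, which makes every poll() land on a gap.
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", 1);
        System.out.println(props.getProperty("max.poll.records"));
    }
}
```

The resulting {{Properties}} object can be passed to {{new KafkaConsumer<String, String>(props)}}.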


When consuming with a 0.10.2.1 consumer, you can see this pattern (with Fetcher logs at DEBUG,
see file consumer_0.10.2/debug.log):

{code}
offset = 1, key = 0, value = 0
22:58:51.262 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 3 since the current position is 2
22:58:51.263 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 3, key = 1, value = 1
22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 5 since the current position is 4
22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 5, key = 2, value = 2
22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 7 since the current position is 6
22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 7, key = 3, value = 3
22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 9 since the current position is 8
22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 9, key = 4, value = 4
22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 11 since the current position is 10
22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 11, key = 5, value = 5
22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 13 since the current position is 12
22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 13, key = 6, value = 6
22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 15 since the current position is 14
22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
{code}


When consuming with a 0.11.0.1 consumer, you can see the following pattern (see file consumer_0.11/debug.log):

{code}
offset = 1, key = 0, value = 0 
offset = 3, key = 1, value = 1 
offset = 5, key = 2, value = 2 
offset = 7, key = 3, value = 3 
offset = 9, key = 4, value = 4 
offset = 11, key = 5, value = 5 
offset = 13, key = 6, value = 6 
offset = 15, key = 7, value = 7 
offset = 17, key = 8, value = 8 
offset = 19, key = 9, value = 9 
offset = 21, key = 10, value = 10 
{code}

From looking at the GitHub history, it appears it was fixed in https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0

Specifically, this line 
https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0L930

was replaced by this line:
https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0R933

Mitigation
-----
This problem is fixed in 0.11.0.0. If you can upgrade to 0.11.0.0, then you will not be affected
by the problem.

If you cannot upgrade to 0.11.0.0, then you can reduce the impact of this by increasing the
value of {{max.poll.records}}. This works because the check happens on each call to {{poll()}},
and increasing the value of {{max.poll.records}} will reduce the number of calls to {{poll()}}.
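
Because the check runs once per call to {{poll()}}, the number of {{poll()}} calls is an upper bound on the number of extra fetches caused by gaps. The following sketch does that arithmetic for a topic of roughly 8 million records, as in the background section. The class and method names are hypothetical, the value 5000 is an arbitrary example, and the calculation assumes every {{poll()}} returns a full batch.

```java
class PollCountEstimate {

    /** Number of poll() calls needed when each call returns maxPollRecords records. */
    public static long pollCalls(long records, int maxPollRecords) {
        return (records + maxPollRecords - 1) / maxPollRecords; // ceiling division
    }

    public static void main(String[] args) {
        long records = 8_000_000L;
        System.out.println(pollCalls(records, 1));    // prints 8000000
        System.out.println(pollCalls(records, 500));  // prints 16000
        System.out.println(pollCalls(records, 5000)); // prints 1600
    }
}
```

Raising {{max.poll.records}} beyond the number of records that fit in a single fetch should not reduce this bound any further, since one {{poll()}} cannot return more than is buffered.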




