From: "James Cheng (JIRA)"
To: jira@kafka.apache.org
Date: Thu, 19 Oct 2017 04:27:00 +0000 (UTC)
Subject: [jira] [Resolved] (KAFKA-6088) Kafka Consumer slows down when reading from highly compacted topics

     [ https://issues.apache.org/jira/browse/KAFKA-6088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

James Cheng resolved KAFKA-6088.
--------------------------------
    Resolution: Won't Fix

This is fixed in Kafka client 0.11.0.0, and 0.11.0.0 clients can be used against brokers as far back as 0.10.0.0. Anyone who is affected can therefore upgrade their Kafka clients to get the fix, so we won't issue a patch fix for older releases.
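For anyone applying that client-side fix, here is a minimal sketch of the dependency bump, assuming a Maven build (adjust to your build tool; the only change that matters is moving kafka-clients to 0.11.0.0 or later):

{code}
<!-- assumption: Maven build; bump the client library, no broker change required -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
{code}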
> Kafka Consumer slows down when reading from highly compacted topics
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6088
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6088
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>             Fix For: 0.11.0.0
>
>
> Summary of the issue
> -----
> We found a performance issue with the Kafka Consumer where it gets less efficient if you have frequent gaps in offsets (which happens when there is lots of compaction on the topic).
> The issue is present in 0.10.2.1 and possibly prior. It is fixed in 0.11.0.0.
> Summary of cause
> -----
> The fetcher code assumes that there will be no gaps in message offsets. If there are, it does an additional round trip to the broker. For topics with large gaps in offsets, it is possible that most calls to {{poll()}} will generate a round trip to the broker.
> Background and details
> -----
> We have a topic with roughly 8 million records. The topic is log compacted. It turns out that most of the initial records in the topic were never overwritten, whereas in the 2nd half of the topic we had lots of overwritten records. That means that for the first part of the topic, there are no gaps in offsets. But in the 2nd part of the topic, there are frequent gaps in the offsets (due to records being compacted away).
> We have a consumer that starts up and reads the entire topic from beginning to end. We noticed that the consumer would read through the first part of the topic very quickly. When it got to the part of the topic with frequent gaps in offsets, the consumption rate slowed down dramatically. This slowdown was consistent across multiple runs.
> What is happening is this:
> 1) A call to {{poll()}} happens. The consumer goes to the broker and fetches 1MB of data (the default of {{max.partition.fetch.bytes}}). It then returns to the caller just 500 records (the default of {{max.poll.records}}), and keeps the rest of the data in memory to use in future calls to {{poll()}}.
> 2) Before returning the 500 records, the consumer library records the *next* offset it should return. It does so by taking the offset of the last record and adding 1 to it (the offset of the 500th message from the set, plus 1). It calls this the {{nextOffset}}.
> 3) The application finishes processing the 500 messages and makes another call to {{poll()}}. During this call, the consumer library does a sanity check. It checks that the first message of the set *it is about to return* has an offset that matches the value of {{nextOffset}}. That is, it checks whether the 501st record has an offset that is 1 greater than the 500th record.
>    a. If it matches, then it returns an additional 500 records, and increments the {{nextOffset}} to (offset of the 1000th record, plus 1).
>    b. If it doesn't match, then it throws away the remainder of the 1MB of data that it stored in memory in step 1, and it goes back to the broker to fetch an additional 1MB of data, starting at the offset {{nextOffset}}.
> In topics that have no gaps (a non-compacted topic), the code will always hit the 3a code path.
> If the topic has gaps in offsets and a call to {{poll()}} happens to fall onto a gap, then the code will hit code path 3b.
> If the gaps are frequent, then it will frequently hit code path 3b.
> The worst case scenario is if you have a large number of gaps and you run with {{max.poll.records=1}}. Every gap will result in a new fetch to the broker. You may end up processing only one message per fetch. Or, said another way, you will end up doing a single fetch for every single message in the partition.
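As an illustration of that worst case, here is a minimal consumer sketch, not taken from the original report: it reads the repro topic "compacted" from the beginning with {{max.poll.records=1}}, using the pre-2.0 {{poll(long)}} API. The bootstrap address and group id are placeholder assumptions. On a 0.10.2.1 client, every offset gap it hits triggers the extra fetch described in step 3b; raising {{max.poll.records}} (the mitigation discussed below) reduces the number of {{poll()}} calls and therefore the number of extra round trips.

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CompactedTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("group.id", "compaction-repro");         // assumption: any unused group id
        props.put("auto.offset.reset", "earliest");        // read the topic from the beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Repro setting: one record per poll(). On a pre-0.11 client, every offset gap then
        // costs an extra broker fetch; increasing this value reduces the number of poll()
        // calls and therefore the number of extra round trips.
        props.put("max.poll.records", "1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("compacted")); // topic from the repro
        try {
            while (true) { // stop with Ctrl-C; a sketch, not production code
                ConsumerRecords<String, String> records = consumer.poll(100); // 0.10.x/0.11.x API
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
{code}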
> Repro
> -----
> We created a repro. It appears that the bug is in 0.10.2.1, but was fixed in 0.11. I've attached the tarball with all the code and instructions.
> The repro is:
> 1) Create a single-partition topic with log compaction turned on
> 2) Write messages with the following keys: 1 1 2 2 3 3 4 4 5 5 ... (each message key written twice in a row)
> 3) Let compaction happen. This means that offsets 0 2 4 6 8 10 ... would be compacted away
> 4) Consume from this topic with {{max.poll.records=1}}
> More concretely, here is the producer code:
> {code}
> Producer<String, String> producer = new KafkaProducer<>(props);
> for (int i = 0; i < 1000000; i++) {
>     producer.send(new ProducerRecord<>("compacted", Integer.toString(i), Integer.toString(i)));
>     producer.send(new ProducerRecord<>("compacted", Integer.toString(i), Integer.toString(i)));
> }
> producer.flush();
> producer.close();
> {code}
> When consuming with a 0.10.2.1 consumer, you can see this pattern (with Fetcher logs at DEBUG; see file consumer_0.10.2/debug.log):
> {code}
> offset = 1, key = 0, value = 0
> 22:58:51.262 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 3 since the current position is 2
> 22:58:51.263 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 3, key = 1, value = 1
> 22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 5 since the current position is 4
> 22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 5, key = 2, value = 2
> 22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 7 since the current position is 6
> 22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 7, key = 3, value = 3
> 22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 9 since the current position is 8
> 22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 9, key = 4, value = 4
> 22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 11 since the current position is 10
> 22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 11, key = 5, value = 5
> 22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 13 since the current position is 12
> 22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 13, key = 6, value = 6
> 22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 15 since the current position is 14
> 22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> {code}
> When consuming with a 0.11.0.1 consumer, you can see the following pattern (see file consumer_0.11/debug.log):
> {code}
> offset = 1, key = 0, value = 0
> offset = 3, key = 1, value = 1
> offset = 5, key = 2, value = 2
> offset = 7, key = 3, value = 3
> offset = 9, key = 4, value = 4
> offset = 11, key = 5, value = 5
> offset = 13, key = 6, value = 6
> offset = 15, key = 7, value = 7
> offset = 17, key = 8, value = 8
> offset = 19, key = 9, value = 9
> offset = 21, key = 10, value = 10
> {code}
> From looking at the GitHub history, it appears it was fixed in https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0
> Specifically, this line
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0L930
> was replaced by this line:
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0R933
> Mitigation
> -----
> This problem is fixed in 0.11.0.0. If you can upgrade to 0.11.0.0, then you will not be affected by the problem.
> If you cannot upgrade to 0.11.0.0, you can reduce the impact by increasing the value of {{max.poll.records}}. This works because the check happens on each call to {{poll()}}, and increasing the value of {{max.poll.records}} will reduce the number of calls to {{poll()}}.


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)