kafka-jira mailing list archives

From "Kyle Ambroff (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments
Date Mon, 25 Sep 2017 16:31:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16179272#comment-16179272
] 

Kyle Ambroff commented on KAFKA-5297:
-------------------------------------

This is definitely still a problem for us. We've applied the patch in the pull request as
an internal hotfix, but that is not even close to an ideal solution.

We've been short on time and haven't been able to dedicate a lot of cycles to this, but doing the work to truncate the indexes when a segment rolls would be the ideal fix.

> Broker can take a long time to shut down if there are many active log segments
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-5297
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5297
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Kyle Ambroff
>            Priority: Minor
>         Attachments: flame graph of broker during shut down.png, LogSegmentBenchmark.java,
shutdown-flame-graph.png
>
>
> After the changes for KIP-33 were merged, we started noticing that our cluster restart
times were quite a bit longer. In some cases it was taking four times as long as expected
to do a rolling restart of every broker in the cluster. This meant that doing a deploy to
one of our Kafka clusters went from taking about 3 hours to more than 12 hours!
> We looked into this and have some data from a couple of runs with a sampling profiler. It turns out that it isn't unusual for us to have a broker sit in kafka.log.Log#close for up to 30 minutes if it has been running for several weeks. There are so many active log segments that it simply takes a long time to truncate all of their indexes.
> I've attached a flame graph that was generated from 10 minutes of stack samples collected
during shutdown of a broker that took about 30 minutes total to shut down cleanly.
> * About 60% of the time was spent in kafka.log.AbstractIndex#resize, where every index and timeindex file is truncated down to the number of entries it actually contains times the entry size.
> * Another big chunk of time is spent reading the last entry from the index, which is used to make any final updates to the timeindex file. This is something that can be cached. For a broker that's been running for a long time, the bulk of these indexes are not likely to be in the page cache anymore. We already cache largestTimestamp and offsetOfLargestTimestamp in LogSegment, so we could add a cache for this as well.
> Looking at these changes and considering KIP-33, it isn't surprising that the broker
shutdown time has increased so dramatically. The extra index plus the extra reads have increased
the amount of work performed by kafka.log.Log#close by about 4x (in terms of system calls
and potential page faults). Breaking down what this function does:
> # Read the max timestamp from the timeindex. Could lead to a disk seek.
> # Read the max offset from the index. Could lead to a disk seek.
> # Append the timestamp and offset of the most recently written message to the timeindex
if it hasn't been written there for some reason.
> # Truncate the index file
> ## Get the position in the index of the last entry written
> ## If on Windows then unmap and close the index
> ## reopen
> ## truncate to the number of entries * entry size. (ftruncate() system call)
> ## mmap()
> ## Set the position back to where it was before the resize. Leads to an lseek() system call.
> ## Close the newly reopened and mapped index
> # Same thing as #4 but for the timeindex.
> ## Get the position in the timeindex of the last entry written
> ## If on Windows then unmap and close the timeindex
> ## reopen
> ## truncate to the number of entries * entry size. (ftruncate() system call)
> ## mmap()
> ## Set the position back to where it was before the resize. Leads to an lseek() system call.
> ## Close the newly reopened and mapped timeindex
> # Finalize the log segment
> ## Invoke java.nio.channels.FileChannel#force, which leads to a fsync() for that log
segment.
> ## Truncate the log segment if it doesn't have enough messages written to fill up the
whole thing. Potentially leads to a ftruncate() system call.
> ## Set the position to the end of the segment after truncation. Leads to a lseek() system
call.
> ## Close and unmap the channel.
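> To make the cost of steps 4 and 5 concrete, here is a rough sketch in Java of the resize-on-close path for a single index file. This is illustrative only, not the actual Kafka code; the names are made up, but the reopen, ftruncate(), mmap() and close sequence matches the sub-steps above.
> {noformat}
> import java.io.File;
> import java.io.RandomAccessFile;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
>
> // Illustrative sketch of the truncate-and-remap work done per index file on close.
> final class IndexResizeSketch {
>     static MappedByteBuffer resizeOnClose(File indexFile, int entries, int entrySize,
>                                           int savedPosition) throws Exception {
>         RandomAccessFile raf = new RandomAccessFile(indexFile, "rw");   // reopen the file
>         try {
>             raf.setLength((long) entries * entrySize);                  // ftruncate() to entries * entry size
>             MappedByteBuffer mmap = raf.getChannel()
>                 .map(FileChannel.MapMode.READ_WRITE, 0, raf.length());  // mmap() the file again
>             mmap.position(savedPosition);                               // put the position back where it was
>             return mmap;
>         } finally {
>             raf.close();                                                // close the reopened file
>         }
>     }
> }
> {noformat}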
> Looking into the current implementation of kafka.log.AbstractIndex#resize, it appears to do quite a bit of extra work to avoid keeping an instance of RandomAccessFile around. It has to reopen the file, truncate it, mmap() it, and potentially perform an additional disk seek, all before immediately closing the file.
> You wouldn't think this would amount to much, but I put together a benchmark using jmh
to measure the difference between the current code and a new implementation that didn't have
to recreate the page mapping during resize(), and the difference is pretty dramatic.
> {noformat}
> Result "currentImplementation":
>   2063.386 ±(99.9%) 81.758 ops/s [Average]
>   (min, avg, max) = (1685.574, 2063.386, 2338.945), stdev = 182.863
>   CI (99.9%): [1981.628, 2145.144] (assumes normal distribution)
> Result "optimizedImplementation":
>   3497.354 ±(99.9%) 31.575 ops/s [Average]
>   (min, avg, max) = (3261.232, 3497.354, 3605.527), stdev = 70.623
>   CI (99.9%): [3465.778, 3528.929] (assumes normal distribution)
> # Run complete. Total time: 00:03:37
> Benchmark                                     Mode  Cnt     Score    Error  Units
> LogSegmentBenchmark.currentImplementation    thrpt   60  2063.386 ± 81.758  ops/s
> LogSegmentBenchmark.optimizedImplementation  thrpt   60  3497.354 ± 31.575  ops/s
> {noformat}
> I ran this benchmark on a Linux workstation. It just measures the throughput of Log#close
after 20 segments have been created. Not having to reopen the file amounts to a 70% increase
in throughput.
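> The full benchmark is in the attached LogSegmentBenchmark.java. For context, the general shape of a JMH harness like this is sketched below; this is a placeholder sketch, not the attached code, and the class name and method bodies are illustrative only.
> {noformat}
> import java.util.concurrent.TimeUnit;
> import org.openjdk.jmh.annotations.*;
>
> // Skeleton of a JMH throughput benchmark comparing the two resize() paths.
> // Placeholder sketch only; not the attached LogSegmentBenchmark.java.
> @State(Scope.Thread)
> @BenchmarkMode(Mode.Throughput)
> @OutputTimeUnit(TimeUnit.SECONDS)
> public class LogCloseBenchmarkSketch {
>
>     @Setup(Level.Invocation)
>     public void createLog() {
>         // build a log with ~20 segments, each with an index and a timeindex
>     }
>
>     @Benchmark
>     public void currentImplementation() {
>         // close the log using the existing resize() (reopen + ftruncate + mmap per index)
>     }
>
>     @Benchmark
>     public void optimizedImplementation() {
>         // close the log using a resize() that reuses a kept-open RandomAccessFile
>     }
> }
> {noformat}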
> I think there are two totally valid approaches to making this better:
> * Preemptively truncate index files when log rotation happens. Once a log is rotated, jobs could be added to an ExecutorService which truncates its indexes so that they don't all have to be truncated on shutdown. The new shutdown code would enqueue all remaining active indexes and then drain the queue (see the sketch after this list).
> * Alternatively, we could just add a RandomAccessFile instance variable to AbstractIndex so that it doesn't have to recreate the page mapping on resize(). This means an extra file handle for each segment, but that doesn't seem like a big deal to me.
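> Here is a rough sketch of the first option. TrimmableSegment and trimIndexes() are hypothetical stand-ins for LogSegment plus a new method that truncates its index and timeindex to their final sizes; none of these names exist in the codebase today.
> {noformat}
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
>
> // Rough sketch of option 1: trim index files in the background as segments roll.
> final class IndexTrimmer {
>     // Hypothetical stand-in for LogSegment plus a new index-trimming hook.
>     interface TrimmableSegment { void trimIndexes(); }
>
>     private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
>         Thread t = new Thread(r, "index-trimmer");
>         t.setDaemon(true);
>         return t;
>     });
>
>     // Called when a segment rolls: trim its indexes in the background so that
>     // shutdown does not have to do this work for every segment.
>     void onRoll(TrimmableSegment rolled) {
>         executor.submit(rolled::trimIndexes);
>     }
>
>     // Called on shutdown: enqueue the remaining active segments and drain the queue.
>     void shutdown(Iterable<TrimmableSegment> activeSegments) throws InterruptedException {
>         for (TrimmableSegment segment : activeSegments)
>             executor.submit(segment::trimIndexes);
>         executor.shutdown();
>         executor.awaitTermination(10, TimeUnit.MINUTES);
>     }
> }
> {noformat}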
> No matter what, we should add a cache for kafka.log.OffsetIndex#lastEntry; a minimal sketch of that follows.
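> In this sketch, OffsetPosition and readLastEntryFromFile() are stand-ins for the real OffsetIndex internals, not the actual code; the point is just that lastEntry() only touches the (possibly cold) index file when the cache is empty.
> {noformat}
> // Minimal sketch of a lastEntry cache for an offset index.
> final class CachedLastEntryIndex {
>     static final class OffsetPosition {
>         final long offset;
>         final int position;
>         OffsetPosition(long offset, int position) { this.offset = offset; this.position = position; }
>     }
>
>     private volatile OffsetPosition lastEntryCache;
>
>     // Return the cached last entry, reading the index file only when the cache is empty.
>     OffsetPosition lastEntry() {
>         OffsetPosition cached = lastEntryCache;
>         if (cached == null) {
>             cached = readLastEntryFromFile();   // may page-fault on a cold index
>             lastEntryCache = cached;
>         }
>         return cached;
>     }
>
>     // Keep the cache in sync as entries are appended.
>     void onAppend(long offset, int position) {
>         lastEntryCache = new OffsetPosition(offset, position);
>     }
>
>     private OffsetPosition readLastEntryFromFile() {
>         // placeholder for reading the last slot of the mmap'd index file
>         return new OffsetPosition(0L, 0);
>     }
> }
> {noformat}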



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
