kafka-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5316) Log cleaning can increase message size and cause cleaner to crash with buffer overflow
Date Thu, 01 Jun 2017 10:03:04 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16032729#comment-16032729 ]

ASF GitHub Bot commented on KAFKA-5316:

GitHub user ijuma closed the pull request at:


> Log cleaning can increase message size and cause cleaner to crash with buffer overflow
> --------------------------------------------------------------------------------------
>                 Key: KAFKA-5316
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5316
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>             Fix For:
> We have observed in practice that it is possible for a compressed record set to grow
after cleaning. Since the cleaner's input and output buffers are the same size, this
can lead to overflow of the output buffer:
> {code}
> [2017-05-23 15:05:15,480] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.nio.BufferOverflowException
>         at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>         at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:104)
>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:163)
>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:114)
>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
>         at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
>         at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>         at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
>         at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>         at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-05-23 15:05:15,481] INFO [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {code}
> It is then also possible for a compressed message set to grow beyond the max message
size. Because KIP-74 changed fetch semantics, the suggestion for this case
is to allow the recompressed message set to exceed the max message size. This should be rare
in practice and won't prevent consumers from making progress.
> To handle the overflow issue, one option is to allocate a temporary buffer when filtering
in {{MemoryRecords.filterTo}} and return it in the result. As an optimization, we can resort
to this only when there is a single recompressed message set which is larger than the entire
write buffer. 
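
The temporary-buffer option described above can be sketched roughly as follows. This is a minimal illustration using only `java.nio.ByteBuffer`, not the actual `MemoryRecords.filterTo` code: the class `FilterToSketch` and the method `appendOrGrow` are hypothetical names, and the recompressed message set is modelled as a plain byte array. The idea is to write into the cleaner's output buffer when the data fits, and otherwise allocate a larger temporary buffer and return it so the caller can use it in place of the original.

```java
import java.nio.ByteBuffer;

public class FilterToSketch {

    /**
     * Hypothetical helper: appends a recompressed batch to dest if it fits.
     * Otherwise allocates a temporary buffer large enough for everything
     * written so far plus the batch, and returns that buffer instead.
     */
    static ByteBuffer appendOrGrow(ByteBuffer dest, byte[] batch) {
        if (batch.length <= dest.remaining()) {
            // Common case: the batch fits, so no extra allocation is needed.
            dest.put(batch);
            return dest;
        }
        // Overflow case: a plain dest.put(batch) would throw BufferOverflowException.
        ByteBuffer temp = ByteBuffer.allocate(dest.position() + batch.length);
        // duplicate() shares the bytes but has its own position and limit,
        // so flipping it does not disturb dest.
        ByteBuffer written = dest.duplicate();
        written.flip();
        temp.put(written); // copy what was already written
        temp.put(batch);   // then the oversized batch
        return temp;
    }

    public static void main(String[] args) {
        ByteBuffer dest = ByteBuffer.allocate(8);
        ByteBuffer out = appendOrGrow(dest, new byte[4]);
        System.out.println("reused output buffer: " + (out == dest));
        out = appendOrGrow(out, new byte[16]);
        System.out.println("bytes written after growth: " + out.position());
    }
}
```

This sketch grows whenever the batch does not fit in the remaining space. The optimization mentioned in the description would be narrower: fall back to the temporary buffer only when a single recompressed message set is larger than the entire write buffer, and handle the other cases with the existing buffer.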
