incubator-kafka-dev mailing list archives

From "Jay Kreps (JIRA)" <>
Subject [jira] [Commented] (KAFKA-631) Implement log compaction
Date Mon, 26 Nov 2012 22:32:59 GMT


Jay Kreps commented on KAFKA-631:

Here is a specific proposal:

We will keep the existing settings that retain segments based on bytes and time; data within
these limits is left untouched. We will introduce a new setting for each topic "cleanup.policy"={delete,
dedupe}. cleanup.policy=delete will correspond to the current behavior. cleanup.policy=dedupe
will correspond to the new behavior described in this JIRA. As now, data that falls inside
the retention window will not be touched, but data that is outside that window will be deduplicated
rather than deleted. It is intended that this be a per-topic setting specified at topic creation
time. As a short-cut for the purpose of this ticket I will just add a configuration map setting
the policy, in the way we have for other topic-level settings; these can all be refactored
into something set in the create/alter topic command as a follow-up item.
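
A minimal sketch of how such a configuration map might be looked up, with unlisted topics
falling back to delete. The class name, the enum, and the "topic:policy,topic:policy" string
format are assumptions made for illustration; they are not taken from the proposal or from Kafka.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical per-topic cleanup policy lookup; names and string format are illustrative. */
final class CleanupPolicies {
    enum Policy { DELETE, DEDUPE }

    private final Map<String, Policy> byTopic = new HashMap<>();

    /** Parses an assumed map string such as "accounts:dedupe,clicks:delete". */
    CleanupPolicies(String spec) {
        for (String entry : spec.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) continue;
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) throw new IllegalArgumentException("Bad entry: " + trimmed);
            String topic = trimmed.substring(0, colon).trim();
            String policy = trimmed.substring(colon + 1).trim();
            // valueOf throws IllegalArgumentException for anything other than delete/dedupe.
            byTopic.put(topic, Policy.valueOf(policy.toUpperCase(Locale.ROOT)));
        }
    }

    /** Topics without an explicit entry keep the current behavior (delete). */
    Policy policyFor(String topic) {
        return byTopic.getOrDefault(topic, Policy.DELETE);
    }
}
```

With the spec "accounts:dedupe", policyFor("accounts") yields DEDUPE and any other topic yields DELETE.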

Topics with cleanup.policy=dedupe will be processed by a pool of background "cleaner" threads. These
threads will periodically recopy old segment files, removing obsolete messages and swapping in the
new deduplicated files in place of the old segments. The resulting sparse files should already be
well-supported by the logical and sparse offset work in 0.8.

Here are the specific changes intended:
- Add a few new configs: 
   - topic.cleanup.policy={delete,dedupe} // A map of cleanup policies, defaults to delete
   - cleaner.thread.pool.size=# // The number of background threads to use for cleaning 
   - cleaner.buffer.size.bytes=# // The maximum amount of heap memory per cleaner thread that
     can be used for log deduplication
   - cleaner.max.{read,write}.throughput=# // The maximum bytes per second the cleaner can
     read or write
- Add a new method Log.replaceSegments() that replaces one or more old segments with a new
segment while holding the log lock
- Implement a background cleaner thread that does the recopying. This thread will be owned
and maintained by LogManager
- Add a new file per data directory called cleaner-metadata that records how far each
dedupe-enabled log in that directory has been cleaned. This allows the cleaner to resume
cleaning from the same point after a restart.
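
As an illustration of the cleaner-metadata idea, here is a small sketch that serializes and
parses a per-log cleaning position. The proposal does not specify a file format; the
one-line-per-log "name offset" layout, the class name, and the method names are all
assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical cleaner-metadata contents: one "log-name offset" line per dedupe-enabled log. */
final class CleanerCheckpoint {
    /** Renders the cleaned-up-to offset of each log as text lines, sorted by log name. */
    static List<String> format(Map<String, Long> cleanedUpTo) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> e : new TreeMap<String, Long>(cleanedUpTo).entrySet()) {
            lines.add(e.getKey() + " " + e.getValue());
        }
        return lines;
    }

    /** Reads the lines back so a restarted cleaner can resume where it stopped. */
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> cleanedUpTo = new TreeMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) continue;
            int space = trimmed.lastIndexOf(' ');
            if (space <= 0) throw new IllegalArgumentException("Bad line: " + trimmed);
            String logName = trimmed.substring(0, space);
            long offset = Long.parseLong(trimmed.substring(space + 1));
            cleanedUpTo.put(logName, offset);
        }
        return cleanedUpTo;
    }
}
```

A real implementation would also need to write the file atomically (for example via a temp
file and rename) so that a crash mid-write does not lose the position; that is left out here.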

The cleaning algorithm for a single log will work as follows:
1. Scan the head of the log (i.e. all messages since the last cleaning) and create a Map of
key => offset for messages in the head of the log. If the cleaner buffer is too small to
scan the full head of the log then just scan whatever fits going from oldest to newest.
2. Sequentially clean segments from oldest to newest.
3. To clean a segment, first create a new empty copy of the segment file with a temp name.
Check each message in the original segment. If its key is contained in the map with a higher
offset, ignore it; otherwise recopy it to the new temp segment. When the segment is complete swap
in the new file and delete the old.
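
The steps above can be sketched in memory as follows. This is only an illustration under
stated assumptions: messages are reduced to an offset and a string key, segments are plain
lists instead of files, and the cleaner buffer limit is modeled as a maximum number of
distinct keys (maxKeys) rather than a byte budget. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DedupeSketch {
    /** A message reduced to the two fields the algorithm needs. */
    static final class Message {
        final long offset;
        final String key;
        Message(long offset, String key) { this.offset = offset; this.key = key; }
    }

    /** Step 1: map each key in the head to its latest offset, stopping when the buffer is full. */
    static Map<String, Long> buildOffsetMap(List<Message> head, int maxKeys) {
        Map<String, Long> latest = new HashMap<>();
        for (Message m : head) { // head is ordered oldest to newest
            if (!latest.containsKey(m.key) && latest.size() >= maxKeys) break;
            latest.put(m.key, m.offset);
        }
        return latest;
    }

    /** Step 3: keep every message that is not superseded by a higher offset in the map. */
    static List<Message> cleanSegment(List<Message> segment, Map<String, Long> latest) {
        List<Message> cleaned = new ArrayList<>();
        for (Message m : segment) {
            Long newest = latest.get(m.key);
            boolean obsolete = newest != null && newest > m.offset;
            if (!obsolete) cleaned.add(m);
        }
        return cleaned;
    }
}
```

Step 2 then amounts to calling cleanSegment on each segment from oldest to newest with the
same map. Note that a message whose offset equals the map entry is kept, so the latest
version of each key always survives.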

The threads will iterate over the logs and clean them periodically (I am not sure of the right frequency).

Some Nuances:
1. The above tends to lead to smaller and smaller segment files in the tail of the log as
records are overwritten. To avoid this we will combine files; that is, we will always collect
the largest set of files that together are smaller than the max segment size into a single
segment. Obviously this will be based on the starting sizes, so the resulting segment will
likely still be smaller than the max segment size.
2. The recopying procedure depends on the property that logs are immutable. However our logs
are only mostly immutable. It is possible to truncate a log to any segment. It is important
that the cleaner respect this and not have a race condition with potential truncate operations.
But likewise we can't lock for the duration of the cleaning as it may be quite slow. To work
around this I will add a generation counter to the log. Each truncate operation will increment
this counter. The cleaner will record the generation before it begins cleaning and the swap
operation that swaps in the new, cleaned segment will only occur if the generations match
(i.e. if no truncates happened in that segment during cleaning). This will potentially result
in some wasted cleaner work when truncations collide with cleanings, but since truncates
are rare, and truncates deep enough into the log to interact with cleaning very rare, this
should almost never happen.
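
Both nuances can be sketched briefly. The code below is an illustration under assumptions,
not the proposed implementation: segments are represented by their sizes or names, grouping
is a greedy pass over consecutive segments that keeps each group within the max segment
size, and the generation counter is log-wide rather than per segment. All class and method
names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class NuancesSketch {
    /** Nuance 1: greedily group consecutive segments so each group fits in maxSegmentBytes. */
    static List<List<Long>> groupBySize(List<Long> segmentSizes, long maxSegmentBytes) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : segmentSizes) {
            if (!current.isEmpty() && currentBytes + size > maxSegmentBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) groups.add(current);
        return groups;
    }

    /** Nuance 2: a log whose swap succeeds only if no truncate ran since cleaning began. */
    static final class GenerationGuardedLog {
        private final Object lock = new Object();
        private long generation = 0;
        private List<String> segments;

        GenerationGuardedLog(List<String> segments) { this.segments = new ArrayList<>(segments); }

        /** The cleaner records this value before it starts recopying. */
        long generation() { synchronized (lock) { return generation; } }

        List<String> segments() { synchronized (lock) { return new ArrayList<>(segments); } }

        /** Every truncate bumps the generation, invalidating any cleaning in flight. */
        void truncateTo(int segmentCount) {
            synchronized (lock) {
                segments = new ArrayList<>(segments.subList(0, segmentCount));
                generation++;
            }
        }

        /** Swaps in the cleaned segment only if the generation is unchanged; the lock is held briefly. */
        boolean replaceSegments(long expectedGeneration, List<String> old, String cleaned) {
            synchronized (lock) {
                if (generation != expectedGeneration) return false; // a truncate intervened
                int start = Collections.indexOfSubList(segments, old);
                if (start < 0) return false;
                List<String> updated = new ArrayList<>(segments.subList(0, start));
                updated.add(cleaned);
                updated.addAll(segments.subList(start + old.size(), segments.size()));
                segments = updated;
                return true;
            }
        }
    }
}
```

When replaceSegments returns false the cleaner simply discards its temp file and retries
later, which is the wasted work mentioned above.
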
> Implement log compaction
> ------------------------
>                 Key: KAFKA-631
>                 URL:
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>    Affects Versions: 0.8.1
>            Reporter: Jay Kreps
> Currently Kafka has only one way to bound the space of the log, namely by deleting old
segments. The policy that controls which segments are deleted can be configured based either
on the number of bytes to retain or the age of the messages. This makes sense for event or
log data which has no notion of primary key. However lots of data has a primary key and consists
of updates by primary key. For this data it would be nice to be able to ensure that the log
contained at least the last version of every key.
> As an example, say that the Kafka topic contains a sequence of User Account messages,
each capturing the current state of a given user account. Rather than simply discarding old
segments, since the set of user accounts is finite, it might make more sense to delete individual
records that have been made obsolete by a more recent update for the same key. This would
ensure that the topic contained at least the current state of each record.

