incubator-kafka-dev mailing list archives

From "Swapnil Ghike (JIRA)" <>
Subject [jira] [Commented] (KAFKA-521) Refactor Log subsystem
Date Tue, 27 Nov 2012 06:03:58 GMT


Swapnil Ghike commented on KAFKA-521:

Overall, a lot of things have become much more readable and easier to understand. I have a
few comments:

1. Log.analyzeAndValidateMessageSet(): We can probably replace these three statements

    val messageCodec = m.compressionCodec
    if (messageCodec != NoCompressionCodec)
      codec = messageCodec

with

    codec = m.compressionCodec
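
As a sketch of the difference, with a stand-in Message and codec hierarchy (not the actual Kafka classes): the unconditional assignment agrees with the original loop only if a compressed message is never followed by an uncompressed one within the same message set, which is presumably the case here.

```scala
// Stand-in types for illustration only; not the real Kafka classes.
sealed trait CompressionCodec
case object NoCompressionCodec extends CompressionCodec
case object GZIPCompressionCodec extends CompressionCodec

final case class Message(compressionCodec: CompressionCodec)

// Original form: codec is only overwritten by compressed messages.
def codecOriginal(messages: Seq[Message]): CompressionCodec = {
  var codec: CompressionCodec = NoCompressionCodec
  for (m <- messages) {
    val messageCodec = m.compressionCodec
    if (messageCodec != NoCompressionCodec)
      codec = messageCodec
  }
  codec
}

// Suggested form: unconditional assignment. Equivalent only when no
// compressed message is followed by an uncompressed one in the set.
def codecSimplified(messages: Seq[Message]): CompressionCodec = {
  var codec: CompressionCodec = NoCompressionCodec
  for (m <- messages)
    codec = m.compressionCodec
  codec
}
```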

2. There are a few unused imports in Log, LogManager.

3. Why not touch() the last segment in Log.deleteOldSegments() if the last segment is empty?
Not deleting the empty last segment is equivalent to deleting it and creating a new segment.
If the last segment expires based on retention time and remains empty, and its last modified
time is not reset in deleteOldSegments(), then it will eventually be able to hold only one
message set.
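
A minimal sketch of what resetting the modification time could look like; LogSegment here is an in-memory stand-in and deleteOldSegments is reduced to a pure list operation, not the actual Kafka code:

```scala
// Stand-in segment with an in-memory lastModified timestamp,
// for illustration only.
final class LogSegment(val baseOffset: Long, var size: Long,
                       var lastModified: Long) {
  def touch(now: Long): Unit = lastModified = now
}

// If retention would expire every segment and the last one is empty,
// keep it but reset its retention clock via touch().
def deleteOldSegments(segments: List[LogSegment],
                      retentionMs: Long,
                      now: Long): List[LogSegment] = {
  val (expired, kept) =
    segments.partition(s => now - s.lastModified > retentionMs)
  if (kept.isEmpty && expired.nonEmpty && expired.last.size == 0) {
    // Without this touch(), the surviving empty segment would
    // expire again after holding just one message set.
    val last = expired.last
    last.touch(now)
    last :: Nil
  } else {
    kept
  }
}
```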

4. In reference to the comment "do not lock around actual file deletion, it isn't O(1) on
many filesystems" in Log.deleteOldSegments(): 

Log.truncateTo() calls deletable.foreach(_.delete()) and truncateFullyAndStartAt() calls
segmentsToDelete.foreach(_.delete()) inside a synchronized block. It should be possible to
move the file deletion out of the synchronized block at least in the second case, which would
help if truncateFullyAndStartAt() is called by ReplicaFetcherThread.handleOffsetOutOfRange().

I was also not quite sure what you meant by "Fix a bug in Log.truncateTo--we need to
delete the old segments before creating the new segment to ensure we don't delete the new
segment."

5. Log.roll():

    val prev = segments.put(segment.baseOffset, segment)
    if (prev != null)
      throw new KafkaException(...)

replaces the old segment and then checks whether there was already a segment at that offset.
I am not sure whether it matters which segment, new or old, stays in the map when the
KafkaException is thrown.
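
If it does matter, one alternative would be ConcurrentSkipListMap.putIfAbsent, which leaves the existing segment in the map and still reports the collision; a hedged sketch, with KafkaException stubbed for illustration:

```scala
import java.util.concurrent.ConcurrentSkipListMap

// Stub for illustration; the real KafkaException lives in Kafka's codebase.
final class KafkaException(msg: String) extends RuntimeException(msg)

val segments = new ConcurrentSkipListMap[java.lang.Long, String]()

def addSegment(baseOffset: Long, segment: String): Unit = {
  // putIfAbsent returns the existing value (non-null) on collision
  // and does NOT replace it, unlike put().
  val prev = segments.putIfAbsent(baseOffset, segment)
  if (prev != null)
    throw new KafkaException(
      s"Segment with base offset $baseOffset already exists")
}
```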
> Refactor Log subsystem
> ----------------------
>                 Key: KAFKA-521
>                 URL:
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jay Kreps
>         Attachments: KAFKA-521-v1.patch, KAFKA-521-v2.patch, KAFKA-521-v3.patch
> There are a number of items it would be nice to clean up in the log subsystem:
> 1. Misc. funky apis in Log and LogManager
> 2. Much of the functionality in Log should move into LogSegment along with corresponding
> 3. We should remove SegmentList and instead use a ConcurrentSkipListMap
> The general idea of the refactoring falls into two categories. First, improve and thoroughly
> document the public APIs. Second, have a clear delineation of responsibility between the various
> layers:
> 1. LogManager is responsible for the creation and deletion of logs as well as the retention
> of data in log segments. LogManager is the only layer aware of partitions and topics. LogManager
> consists of a bunch of individual Log instances and interacts with them only through their
> public API (mostly true today).
> 2. Log represents a totally ordered log. Log is responsible for reading, appending, and
> truncating the log. A log consists of a bunch of LogSegments. Currently much of the functionality
> in Log should move into LogSegment with Log interacting only through the Log interface. Currently
> we reach around this a lot to call into FileMessageSet and OffsetIndex.
> 3. A LogSegment consists of an OffsetIndex and a FileMessageSet. It supports largely
> the same APIs as Log, but now localized to a single segment.
> This cleanup will simplify testing and debugging because it will make the responsibilities
> and guarantees at each layer more clear.

This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see:
