kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1406036 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/Log.scala main/scala/kafka/log/LogSegment.scala main/scala/kafka/log/OffsetIndex.scala test/scala/unit/kafka/log/LogTest.scala
Date Tue, 06 Nov 2012 03:43:15 GMT
Author: nehanarkhede
Date: Tue Nov  6 03:43:15 2012
New Revision: 1406036

URL: http://svn.apache.org/viewvc?rev=1406036&view=rev
Log:
KAFKA-593 Empty log index file created when it shouldn't be empty; Patched by Yang Ye; reviewed
by Jun and Jay

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1406036&r1=1406035&r2=1406036&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Tue Nov  6 03:43:15
2012
@@ -174,6 +174,9 @@ private[kafka] class Log(val dir: File, 
         }
       })
 
+      // reset the index size of the last (current active) log segment to its maximum value
+      logSegments.get(logSegments.size() - 1).index.resize(maxIndexSize)
+
       // run recovery on the last segment if necessary
       if(needsRecovery)
         recoverSegment(logSegments.get(logSegments.size - 1))
@@ -444,9 +447,14 @@ private[kafka] class Log(val dir: File, 
     }
     debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
     segments.view.lastOption match {
-      case Some(segment) => segment.index.trimToSize()
+      case Some(segment) => segment.index.trimToValidSize()
       case None => 
     }
+
+    val segmentsView = segments.view
+    if(segmentsView.size > 0 && segmentsView.last.start == newOffset)
+      throw new KafkaException("Trying to roll a new log segment for topic partition %s with
start offset %d while it already exists".format(dir.getName, newOffset))
+
     val segment = new LogSegment(dir, 
                                  startOffset = newOffset,
                                  indexIntervalBytes = indexIntervalBytes, 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala?rev=1406036&r1=1406035&r2=1406036&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala Tue Nov  6
03:43:15 2012
@@ -120,7 +120,9 @@ class LogSegment(val messageSet: FileMes
     val mapping = translateOffset(offset)
     if(mapping == null)
       return
-    index.truncateTo(offset)  
+    index.truncateTo(offset)
+    // after truncation, reset and allocate more space for the (new currently active) index
+    index.resize(index.maxIndexSize)
     messageSet.truncateTo(mapping.position)
     if (messageSet.sizeInBytes == 0)
       firstAppendTime = None

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala?rev=1406036&r1=1406035&r2=1406036&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala Tue Nov 
6 03:43:15 2012
@@ -49,7 +49,7 @@ import kafka.utils._
  * All external APIs translate from relative offsets to full offsets, so users of this class
do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1) extends Logging
{
+class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends
Logging {
   
   /* the memory mapping */
   private var mmap: MappedByteBuffer = 
@@ -85,7 +85,7 @@ class OffsetIndex(val file: File, val ba
     }
   
   /* the maximum number of entries this index can hold */
-  val maxEntries = mmap.limit / 8
+  def maxEntries = mmap.limit / 8
   
   /* the number of entries in the index */
   private var size = new AtomicInteger(mmap.position / 8)
@@ -227,14 +227,22 @@ class OffsetIndex(val file: File, val ba
    * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes
from
    * the file.
    */
-  def trimToSize() {
+  def trimToValidSize() = resize(entries * 8)
+
+  /**
+   * Reset the size of the memory map and the underlying file. This is used in two kinds
of cases: (1) in
+   * trimToValidSize() which is called at closing the segment or new segment being rolled;
(2) at
+   * loading segments from disk or truncating back to an old segment where a new log segment
became active;
+   * we want to reset the index size to the maximum index size to avoid rolling a new segment.
+   */
+  def resize(newSize: Int) {
     this synchronized {
       flush()
       val raf = new RandomAccessFile(file, "rws")
+      val roundedNewSize = roundToExactMultiple(newSize, 8)
       try {
-        val newLength = entries * 8
-        raf.setLength(newLength)
-        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newLength)
+        raf.setLength(roundedNewSize)
+        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
       } finally {
         Utils.swallow(raf.close())
       }
@@ -262,7 +270,7 @@ class OffsetIndex(val file: File, val ba
   
   /** Close the index */
   def close() {
-    trimToSize()
+    trimToValidSize()
   }
   
   /**

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1406036&r1=1406035&r2=1406036&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Tue Nov
 6 03:43:15 2012
@@ -342,7 +342,31 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change offset", 0, log.logEndOffset)
     assertEquals("Should change log size", log.size, 0)
   }
-  
+
+  @Test
+  def testIndexResizingAtTruncation() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val setSize = set.sizeInBytes
+    val msgPerSeg = 10
+    val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery
= false, time = time)
+    assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+    for (i<- 1 to msgPerSeg)
+      log.append(set)
+    assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+    for (i<- 1 to msgPerSeg)
+      log.append(set)
+    assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
+    assertEquals("The index of the first segment should be trimmed to empty", 0, log.segments.view(0).index.maxEntries)
+    log.truncateTo(0)
+    assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+    assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8,
log.segments.view(0).index.maxEntries)
+    for (i<- 1 to msgPerSeg)
+      log.append(set)
+    assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+  }
+
+
   @Test
   def testAppendWithoutOffsetAssignment() {
     for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {



Mime
View raw message