kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1403859 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/OffsetIndex.scala test/scala/unit/kafka/log/LogSegmentTest.scala test/scala/unit/kafka/log/OffsetIndexTest.scala
Date Tue, 30 Oct 2012 20:18:46 GMT
Author: jkreps
Date: Tue Oct 30 20:18:45 2012
New Revision: 1403859

URL: http://svn.apache.org/viewvc?rev=1403859&view=rev
Log:
KAFKA-588 Fix bug in OffsetIndex.truncateTo. Reviewed by Neha.


Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala?rev=1403859&r1=1403858&r2=1403859&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala Tue Oct 30 20:18:45 2012
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic._
 import kafka.utils._
 
 /**
- * An index that maps logical offsets to physical file locations for a particular log segment. This index may be sparse:
+ * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
  * that is it may not hold an entry for all messages in the log.
  * 
  * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries.
@@ -94,20 +94,20 @@ class OffsetIndex(val file: File, val ba
   var lastOffset = readLastOffset()
   
   /**
-   * The last logical offset written to the index
+   * The last offset written to the index
    */
   private def readLastOffset(): Long = {
     val offset = 
       size.get match {
         case 0 => 0
-        case s => logical(this.mmap, s-1)
+        case s => relativeOffset(this.mmap, s-1)
       }
     baseOffset + offset
   }
 
   /**
    * Find the largest offset less than or equal to the given targetOffset 
-   * and return a pair holding this logical offset and it's corresponding physical file position.
+   * and return a pair holding this offset and it's corresponding physical file position.
    * If the target offset is smaller than the least entry in the index (or the index is empty),
    * the pair (baseOffset, 0) is returned.
    */
@@ -117,7 +117,7 @@ class OffsetIndex(val file: File, val ba
     if(slot == -1)
       OffsetPosition(baseOffset, 0)
     else
-      OffsetPosition(baseOffset + logical(idx, slot), physical(idx, slot))
+      OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
   }
   
   /**
@@ -127,14 +127,14 @@ class OffsetIndex(val file: File, val ba
    */
   private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
     // we only store the difference from the baseoffset so calculate that
-    val relativeOffset = targetOffset - baseOffset
+    val relOffset = targetOffset - baseOffset
     
     // check if the index is empty
     if(entries == 0)
       return -1
     
     // check if the target offset is smaller than the least offset
-    if(logical(idx, 0) > relativeOffset)
+    if(relativeOffset(idx, 0) > relOffset)
       return -1
       
     // binary search for the entry
@@ -142,10 +142,10 @@ class OffsetIndex(val file: File, val ba
     var hi = entries-1
     while(lo < hi) {
       val mid = ceil(hi/2.0 + lo/2.0).toInt
-      val found = logical(idx, mid)
-      if(found == relativeOffset)
+      val found = relativeOffset(idx, mid)
+      if(found == relOffset)
         return mid
-      else if(found < relativeOffset)
+      else if(found < relOffset)
         lo = mid
       else
         hi = mid - 1
@@ -153,8 +153,8 @@ class OffsetIndex(val file: File, val ba
     lo
   }
   
-  /* return the nth logical offset relative to the base offset */
-  private def logical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
+  /* return the nth offset relative to the base offset */
+  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
   
   /* return the nth physical offset */
   private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
@@ -166,23 +166,23 @@ class OffsetIndex(val file: File, val ba
     if(n >= entries)
       throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
     val idx = mmap.duplicate
-    OffsetPosition(logical(idx, n), physical(idx, n))
+    OffsetPosition(relativeOffset(idx, n), physical(idx, n))
   }
   
   /**
    * Append entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
    */
-  def append(logicalOffset: Long, position: Int) {
+  def append(offset: Long, position: Int) {
     this synchronized {
       if(isFull)
         throw new IllegalStateException("Attempt to append to a full index (size = " + size + ").")
-      if(size.get > 0 && logicalOffset <= lastOffset)
-        throw new IllegalArgumentException("Attempt to append an offset (" + logicalOffset + ") no larger than the last offset appended (" + lastOffset + ").")
-      debug("Adding index entry %d => %d to %s.".format(logicalOffset, position, file.getName))
-      this.mmap.putInt((logicalOffset - baseOffset).toInt)
+      if(size.get > 0 && offset <= lastOffset)
+        throw new IllegalArgumentException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d).".format(offset, entries, lastOffset))
+      debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
+      this.mmap.putInt((offset - baseOffset).toInt)
       this.mmap.putInt(position)
       this.size.incrementAndGet()
-      this.lastOffset = logicalOffset
+      this.lastOffset = offset
     }
   }
   
@@ -213,7 +213,7 @@ class OffsetIndex(val file: File, val ba
       val newEntries = 
         if(slot < 0)
           0
-        else if(logical(idx, slot) == offset)
+        else if(relativeOffset(idx, slot) == offset - baseOffset)
           slot
         else
           slot + 1

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala?rev=1403859&r1=1403858&r2=1403859&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala Tue Oct 30 20:18:45 2012
@@ -18,7 +18,7 @@ class LogSegmentTest extends JUnit3Suite
     val ms = new FileMessageSet(msFile)
     val idxFile = TestUtils.tempFile()
     idxFile.delete()
-    val idx = new OffsetIndex(idxFile, offset, 100)
+    val idx = new OffsetIndex(idxFile, offset, 1000)
     val seg = new LogSegment(ms, idx, offset, 10, SystemTime)
     segments += seg
     seg
@@ -86,12 +86,31 @@ class LogSegmentTest extends JUnit3Suite
   @Test
   def testTruncate() {
     val seg = createSegment(40)
-    val ms = messages(50, "hello", "there", "you")
-    seg.append(50, ms)
-    seg.truncateTo(51)
-    val read = seg.read(50, maxSize = 1000, None)
-    assertEquals(1, read.size)
-    assertEquals(ms.head, read.head)
+    var offset = 40
+    for(i <- 0 until 30) {
+      val ms1 = messages(offset, "hello")
+      seg.append(offset, ms1)
+      val ms2 = messages(offset+1, "hello")
+      seg.append(offset+1, ms2)
+      // check that we can read back both messages
+      val read = seg.read(offset, 10000, None)
+      assertEquals(List(ms1.head, ms2.head), read.toList)
+      // now truncate off the last message
+      seg.truncateTo(offset + 1)
+      val read2 = seg.read(offset, 10000, None)
+      assertEquals(1, read2.size)
+      assertEquals(ms1.head, read2.head)
+      offset += 1
+    }
+  }
+  
+  @Test
+  def testTruncateFull() {
+    // test the case where we fully truncate the log
+    val seg = createSegment(40)
+    seg.append(40, messages(40, "hello", "there"))
+    seg.truncateTo(0)
+    seg.append(40, messages(40, "hello", "there"))    
   }
   
   @Test

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala?rev=1403859&r1=1403858&r2=1403859&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala Tue Oct 30 20:18:45 2012
@@ -115,18 +115,30 @@ class OffsetIndexTest extends JUnitSuite
     for(i <- 1 until 10)
       idx.append(i, i)
       
+    // now check the last offset after various truncate points and validate that we can still append to the index.
     idx.truncateTo(12)
     assertEquals("Index should be unchanged by truncate past the end", OffsetPosition(9, 9), idx.lookup(10))
+    assertEquals("9 should be the last entry in the index", 9, idx.lastOffset)
+    
+    idx.append(10, 10)
     idx.truncateTo(10)
     assertEquals("Index should be unchanged by truncate at the end", OffsetPosition(9, 9), idx.lookup(10))
+    assertEquals("9 should be the last entry in the index", 9, idx.lastOffset)
+    idx.append(10, 10)
+    
     idx.truncateTo(9)
     assertEquals("Index should truncate off last entry", OffsetPosition(8, 8), idx.lookup(10))
+    assertEquals("8 should be the last entry in the index", 8, idx.lastOffset)
+    idx.append(9, 9)
+    
     idx.truncateTo(5)
     assertEquals("4 should be the last entry in the index", OffsetPosition(4, 4), idx.lookup(10))
     assertEquals("4 should be the last entry in the index", 4, idx.lastOffset)
+    idx.append(5, 5)
     
     idx.truncate()
     assertEquals("Full truncation should leave no entries", 0, idx.entries())
+    idx.append(0, 0)
   }
   
   def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, klass: Class[T]) {



Mime
View raw message