kafka-commits mailing list archives

From jkr...@apache.org
Subject git commit: KAFKA-748 Maintain mmap position after resizing index. Also add a few more assertions.
Date Fri, 08 Feb 2013 18:56:22 GMT
Updated Branches:
  refs/heads/0.8 b89fc2be8 -> 790a1504c


KAFKA-748 Maintain mmap position after resizing index. Also add a few more assertions.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/790a1504
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/790a1504
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/790a1504

Branch: refs/heads/0.8
Commit: 790a1504c3437b84ce2d910b9769a2b4ae481ed3
Parents: b89fc2b
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Fri Feb 8 10:50:38 2013 -0800
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Fri Feb 8 10:50:38 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala            |    8 ++++-
 core/src/main/scala/kafka/log/OffsetIndex.scala    |   20 +++++++++-----
 .../scala/unit/kafka/log/OffsetIndexTest.scala     |    6 +++-
 3 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/790a1504/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5cd36e0..eee0ed3 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -126,6 +126,8 @@ private[kafka] class Log(val dir: File,
     
   /* Calculate the offset of the next message */
   private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
+  
+  debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
   newGauge(name + "-" + "NumLogSegments",
            new Gauge[Int] { def getValue = numberOfSegments })
@@ -275,8 +277,10 @@ private[kafka] class Log(val dir: File,
               val lastOffset = offsetCounter.get - 1
               (firstOffset, lastOffset)
             } else {
-              if(!messageSetInfo.offsetsMonotonic)
-                throw new IllegalArgumentException("Out of order offsets found in " + messages)
+              require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)
+              require(messageSetInfo.firstOffset >= nextOffset.get,
+                      "Attempt to append a message set beginning with offset %d to a log with log end offset %d."
+                      .format(messageSetInfo.firstOffset, nextOffset.get))
               (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
             }
           
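The new checks above use Scala's require, which throws IllegalArgumentException on failure instead of the exceptions the old code constructed by hand. A minimal standalone sketch of the two append-time preconditions, with illustrative names rather than the actual Log fields:

// Sketch only: the real checks live inside Log.append and use messageSetInfo/nextOffset.
def checkAppendPreconditions(offsetsMonotonic: Boolean, firstOffset: Long, logEndOffset: Long) {
  require(offsetsMonotonic, "Out of order offsets found in the message set")
  require(firstOffset >= logEndOffset,
          "Attempt to append a message set beginning with offset %d to a log with log end offset %d."
          .format(firstOffset, logEndOffset))
}

// checkAppendPreconditions(true, 5L, 10L) throws java.lang.IllegalArgumentException,
// which is why the test expectations further down switch exception classes.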

http://git-wip-us.apache.org/repos/asf/kafka/blob/790a1504/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 43b3575..0d67242 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -90,8 +90,9 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   /* the last offset in the index */
   var lastOffset = readLastOffset()
   
-  info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
-    .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
+  info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
+    .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
+  require(entries == 0 || lastOffset > this.baseOffset, "Corrupt index found, index file (%s) has non-zero size but last offset is %d.".format(file.getAbsolutePath, lastOffset))
 
   /* the maximum number of entries this index can hold */
   def maxEntries = mmap.limit / 8
@@ -177,15 +178,14 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    */
   def append(offset: Long, position: Int) {
     this synchronized {
-      if(isFull)
-        throw new IllegalStateException("Attempt to append to a full index (size = " + size + ").")
-      if(size.get > 0 && offset <= lastOffset)
-        throw new IllegalArgumentException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d).".format(offset, entries, lastOffset))
+      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
+      require(size.get == 0 || offset > lastOffset, "Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d).".format(offset, entries, lastOffset))
       debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
       this.mmap.putInt((offset - baseOffset).toInt)
       this.mmap.putInt(position)
       this.size.incrementAndGet()
       this.lastOffset = offset
+      require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
     }
   }
   
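The post-append assertion added above encodes the on-disk layout of the index: each slot is two 4-byte ints (the offset relative to baseOffset, then the physical file position), so after n appends the mmap's write position must be exactly 8 * n. A rough illustration of that arithmetic with a plain ByteBuffer, not the OffsetIndex code itself:

import java.nio.ByteBuffer

// Illustration only: write entries the way the index does and confirm the
// buffer position advances by 8 bytes per entry.
object IndexLayoutSketch extends App {
  val baseOffset = 1000L
  val buf = ByteBuffer.allocate(8 * 1024)
  def addEntry(offset: Long, filePosition: Int) {
    buf.putInt((offset - baseOffset).toInt)  // 4 bytes: offset relative to baseOffset
    buf.putInt(filePosition)                 // 4 bytes: physical position in the log file
  }
  addEntry(1001L, 0)
  addEntry(1003L, 512)
  require(buf.position == 2 * 8, buf.position + " bytes written, expected 16")
}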
@@ -230,7 +230,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
    * the file.
    */
-  def trimToValidSize() = resize(entries * 8)
+  def trimToValidSize() {
+    this synchronized {
+      resize(entries * 8)
+    }
+  }
 
   /**
   * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
@@ -245,7 +249,9 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
       val roundedNewSize = roundToExactMultiple(newSize, 8)
       try {
         raf.setLength(roundedNewSize)
+        val position = this.mmap.position
         this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+        this.mmap.position(position)
       } finally {
         Utils.swallow(raf.close())
       }
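The two lines added above are the actual KAFKA-748 fix: FileChannel.map returns a fresh MappedByteBuffer positioned at 0, so before this patch every resize silently rewound the index and later appends would overwrite existing entries, tripping the new entries * 8 == mmap.position assertion. A rough standalone sketch of the save-and-restore pattern, with assumed names rather than the real OffsetIndex internals:

import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Illustrative helper: resize the backing file and remap it, carrying the old
// buffer's write position over to the new mapping so appends continue where
// they left off instead of clobbering existing entries.
def remapPreservingPosition(file: File, old: MappedByteBuffer, newSize: Long): MappedByteBuffer = {
  val raf = new RandomAccessFile(file, "rw")
  try {
    val position = old.position                                  // remember where appends stopped
    raf.setLength(newSize)
    val remapped = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, newSize)
    remapped.position(position)                                  // restore it on the new mapping
    remapped
  } finally {
    raf.close()
  }
}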

http://git-wip-us.apache.org/repos/asf/kafka/blob/790a1504/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 952a40b..051ebe3 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -86,7 +86,7 @@ class OffsetIndexTest extends JUnitSuite {
       val offset = idx.baseOffset + i + 1
       idx.append(offset, i)
     }
-    assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalStateException])
+    assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalArgumentException])
   }
   
   @Test(expected = classOf[IllegalArgumentException])
@@ -105,7 +105,9 @@ class OffsetIndexTest extends JUnitSuite {
     val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset)
     assertEquals(first, idxRo.lookup(first.offset))
     assertEquals(sec, idxRo.lookup(sec.offset))
-    assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalStateException])
+    assertEquals(sec.offset, idxRo.lastOffset)
+    assertEquals(2, idxRo.entries)
+    assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalArgumentException])
   }
   
   @Test
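The test updates follow directly from the switch to require: both the full-index and read-only-index appends now surface as IllegalArgumentException, and reopening the index additionally checks that lastOffset and the entry count are recovered from the file. For reference, a hypothetical reconstruction of the assertWriteFails helper used above (the real helper is defined elsewhere in OffsetIndexTest and may differ):

// Hypothetical sketch, not the actual test helper: try an append and assert
// that exactly the expected exception type escapes.
// Assumes import org.junit.Assert._ as in the surrounding test class.
def assertWriteFails(message: String, idx: OffsetIndex, offset: Int, klass: Class[_ <: Throwable]) {
  try {
    idx.append(offset, 1)
    fail(message)
  } catch {
    case e: Throwable => assertEquals("Unexpected exception type.", klass, e.getClass)
  }
}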

