kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1386641 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/ main/scala/kafka/message/ test/scala/unit/kafka/log/
Date Mon, 17 Sep 2012 14:52:18 GMT
Author: junrao
Date: Mon Sep 17 14:52:18 2012
New Revision: 1386641

URL: http://svn.apache.org/viewvc?rev=1386641&view=rev
Log:
log.truncateTo needs to handle targetOffset smaller than the lowest offset in the log; patched
by Swapnil Ghike; reviewed by Jun Rao; KAFKA-463

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1386641&r1=1386640&r2=1386641&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Mon Sep 17 14:52:18
2012
@@ -91,7 +91,7 @@ object Log {
 
 
 /**
- * A segment file in the log directory. Each log semgment consists of an open message set,
a start offset and a size 
+ * A segment file in the log directory. Each log segment consists of an open message set,
a start offset and a size
  */
 class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long, time: Time)
extends Range {
   var firstAppendTime: Option[Long] = None
@@ -345,20 +345,23 @@ private[kafka] class Log( val dir: File,
       roll()
   }
 
+  private def rollSegment(newOffset: Long) {
+    val newFile = new File(dir, nameFromOffset(newOffset))
+    if (newFile.exists) {
+      warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
+      newFile.delete()
+    }
+    debug("Rolling log '" + name + "' to " + newFile.getName())
+    segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset,
time))
+  }
+
   /**
    * Create a new segment and make it active
    */
   def roll() {
     lock synchronized {
       flush
-      val newOffset = logEndOffset
-      val newFile = new File(dir, nameFromOffset(newOffset))
-      if (newFile.exists) {
-        warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it
first")
-        newFile.delete()
-      }
-      debug("Rolling log '" + name + "' to " + newFile.getName())
-      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset,
time))
+      rollSegment(logEndOffset)
     }
   }
 
@@ -432,19 +435,11 @@ private[kafka] class Log( val dir: File,
   def truncateAndStartWithNewOffset(newOffset: Long) {
     lock synchronized {
       val deletedSegments = segments.trunc(segments.view.size)
-      val newFile = new File(dir, Log.nameFromOffset(newOffset))
-      debug("Truncate and start log '" + name + "' to " + newFile.getName())
-      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset,
time))
+      rollSegment(newOffset)
       deleteSegments(deletedSegments)
     }
   }
 
-
-  def deleteWholeLog():Unit = {
-    deleteSegments(segments.contents.get())
-    Utils.rm(dir)
-  }
-
   /* Attempts to delete all provided segments from a log and returns how many it was able
to */
   def deleteSegments(segments: Seq[LogSegment]): Int = {
     var total = 0
@@ -461,26 +456,34 @@ private[kafka] class Log( val dir: File,
   }
 
   def truncateTo(targetOffset: Long) {
-    // find the log segment that has this hw
-    val segmentToBeTruncated = segments.view.find(
-      segment => targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
-
-    segmentToBeTruncated match {
-      case Some(segment) =>
-        val truncatedSegmentIndex = segments.view.indexOf(segment)
-        segments.truncLast(truncatedSegmentIndex)
-        segment.truncateTo(targetOffset)
-        info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath,
targetOffset))
-      case None =>
-        if(targetOffset > segments.view.last.absoluteEndOffset)
-         error("Last checkpointed hw %d cannot be greater than the latest message offset
%d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
-    }
-
-    val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
-    if(segmentsToBeDeleted.size < segments.view.size) {
+    lock synchronized {
+      val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
+      val viewSize = segments.view.size
       val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
+
+      /* We should not hit this error because segments.view is locked in markedDeletedWhile()
*/
       if(numSegmentsDeleted != segmentsToBeDeleted.size)
-        error("Failed to delete some segments during log recovery")
+        error("Failed to delete some segments during log recovery during truncateTo(" + targetOffset
+")")
+
+      if (numSegmentsDeleted == viewSize) {
+        segments.trunc(segments.view.size)
+        rollSegment(targetOffset)
+      } else {
+        // find the log segment that has this hw
+        val segmentToBeTruncated =
+          segments.view.find(segment => targetOffset >= segment.start && targetOffset
< segment.absoluteEndOffset)
+        segmentToBeTruncated match {
+          case Some(segment) =>
+            val truncatedSegmentIndex = segments.view.indexOf(segment)
+            segments.truncLast(truncatedSegmentIndex)
+            segment.truncateTo(targetOffset)
+            info("Truncated log segment %s to target offset %d".format(segment.file.getAbsolutePath,
targetOffset))
+          case None =>
+            if(targetOffset > segments.view.last.absoluteEndOffset)
+              error("Target offset %d cannot be greater than the last message offset %d in
the log %s".
+                format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
+        }
+      }
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala?rev=1386641&r1=1386640&r2=1386641&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala Mon Sep 17
14:52:18 2012
@@ -40,15 +40,12 @@ private[log] class SegmentList[T](seq: S
    * Append the given items to the end of the list
    */
   def append(ts: T*)(implicit m: ClassManifest[T]) {
-    while(true){
-      val curr = contents.get()
-      val updated = new Array[T](curr.length + ts.length)
-      Array.copy(curr, 0, updated, 0, curr.length)
-      for(i <- 0 until ts.length)
-        updated(curr.length + i) = ts(i)
-      if(contents.compareAndSet(curr, updated))
-        return
-    }
+    val curr = contents.get()
+    val updated = new Array[T](curr.length + ts.length)
+    Array.copy(curr, 0, updated, 0, curr.length)
+    for(i <- 0 until ts.length)
+      updated(curr.length + i) = ts(i)
+    contents.set(updated)
   }
   
   
@@ -59,39 +56,33 @@ private[log] class SegmentList[T](seq: S
     if(newStart < 0)
       throw new KafkaException("Starting index must be positive.");
     var deleted: Array[T] = null
-    var done = false
-    while(!done) {
-      val curr = contents.get()
+    val curr = contents.get()
+    if (curr.length > 0) {
       val newLength = max(curr.length - newStart, 0)
       val updated = new Array[T](newLength)
       Array.copy(curr, min(newStart, curr.length - 1), updated, 0, newLength)
-      if(contents.compareAndSet(curr, updated)) {
-        deleted = new Array[T](newStart)
-        Array.copy(curr, 0, deleted, 0, curr.length - newLength)
-        done = true
-      }
+      contents.set(updated)
+      deleted = new Array[T](newStart)
+      Array.copy(curr, 0, deleted, 0, curr.length - newLength)
     }
     deleted
   }
 
   /**
-   * Delete the items from position newEnd until end of list
+   * Delete the items from position (newEnd + 1) until end of list
    */
   def truncLast(newEnd: Int): Seq[T] = {
-    if(newEnd >= contents.get().size-1)
-      throw new KafkaException("End index must be segment list size - 1");
+    if (newEnd < 0 || newEnd > contents.get().length-1)
+      throw new KafkaException("End index must be positive and less than segment list size.");
     var deleted: Array[T] = null
-    var done = false
-    while(!done) {
-      val curr = contents.get()
+    val curr = contents.get()
+    if (curr.length > 0) {
       val newLength = newEnd + 1
       val updated = new Array[T](newLength)
       Array.copy(curr, 0, updated, 0, newLength)
-      if(contents.compareAndSet(curr, updated)) {
-        deleted = new Array[T](curr.length - newLength)
-        Array.copy(curr, newEnd + 1, deleted, 0, curr.length - newLength)
-        done = true
-      }
+      contents.set(updated)
+      deleted = new Array[T](curr.length - newLength)
+      Array.copy(curr, min(newEnd + 1, curr.length - 1), deleted, 0, curr.length - newLength)
     }
     deleted
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1386641&r1=1386640&r2=1386641&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Mon
Sep 17 14:52:18 2012
@@ -196,7 +196,7 @@ class FileMessageSet private[kafka](priv
   }
 
   def truncateTo(targetSize: Long) = {
-    if(targetSize >= sizeInBytes())
+    if(targetSize > sizeInBytes())
       throw new KafkaException("Attempt to truncate log segment to %d bytes failed since
the current ".format(targetSize) +
         " size of this log segment is only %d bytes".format(sizeInBytes()))
     channel.truncate(targetSize)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1386641&r1=1386640&r2=1386641&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Mon Sep
17 14:52:18 2012
@@ -279,6 +279,56 @@ class LogTest extends JUnitSuite {
     assert(ret, "Second message set should throw MessageSizeTooLargeException.")
   }
 
+  @Test
+  def testTruncateTo() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val setSize = set.sizeInBytes
+    val msgPerSeg = 10
+    val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be
10 messages
+
+    // create a log
+    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, false, time)
+    assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
+    for (i<- 1 to msgPerSeg) {
+      log.append(set)
+    }
+    assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
+
+    val lastOffset = log.logEndOffset
+    val size = log.size
+    log.truncateTo(log.logEndOffset) // keep the entire log
+    assertEquals("Should not change offset", lastOffset, log.logEndOffset)
+    assertEquals("Should not change log size", size, log.size)
+    log.truncateTo(log.logEndOffset + 1) // try to truncate beyond lastOffset
+    assertEquals("Should not change offset but should log error", lastOffset, log.logEndOffset)
+    assertEquals("Should not change log size", size, log.size)
+    log.truncateTo(log.logEndOffset - 10) // truncate somewhere in between
+    assertEquals("Should change offset", lastOffset, log.logEndOffset + 10)
+    assertEquals("Should change log size", size, log.size + 10)
+    log.truncateTo(log.logEndOffset - log.size) // truncate the entire log
+    assertEquals("Should change offset", log.logEndOffset, lastOffset - size)
+    assertEquals("Should change log size", log.size, 0)
+
+    for (i<- 1 to msgPerSeg) {
+      log.append(set)
+    }
+    assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
+    assertEquals("Should be back to original size", log.size, size)
+    log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1)*setSize)
+    assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1)*setSize)
+    assertEquals("Should change log size", log.size, 0)
+
+    for (i<- 1 to msgPerSeg) {
+      log.append(set)
+    }
+    assertEquals("Should be ahead of to original offset", log.logEndOffset, lastOffset +
setSize)
+    assertEquals("log size should be same as before", size, log.size)
+    log.truncateTo(log.logEndOffset - log.size - setSize) // truncate before first start
offset in the log
+    assertEquals("Should change offset", log.logEndOffset, lastOffset - size)
+    assertEquals("Should change log size", log.size, 0)
+  }
+
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
       case Some(range) => 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala?rev=1386641&r1=1386640&r2=1386641&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
Mon Sep 17 14:52:18 2012
@@ -31,22 +31,46 @@ class SegmentListTest extends JUnitSuite
     val view = sl.view
     assertEquals(list, view.iterator.toList)
     sl.append(5)
-    assertEquals("Appending to both should result in list that are still equals", 
+    assertEquals("Appending to both should result in lists that are still equals",
                  list ::: List(5), sl.view.iterator.toList)
     assertEquals("But the prior view should still equal the original list", list, view.iterator.toList)
   }
   
   @Test
   def testTrunc() {
-    val hd = List(1,2,3)
-    val tail = List(4,5,6)
-    val sl = new SegmentList(hd ::: tail)
-    val view = sl.view
-    assertEquals(hd ::: tail, view.iterator.toList)
-    val deleted = sl.trunc(3)
-    assertEquals(tail, sl.view.iterator.toList)
-    assertEquals(hd, deleted.iterator.toList)
-    assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    {
+      val hd = List(1,2,3)
+      val tail = List(4,5,6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view
+      assertEquals(hd ::: tail, view.iterator.toList)
+      val deleted = sl.trunc(3)
+      assertEquals(tail, sl.view.iterator.toList)
+      assertEquals(hd, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+    {
+      val hd = List(1,2,3,4,5)
+      val tail = List(6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view
+      assertEquals(hd ::: tail, view.iterator.toList)
+      try {
+        sl.trunc(-1)
+        fail("Attempt to truncate with illegal index should fail")
+      } catch {
+        case e: KafkaException => // this is ok
+      }
+      val deleted = sl.truncLast(4)
+      assertEquals(hd, sl.view.iterator.toList)
+      assertEquals(tail, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+    {
+      val sl = new SegmentList(List(1, 2))
+      sl.trunc(3)
+      assertEquals(0, sl.view.length)
+    }
   }
   
   @Test
@@ -62,7 +86,6 @@ class SegmentListTest extends JUnitSuite
       assertEquals(tail, deleted.iterator.toList)
       assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
     }
-
     {
       val hd = List(1,2,3,4,5)
       val tail = List(6)
@@ -71,10 +94,14 @@ class SegmentListTest extends JUnitSuite
       assertEquals(hd ::: tail, view.iterator.toList)
       try {
         sl.truncLast(6)
-        sl.truncLast(5)
+        fail("Attempt to truncate with illegal index should fail")
+      } catch {
+        case e: KafkaException => // this is ok
+      }
+      try {
         sl.truncLast(-1)
         fail("Attempt to truncate with illegal index should fail")
-      }catch {
+      } catch {
         case e: KafkaException => // this is ok
       }
       val deleted = sl.truncLast(4)
@@ -82,25 +109,5 @@ class SegmentListTest extends JUnitSuite
       assertEquals(tail, deleted.iterator.toList)
       assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
     }
-
-    {
-      val hd = List(1)
-      val tail = List(2,3,4,5,6)
-      val sl = new SegmentList(hd ::: tail)
-      val view = sl.view
-      assertEquals(hd ::: tail, view.iterator.toList)
-      val deleted = sl.truncLast(0)
-      assertEquals(hd, sl.view.iterator.toList)
-      assertEquals(tail, deleted.iterator.toList)
-      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
-    }
   }
-
-  @Test
-  def testTruncBeyondList() {
-    val sl = new SegmentList(List(1, 2))
-    sl.trunc(3)
-    assertEquals(0, sl.view.length)
-  }
-  
 }



Mime
View raw message