kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject git commit: KAFKA-1042 logSegments(from, to) Misses first segment.
Date Fri, 18 Oct 2013 18:33:59 GMT
Updated Branches:
  refs/heads/trunk d0e948ca8 -> d03077886


KAFKA-1042 logSegments(from, to) Misses first segment.

I think this patch also fixes the continual timing problems we have had in log tests. The root cause was that we weren't passing through the clock instance, so we were mixing instances of MockTime and SystemTime. This worked only because MockTime initializes to SystemTime.milliseconds, so as long as the test took less than 1 ms it worked!


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0307788
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0307788
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0307788

Branch: refs/heads/trunk
Commit: d030778861e6d274281adf7e0311dc4d51448c84
Parents: d0e948c
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Fri Oct 18 11:15:39 2013 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Fri Oct 18 11:33:50 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 20 +++++++++++++++-----
 core/src/main/scala/kafka/log/LogSegment.scala  |  4 ++--
 .../src/test/scala/unit/kafka/log/LogTest.scala | 12 +++++-------
 3 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0307788/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9fe61ff..008cc55 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -132,7 +132,8 @@ class Log(val dir: File,
         val segment = new LogSegment(dir = dir, 
                                      startOffset = start,
                                      indexIntervalBytes = config.indexInterval, 
-                                     maxIndexSize = config.maxIndexSize)
+                                     maxIndexSize = config.maxIndexSize,
+                                     time = time)
         if(!hasIndex) {
           error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
           segment.recover(config.maxMessageSize)
@@ -146,7 +147,8 @@ class Log(val dir: File,
       segments.put(0, new LogSegment(dir = dir, 
                                      startOffset = 0,
                                      indexIntervalBytes = config.indexInterval, 
-                                     maxIndexSize = config.maxIndexSize))
+                                     maxIndexSize = config.maxIndexSize,
+                                     time = time))
     } else {
       recoverLog()
       // reset the index size of the currently active log segment to allow more entries
@@ -472,7 +474,8 @@ class Log(val dir: File,
       val segment = new LogSegment(dir, 
                                    startOffset = newOffset,
                                    indexIntervalBytes = config.indexInterval, 
-                                   maxIndexSize = config.maxIndexSize)
+                                   maxIndexSize = config.maxIndexSize,
+                                   time = time)
       val prev = addSegment(segment)
       if(prev != null)
         throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
@@ -561,7 +564,8 @@ class Log(val dir: File,
       addSegment(new LogSegment(dir, 
                                 newOffset,
                                 indexIntervalBytes = config.indexInterval, 
-                                maxIndexSize = config.maxIndexSize))
+                                maxIndexSize = config.maxIndexSize,
+                                time = time))
       this.nextOffset.set(newOffset)
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
       truncates.getAndIncrement
@@ -592,7 +596,13 @@ class Log(val dir: File,
    */
   def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
     import JavaConversions._
-    segments.subMap(from, true, to, false).values
+    lock synchronized {
+      val floor: java.lang.Long = segments.floorKey(from)
+      if(floor eq null)
+        asIterable(segments.headMap(to).values)
+      else
+        asIterable(segments.subMap(floor, true, to, false).values)
+    }
   }
   
   override def toString() = "Log(" + dir + ")"

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0307788/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index fe39d79..0d6926e 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -48,12 +48,12 @@ class LogSegment(val log: FileMessageSet,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
   
-  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) = 
+  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, time: Time) = 
     this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), 
          new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,
-         SystemTime)
+         time)
     
   /* Return the size in bytes of this log segment */
   def size: Long = log.sizeInBytes()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0307788/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 18d2e7c..140317c 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -32,7 +32,7 @@ import kafka.server.KafkaConfig
 class LogTest extends JUnitSuite {
   
   var logDir: File = null
-  val time = new MockTime
+  val time = new MockTime(0)
   var config: KafkaConfig = null
   val logConfig = LogConfig()
 
@@ -62,7 +62,6 @@ class LogTest extends JUnitSuite {
   @Test
   def testTimeBasedLogRoll() {
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val time: MockTime = new MockTime()
 
     // create a log
     val log = new Log(logDir, 
@@ -70,16 +69,15 @@ class LogTest extends JUnitSuite {
                       recoveryPoint = 0L, 
                       scheduler = time.scheduler, 
                       time = time)
+    assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     time.sleep(log.config.segmentMs + 1)
-
-    // segment age is less than its limit
     log.append(set)
-    assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
+    assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments)
 
     log.append(set)
-    assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments)
+    assertEquals("Log rolls on this append since time has expired.", 2, log.numberOfSegments)
 
-    for(numSegments <- 2 until 4) {
+    for(numSegments <- 3 until 5) {
       time.sleep(log.config.segmentMs + 1)
       log.append(set)
       assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)


Mime
View raw message