kafka-commits mailing list archives

From jkr...@apache.org
Subject git commit: KAFKA-1008 Unmap offset indexes before resizing.
Date Wed, 09 Oct 2013 00:05:23 GMT
Updated Branches:
  refs/heads/0.8 b384c27c3 -> 6273696fb


KAFKA-1008 Unmap offset indexes before resizing.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6273696f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6273696f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6273696f

Branch: refs/heads/0.8
Commit: 6273696fb2a2da22b3b99af60ffe31c01362fdbc
Parents: b384c27
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Wed Aug 28 17:19:40 2013 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Tue Oct 8 17:03:55 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala |  11 +-
 .../consumer/ZookeeperConsumerConnector.scala   |   6 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala | 108 +++++++++++++------
 .../kafka/server/AbstractFetcherThread.scala    |  11 +-
 core/src/main/scala/kafka/utils/Os.scala        |  23 ++++
 core/src/main/scala/kafka/utils/Utils.scala     |  13 +++
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  12 +--
 .../test/scala/unit/kafka/utils/TestUtils.scala |   6 +-
 .../test/scala/unit/kafka/utils/UtilsTest.scala |  14 +++
 9 files changed, 140 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
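
Background on the fix: Windows will not change the length of a file while a
MappedByteBuffer mapping on it is still live, and the JVM has no public API for
unmapping a buffer before it is garbage collected, so OffsetIndex.resize() failed
on Windows brokers. This change forcibly releases the mapping through the
JDK-internal cleaner before calling setLength(), and, on Windows only, takes a
lock around index reads so they never race with an unmap. A minimal standalone
sketch of the failure mode and the workaround, assuming a pre-Java-9 JVM where
sun.nio.ch.DirectBuffer is accessible (illustrative, not part of the commit):

    import java.io.RandomAccessFile
    import java.nio.MappedByteBuffer
    import java.nio.channels.FileChannel

    object UnmapDemo extends App {
      val raf = new RandomAccessFile("demo.idx", "rw")
      val mmap: MappedByteBuffer = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024)
      // On Windows, setLength() throws an IOException while the mapping is live;
      // on Linux/macOS the resize succeeds and the old mapping is simply stale.
      if (System.getProperty("os.name").toLowerCase.startsWith("windows"))
        mmap.asInstanceOf[sun.nio.ch.DirectBuffer].cleaner().clean() // force-unmap first
      raf.setLength(2048)
      raf.close()
    }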


http://git-wip-us.apache.org/repos/asf/kafka/blob/6273696f/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 8c03308..30a9c97 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -24,6 +24,7 @@ import scala.collection.immutable
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.Utils.inLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
 import kafka.common.TopicAndPartition
@@ -123,14 +124,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
     leaderFinderThread.start()
 
-    lock.lock()
-    try {
+    inLock(lock) {
       partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
       this.cluster = cluster
       noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
       cond.signalAll()
-    } finally {
-      lock.unlock()
     }
   }
 
@@ -158,14 +156,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
 
   def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
     debug("adding partitions with error %s".format(partitionList))
-    lock.lock()
-    try {
+    inLock(lock) {
       if (partitionMap != null) {
         noLeaderPartitionSet ++= partitionList
         cond.signalAll()
       }
-    } finally {
-      lock.unlock()
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6273696f/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 36b167b..612aeec 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -30,6 +30,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.util.UUID
 import kafka.serializer._
 import kafka.utils.ZkUtils._
+import kafka.utils.Utils.inLock
 import kafka.common._
 import com.yammer.metrics.core.Gauge
 import kafka.metrics._
@@ -363,12 +364,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
-      lock.lock()
-      try {
+      inLock(lock) {
         isWatcherTriggered = true
         cond.signalAll()
-      } finally {
-        lock.unlock()
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6273696f/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 9de3d31..234ddba 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -21,8 +21,10 @@ import scala.math._
 import java.io._
 import java.nio._
 import java.nio.channels._
+import java.util.concurrent.locks._
 import java.util.concurrent.atomic._
 import kafka.utils._
+import kafka.utils.Utils.inLock
 import kafka.common.InvalidOffsetException
 
 /**
@@ -52,6 +54,8 @@ import kafka.common.InvalidOffsetException
  */
 class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
   
+  private val lock = new ReentrantLock
+  
   /* the memory mapping */
   private var mmap: MappedByteBuffer = 
     {
@@ -88,25 +92,30 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   /* the number of entries in the index */
   private var size = new AtomicInteger(mmap.position / 8)
   
+  /**
+   * The maximum number of eight-byte entries this index can hold
+   */
+  @volatile
+  var maxEntries = mmap.limit / 8
+  
   /* the last offset in the index */
   var lastOffset = readLastOffset()
   
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset
= %d, file position = %d"
     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
 
-  /* the maximum number of entries this index can hold */
-  def maxEntries = mmap.limit / 8
-
   /**
    * The last offset written to the index
    */
   private def readLastOffset(): Long = {
-    val offset = 
-      size.get match {
-        case 0 => 0
-        case s => relativeOffset(this.mmap, s-1)
-      }
-    baseOffset + offset
+    inLock(lock) {
+      val offset = 
+        size.get match {
+          case 0 => 0
+          case s => relativeOffset(this.mmap, s-1)
+        }
+      baseOffset + offset
+    }
   }
 
   /**
@@ -116,12 +125,14 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * the pair (baseOffset, 0) is returned.
    */
   def lookup(targetOffset: Long): OffsetPosition = {
-    val idx = mmap.duplicate
-    val slot = indexSlotFor(idx, targetOffset)
-    if(slot == -1)
-      OffsetPosition(baseOffset, 0)
-    else
-      OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
+    maybeLock(lock) {
+      val idx = mmap.duplicate
+      val slot = indexSlotFor(idx, targetOffset)
+      if(slot == -1)
+        OffsetPosition(baseOffset, 0)
+      else
+        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
+      }
   }
   
   /**
@@ -167,17 +178,19 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * Get the nth offset mapping from the index
    */
   def entry(n: Int): OffsetPosition = {
-    if(n >= entries)
-      throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
-    val idx = mmap.duplicate
-    OffsetPosition(relativeOffset(idx, n), physical(idx, n))
+    maybeLock(lock) {
+      if(n >= entries)
+        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
+      val idx = mmap.duplicate
+      OffsetPosition(relativeOffset(idx, n), physical(idx, n))
+    }
   }
   
   /**
   * Append entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
    */
   def append(offset: Long, position: Int) {
-    this synchronized {
+    inLock(lock) {
       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
       if (size.get == 0 || offset > lastOffset) {
         debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
@@ -186,8 +199,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
         this.size.incrementAndGet()
         this.lastOffset = offset
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
-      }
-      else {
+      } else {
+        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
           .format(offset, entries, lastOffset, file.getName))
       }
@@ -209,7 +221,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * Truncating to an offset larger than the largest in the index has no effect.
    */
   def truncateTo(offset: Long) {
-    this synchronized {
+    inLock(lock) {
       val idx = mmap.duplicate
       val slot = indexSlotFor(idx, offset)
 
@@ -233,9 +245,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * Truncates index to a known number of entries.
    */
   private def truncateToEntries(entries: Int) {
-    this.size.set(entries)
-    mmap.position(this.size.get * 8)
-    this.lastOffset = readLastOffset
+    inLock(lock) {
+      this.size.set(entries)
+      mmap.position(this.size.get * 8)
+      this.lastOffset = readLastOffset
+    }
   }
   
   /**
@@ -243,7 +257,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * the file.
    */
   def trimToValidSize() {
-    this synchronized {
+    inLock(lock) {
       resize(entries * 8)
     }
   }
@@ -255,14 +269,18 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * we want to reset the index size to maximum index size to avoid rolling new segment.
    */
   def resize(newSize: Int) {
-    this synchronized {
-      flush()
+    inLock(lock) {
       val raf = new RandomAccessFile(file, "rws")
       val roundedNewSize = roundToExactMultiple(newSize, 8)
+      val position = this.mmap.position
+      
+      /* Windows won't let us modify the file length while the file is mmapped :-( */
+      if(Os.isWindows)
+        forceUnmap(this.mmap)
       try {
         raf.setLength(roundedNewSize)
-        val position = this.mmap.position
         this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+        this.maxEntries = this.mmap.limit / 8
         this.mmap.position(position)
       } finally {
         Utils.swallow(raf.close())
@@ -271,10 +289,22 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   }
   
   /**
+   * Forcefully free the buffer's mmap. We do this only on windows.
+   */
+  private def forceUnmap(m: MappedByteBuffer) {
+    try {
+      if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
+        (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
+    } catch {
+      case t: Throwable => warn("Error when freeing index buffer", t)
+    }
+  }
+  
+  /**
    * Flush the data in the index to disk
    */
   def flush() {
-    this synchronized {
+    inLock(lock) {
       mmap.force()
     }
   }
@@ -300,4 +330,20 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * E.g. roundToExactMultiple(67, 8) == 64
    */
   private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
+  
+  /**
+   * Execute the given function in a lock only if we are running on windows. We do this 
+   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
+   * and this requires synchronizing reads.
+   */
+  private def maybeLock[T](lock: Lock)(fun: => T): T = {
+    if(Os.isWindows)
+      lock.lock()
+    try {
+      return fun
+    } finally {
+      if(Os.isWindows)
+        lock.unlock()
+    }
+  }
 }
\ No newline at end of file
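
Two design points in OffsetIndex are worth calling out. maybeLock takes the lock
only when Os.isWindows, so the hot read path (lookup, entry) stays lock-free on
Linux and only Windows pays for the unmap-before-resize coordination. And
forceUnmap leans on the JDK-internal sun.nio.ch.DirectBuffer interface, which is
reachable on the Java 6/7 JVMs current at the time; on Java 9 and later that
interface is encapsulated, and the rough equivalent goes through
sun.misc.Unsafe.invokeCleaner. A sketch under that assumption, not part of this
commit:

    import java.nio.MappedByteBuffer

    // Hypothetical Java 9+ replacement for forceUnmap; invokeCleaner(ByteBuffer)
    // was added to sun.misc.Unsafe in JDK 9.
    def forceUnmapJava9(m: MappedByteBuffer) {
      val f = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
      f.setAccessible(true)
      f.get(null).asInstanceOf[sun.misc.Unsafe].invokeCleaner(m)
    }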

http://git-wip-us.apache.org/repos/asf/kafka/blob/6273696f/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index a5fc96d..c64260f 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
+import kafka.utils.Utils.inLock
 
 
 /**
@@ -70,8 +71,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 
   override def doWork() {
-    partitionMapLock.lock()
-    try {
+    inLock(partitionMapLock) {
       if (partitionMap.isEmpty)
         partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
       partitionMap.foreach {
@@ -79,8 +79,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
           fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                            offset, fetchSize)
       }
-    } finally {
-      partitionMapLock.unlock()
     }
 
     val fetchRequest = fetchRequestBuilder.build()
@@ -107,8 +105,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 
     if (response != null) {
       // process fetched data
-      partitionMapLock.lock()
-      try {
+      inLock(partitionMapLock) {
         response.data.foreach {
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
@@ -160,8 +157,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
               }
             }
         }
-      } finally {
-        partitionMapLock.unlock()
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6273696f/core/src/main/scala/kafka/utils/Os.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Os.scala b/core/src/main/scala/kafka/utils/Os.scala
new file mode 100644
index 0000000..cb9950e
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Os.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+object Os {
+  val name = System.getProperty("os.name").toLowerCase
+  val isWindows = name.startsWith("windows")
+}
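
Os reads the os.name property once at class-load time and lowercases it; the
startsWith("windows") check keeps the test robust across Windows variants, since
the raw property value includes the version ("Windows 7", "Windows Server 2008",
and so on). Illustrative use of the new helper:

    // Gate platform-specific behavior, as OffsetIndex.resize() now does.
    if (Os.isWindows)
      println("Windows: force-unmap the index mmap before resizing")
    else
      println("os '%s': the file can be resized while mapped".format(Os.name))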

http://git-wip-us.apache.org/repos/asf/kafka/blob/6273696f/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index e0a5a27..60a019c 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.nio._
 import charset.Charset
 import java.nio.channels._
+import java.util.concurrent.locks.Lock
 import java.lang.management._
 import java.util.zip.CRC32
 import javax.management._
@@ -554,4 +555,16 @@ object Utils extends Logging {
   * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
    */
   def abs(n: Int) = n & 0x7fffffff
+  
+  /**
+   * Execute the given function inside the lock
+   */
+  def inLock[T](lock: Lock)(fun: => T): T = {
+    lock.lock()
+    try {
+      return fun
+    } finally {
+      lock.unlock()
+    }
+  }
 }
\ No newline at end of file
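
inLock is the helper most call sites in this commit switch to. Its body is a
by-name parameter, so the block is evaluated only after lock() succeeds; the
block's value is returned, and the finally clause guarantees unlock() on every
exit path, including exceptions. Conditions created from the lock work as before,
since the lock is held while the body runs. A small usage sketch (illustrative
only):

    import java.util.concurrent.locks.ReentrantLock
    import kafka.utils.Utils.inLock

    val lock = new ReentrantLock()
    val cond = lock.newCondition()

    val value = inLock(lock) {
      cond.signalAll()  // legal here: the current thread holds the lock
      42                // inLock returns the body's result
    }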

http://git-wip-us.apache.org/repos/asf/kafka/blob/6273696f/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 6eede1b..c21bc60 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -35,6 +35,7 @@ import kafka.controller.KafkaController
 import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
+import kafka.utils.Utils.inLock
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -774,8 +775,7 @@ class LeaderExistsOrChangedListener(topic: String,
   def handleDataChange(dataPath: String, data: Object) {
     val t = dataPath.split("/").takeRight(3).head
     val p = dataPath.split("/").takeRight(2).head.toInt
-    leaderLock.lock()
-    try {
+    inLock(leaderLock) {
       if(t == topic && p == partition){
         if(oldLeaderOpt == None){
           trace("In leader existence listener on partition [%s, %d], leader has been created".format(topic,
partition))
@@ -790,18 +790,12 @@ class LeaderExistsOrChangedListener(topic: String,
         }
       }
     }
-    finally {
-      leaderLock.unlock()
-    }
   }
 
   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
-    leaderLock.lock()
-    try {
+    inLock(leaderLock) {
       leaderExistsOrChanged.signal()
-    }finally {
-      leaderLock.unlock()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6273696f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ee591d0..a06cfff 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -37,6 +37,7 @@ import kafka.api._
 import collection.mutable.Map
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 import kafka.common.TopicAndPartition
+import kafka.utils.Utils.inLock
 import junit.framework.Assert
 
 
@@ -425,8 +426,7 @@ object TestUtils extends Logging {
     else
       info("Waiting for leader for partition [%s,%d] to be changed from old leader %d".format(topic,
partition, oldLeaderOpt.get))
 
-    leaderLock.lock()
-    try {
+    inLock(leaderLock) {
       zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), new LeaderExistsOrChangedListener(topic, partition, leaderLock, leaderExistsOrChanged, oldLeaderOpt, zkClient))
       leaderExistsOrChanged.await(timeoutMs, TimeUnit.MILLISECONDS)
       // check if leader is elected
@@ -441,8 +441,6 @@ object TestUtils extends Logging {
                                    .format(timeoutMs, topic, partition))
       }
       leader
-    } finally {
-      leaderLock.unlock()
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/6273696f/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 6b21554..96b5d42 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -19,10 +19,12 @@ package kafka.utils
 
 import java.util.Arrays
 import java.nio.ByteBuffer
+import java.util.concurrent.locks._
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
 import kafka.common.KafkaException
+import kafka.utils.Utils.inLock
 import org.junit.Test
 
 
@@ -74,4 +76,16 @@ class UtilsTest extends JUnitSuite {
     assertTrue(emptyStringList.equals(emptyListFromNullString))
     assertTrue(emptyStringList.equals(emptyList))
   }
+  
+  @Test
+  def testInLock() {
+    val lock = new ReentrantLock()
+    val result = inLock(lock) {
+      assertTrue("Should be in lock", lock.isHeldByCurrentThread)
+      1 + 1
+    }
+    assertEquals(2, result)
+    assertFalse("Should be unlocked", lock.isLocked)
+    
+  }
 }
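
A natural companion to testInLock, sketched here but not part of the commit, would
pin down the exception-safety that the finally clause provides:

    @Test
    def testInLockReleasesOnException() {
      val lock = new ReentrantLock()
      try {
        inLock(lock) { throw new RuntimeException("boom") }
      } catch {
        case e: RuntimeException => // expected
      }
      assertFalse("Lock must be released even when the body throws", lock.isLocked)
    }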

