kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [2/2] git commit: KAFKA-1008 Lock around unmap on windows.
Date Tue, 15 Oct 2013 21:07:51 GMT
KAFKA-1008 Lock around unmap on windows.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c98b6de1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c98b6de1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c98b6de1

Branch: refs/heads/trunk
Commit: c98b6de15db9a2ce72f44a5b2736f74de5066113
Parents: a160f10
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Wed Oct 9 13:50:09 2013 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Tue Oct 15 14:06:48 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala |  11 +-
 .../consumer/ZookeeperConsumerConnector.scala   |   6 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala | 109 +++++++++++++------
 .../kafka/server/AbstractFetcherThread.scala    |  11 +-
 core/src/main/scala/kafka/utils/Os.scala        |  23 ++++
 core/src/main/scala/kafka/utils/Utils.scala     |  13 +++
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  12 +-
 .../test/scala/unit/kafka/utils/UtilsTest.scala |  14 +++
 8 files changed, 138 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c98b6de1/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 4e76367..566ca46 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -24,6 +24,7 @@ import scala.collection.immutable
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.Utils.inLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
 import kafka.common.TopicAndPartition
@@ -123,14 +124,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
     leaderFinderThread.start()
 
-    lock.lock()
-    try {
+    inLock(lock) {
       partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
       this.cluster = cluster
       noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
       cond.signalAll()
-    } finally {
-      lock.unlock()
     }
   }
 
@@ -158,14 +156,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
 
   def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
     debug("adding partitions with error %s".format(partitionList))
-    lock.lock()
-    try {
+    inLock(lock) {
       if (partitionMap != null) {
         noLeaderPartitionSet ++= partitionList
         cond.signalAll()
       }
-    } finally {
-      lock.unlock()
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98b6de1/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 77449f5..c0350cd 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -30,6 +30,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.util.UUID
 import kafka.serializer._
 import kafka.utils.ZkUtils._
+import kafka.utils.Utils.inLock
 import kafka.common._
 import com.yammer.metrics.core.Gauge
 import kafka.metrics._
@@ -366,12 +367,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
-      lock.lock()
-      try {
+      inLock(lock) {
         isWatcherTriggered = true
         cond.signalAll()
-      } finally {
-        lock.unlock()
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98b6de1/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index afab848..aa654e8 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -21,8 +21,10 @@ import scala.math._
 import java.io._
 import java.nio._
 import java.nio.channels._
+import java.util.concurrent.locks._
 import java.util.concurrent.atomic._
 import kafka.utils._
+import kafka.utils.Utils.inLock
 import kafka.common.InvalidOffsetException
 
 /**
@@ -52,6 +54,8 @@ import kafka.common.InvalidOffsetException
  */
 class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
   
+  private val lock = new ReentrantLock
+  
   /* initialize the memory mapping for this index */
   private var mmap: MappedByteBuffer = 
     {
@@ -88,6 +92,12 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   /* the number of eight-byte entries currently in the index */
   private var size = new AtomicInteger(mmap.position / 8)
   
+  /**
+   * The maximum number of eight-byte entries this index can hold
+   */
+  @volatile
+  var maxEntries = mmap.limit / 8
+  
   /* the last offset in the index */
   var lastOffset = readLastOffset()
   
@@ -98,18 +108,15 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * The last offset written to the index
    */
   private def readLastOffset(): Long = {
-    val offset = 
-      size.get match {
-        case 0 => 0
-        case s => relativeOffset(this.mmap, s-1)
-      }
-    baseOffset + offset
+    inLock(lock) {
+      val offset = 
+        size.get match {
+          case 0 => 0
+          case s => relativeOffset(this.mmap, s-1)
+        }
+      baseOffset + offset
+    }
   }
-  
-  /**
-   * The maximum number of eight-byte entries this index can hold
-   */
-  def maxEntries = mmap.limit / 8
 
   /**
    * Find the largest offset less than or equal to the given targetOffset 
@@ -122,12 +129,14 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * the pair (baseOffset, 0) is returned.
    */
   def lookup(targetOffset: Long): OffsetPosition = {
-    val idx = mmap.duplicate
-    val slot = indexSlotFor(idx, targetOffset)
-    if(slot == -1)
-      OffsetPosition(baseOffset, 0)
-    else
-      OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
+    maybeLock(lock) {
+      val idx = mmap.duplicate
+      val slot = indexSlotFor(idx, targetOffset)
+      if(slot == -1)
+        OffsetPosition(baseOffset, 0)
+      else
+        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
+      }
   }
   
   /**
@@ -179,17 +188,19 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * @return The offset/position pair at that entry
    */
   def entry(n: Int): OffsetPosition = {
-    if(n >= entries)
-      throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
-    val idx = mmap.duplicate
-    OffsetPosition(relativeOffset(idx, n), physical(idx, n))
+    maybeLock(lock) {
+      if(n >= entries)
+        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
+      val idx = mmap.duplicate
+      OffsetPosition(relativeOffset(idx, n), physical(idx, n))
+    }
   }
   
   /**
    * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
    */
   def append(offset: Long, position: Int) {
-    this synchronized {
+    inLock(lock) {
       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
       if (size.get == 0 || offset > lastOffset) {
         debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
@@ -198,8 +209,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
         this.size.incrementAndGet()
         this.lastOffset = offset
         require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
-      }
-      else {
+      } else {
         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
           .format(offset, entries, lastOffset, file.getName))
       }
@@ -221,7 +231,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * Truncating to an offset larger than the largest in the index has no effect.
    */
   def truncateTo(offset: Long) {
-    this synchronized {
+    inLock(lock) {
       val idx = mmap.duplicate
       val slot = indexSlotFor(idx, offset)
 
@@ -245,9 +255,11 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * Truncates index to a known number of entries.
    */
   private def truncateToEntries(entries: Int) {
-    this.size.set(entries)
-    mmap.position(this.size.get * 8)
-    this.lastOffset = readLastOffset
+    inLock(lock) {
+      this.size.set(entries)
+      mmap.position(this.size.get * 8)
+      this.lastOffset = readLastOffset
+    }
   }
   
   /**
@@ -255,7 +267,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * the file.
    */
   def trimToValidSize() {
-    this synchronized {
+    inLock(lock) {
       resize(entries * 8)
     }
   }
@@ -267,13 +279,18 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * we want to reset the index size to maximum index size to avoid rolling new segment.
    */
   def resize(newSize: Int) {
-    this synchronized {
+    inLock(lock) {
       val raf = new RandomAccessFile(file, "rws")
       val roundedNewSize = roundToExactMultiple(newSize, 8)
+      val position = this.mmap.position
+      
+      /* Windows won't let us modify the file length while the file is mmapped :-( */
+      if(Os.isWindows)
+        forceUnmap(this.mmap)
       try {
         raf.setLength(roundedNewSize)
-        val position = this.mmap.position
         this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+        this.maxEntries = this.mmap.limit / 8
         this.mmap.position(position)
       } finally {
         Utils.swallow(raf.close())
@@ -282,10 +299,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   }
   
   /**
+   * Forcefully free the buffer's mmap. We do this only on windows.
+   */
+  def forceUnmap(m: MappedByteBuffer) {
+    try {
+      if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
+        (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
+    } catch {
+      case t: Throwable => warn("Error when freeing index buffer", t)
+    }
+  }
+  
+  /**
    * Flush the data in the index to disk
    */
   def flush() {
-    this synchronized {
+    inLock(lock) {
       mmap.force()
     }
   }
@@ -326,4 +355,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * E.g. roundToExactMultiple(67, 8) == 64
    */
   private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
+  
+  /**
+   * Execute the given function in a lock only if we are running on windows. We do this 
+   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
+   * and this requires synchronizing reads.
+   */
+  private def maybeLock[T](lock: Lock)(fun: => T): T = {
+    if(Os.isWindows)
+      lock.lock()
+    try {
+      return fun
+    } finally {
+      if(Os.isWindows)
+        lock.unlock()
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98b6de1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index a5fc96d..c64260f 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
+import kafka.utils.Utils.inLock
 
 
 /**
@@ -70,8 +71,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 
   override def doWork() {
-    partitionMapLock.lock()
-    try {
+    inLock(partitionMapLock) {
       if (partitionMap.isEmpty)
         partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
       partitionMap.foreach {
@@ -79,8 +79,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
           fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                            offset, fetchSize)
       }
-    } finally {
-      partitionMapLock.unlock()
     }
 
     val fetchRequest = fetchRequestBuilder.build()
@@ -107,8 +105,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 
     if (response != null) {
       // process fetched data
-      partitionMapLock.lock()
-      try {
+      inLock(partitionMapLock) {
         response.data.foreach {
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
@@ -160,8 +157,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
               }
             }
         }
-      } finally {
-        partitionMapLock.unlock()
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98b6de1/core/src/main/scala/kafka/utils/Os.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Os.scala b/core/src/main/scala/kafka/utils/Os.scala
new file mode 100644
index 0000000..6574f08
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Os.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+object Os {
+  val name = System.getProperty("os.name").toLowerCase
+  val isWindows = name.startsWith("windows")
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98b6de1/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 405c7ae..c9ca95f 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.nio._
 import charset.Charset
 import java.nio.channels._
+import java.util.concurrent.locks.Lock
 import java.lang.management._
 import javax.management._
 import scala.collection._
@@ -587,4 +588,16 @@ object Utils extends Logging {
     (bytes(offset + 3) & 0xFF)
   }
   
+  /**
+   * Execute the given function inside the lock
+   */
+  def inLock[T](lock: Lock)(fun: => T): T = {
+    lock.lock()
+    try {
+       return fun
+    } finally {
+      lock.unlock()
+    }
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98b6de1/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d1c4b3d..856d136 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -35,6 +35,7 @@ import kafka.controller.KafkaController
 import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
+import kafka.utils.Utils.inLock
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -781,8 +782,7 @@ class LeaderExistsOrChangedListener(topic: String,
   def handleDataChange(dataPath: String, data: Object) {
     val t = dataPath.split("/").takeRight(3).head
     val p = dataPath.split("/").takeRight(2).head.toInt
-    leaderLock.lock()
-    try {
+    inLock(leaderLock) {
       if(t == topic && p == partition){
         if(oldLeaderOpt == None){
           trace("In leader existence listener on partition [%s, %d], leader has been created".format(topic, partition))
@@ -797,18 +797,12 @@ class LeaderExistsOrChangedListener(topic: String,
         }
       }
     }
-    finally {
-      leaderLock.unlock()
-    }
   }
 
   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
-    leaderLock.lock()
-    try {
+    inLock(leaderLock) {
       leaderExistsOrChanged.signal()
-    }finally {
-      leaderLock.unlock()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98b6de1/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index ce1679a..920f318 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -18,12 +18,14 @@
 package kafka.utils
 
 import java.util.Arrays
+import java.util.concurrent.locks.ReentrantLock
 import java.nio.ByteBuffer
 import java.io._
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
 import kafka.common.KafkaException
+import kafka.utils.Utils.inLock
 import org.junit.Test
 
 
@@ -94,4 +96,16 @@ class UtilsTest extends JUnitSuite {
     assertTrue(emptyStringList.equals(emptyListFromNullString))
     assertTrue(emptyStringList.equals(emptyList))
   }
+  
+  @Test
+  def testInLock() {
+    val lock = new ReentrantLock()
+    val result = inLock(lock) {
+      assertTrue("Should be in lock", lock.isHeldByCurrentThread)
+      1 + 1
+    }
+    assertEquals(2, result)
+    assertFalse("Should be unlocked", lock.isLocked)
+    
+  }
 }


Mime
View raw message