kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6517) ZooKeeperClient holds a lock while waiting for responses, blocking shutdown
Date Thu, 15 Feb 2018 20:22:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366212#comment-16366212
] 

ASF GitHub Bot commented on KAFKA-6517:
---------------------------------------

rajinisivaram closed pull request #4551: KAFKA-6517: Avoid deadlock in ZooKeeperClient during
session expiry
URL: https://github.com/apache/kafka/pull/4551
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 9a1d16274df..3934fd0ad5d 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLat
 import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
-import kafka.utils.Logging
+import kafka.utils.{KafkaScheduler, Logging}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback,
StatCallback, StringCallback, VoidCallback}
 import org.apache.zookeeper.KeeperException.Code
@@ -59,6 +59,7 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala
+  private[zookeeper] val expiryScheduler = new KafkaScheduler(0, "zk-session-expiry-handler")
 
   private val metricNames = Set[String]()
 
@@ -90,6 +91,7 @@ class ZooKeeperClient(connectString: String,
 
   metricNames += "SessionState"
 
+  expiryScheduler.startup()
   waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
 
   override def metricName(name: String, metricTags: scala.collection.Map[String, String]):
MetricName = {
@@ -122,7 +124,7 @@ class ZooKeeperClient(connectString: String,
    * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most
specific common supertype
    * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
    */
-  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock)
{
+  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
     if (requests.isEmpty)
       Seq.empty
     else {
@@ -132,10 +134,12 @@ class ZooKeeperClient(connectString: String,
       requests.foreach { request =>
         inFlightRequests.acquire()
         try {
-          send(request) { response =>
-            responseQueue.add(response)
-            inFlightRequests.release()
-            countDownLatch.countDown()
+          inReadLock(initializationLock) {
+            send(request) { response =>
+              responseQueue.add(response)
+              inFlightRequests.release()
+              countDownLatch.countDown()
+            }
           }
         } catch {
           case e: Throwable =>
@@ -148,7 +152,8 @@ class ZooKeeperClient(connectString: String,
     }
   }
 
-  private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response =>
Unit): Unit = {
+  // Visibility to override for testing
+  private[zookeeper] def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response
=> Unit): Unit = {
     // Safe to cast as we always create a response of the right type
     def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response])
 
@@ -303,12 +308,18 @@ class ZooKeeperClient(connectString: String,
     stateChangeHandlers.clear()
     zooKeeper.close()
     metricNames.foreach(removeMetric(_))
+    expiryScheduler.shutdown()
     info("Closed.")
   }
 
   def sessionId: Long = inReadLock(initializationLock) {
     zooKeeper.getSessionId
   }
+
+  // Only for testing
+  private[zookeeper] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) {
+    zooKeeper
+  }
   
   private def initialize(): Unit = {
     if (!connectionState.isAlive) {
@@ -352,12 +363,14 @@ class ZooKeeperClient(connectString: String,
             error("Auth failed.")
             stateChangeHandlers.values.foreach(_.onAuthFailure())
           } else if (state == KeeperState.Expired) {
-            inWriteLock(initializationLock) {
-              info("Session expired.")
-              stateChangeHandlers.values.foreach(_.beforeInitializingSession())
-              initialize()
-              stateChangeHandlers.values.foreach(_.afterInitializingSession())
-            }
+            expiryScheduler.schedule("zk-session-expired", () => {
+              inWriteLock(initializationLock) {
+                info("Session expired.")
+                stateChangeHandlers.values.foreach(_.beforeInitializingSession())
+                initialize()
+                stateChangeHandlers.values.foreach(_.afterInitializingSession())
+              }
+            }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
           }
         case Some(path) =>
           (event.getType: @unchecked) match {
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index c8ebaa90735..f1c09d7308d 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -19,7 +19,7 @@ package kafka.zookeeper
 import java.nio.charset.StandardCharsets
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, Executors,
Semaphore, TimeUnit}
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Gauge, Meter, MetricName}
@@ -29,8 +29,8 @@ import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
 import org.apache.zookeeper.ZooKeeper.States
-import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
-import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertNull, assertTrue}
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
@@ -385,6 +385,114 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     } finally zooKeeperClient.close()
   }
 
+  /**
+    * Tests that if session expiry notification is received while a thread is processing requests,
+    * session expiry is handled and the request thread completes with responses to all requests,
+    * even though some requests may fail due to session expiry or disconnection.
+    *
+    * Sequence of events on different threads:
+    *   Request thread:
+    *       - Sends `maxInflightRequests` requests (these may complete before session is expired)
+    *   Main thread:
+    *       - Waits for at least one request to be processed (this should succeed)
+    *       - Expires session by creating new client with same session id
+    *       - Unblocks another `maxInflightRequests` requests before and after new client is closed (these may fail)
+    *   ZooKeeperClient Event thread:
+    *       - Delivers responses and session expiry (no ordering guarantee between these, both are processed asynchronously)
+    *   Response executor thread:
+    *       - Blocks subsequent sends by delaying response until session expiry is processed
+    *   ZooKeeperClient Session Expiry Handler:
+    *       - Unblocks subsequent sends
+    *   Main thread:
+    *       - Waits for all sends to complete. The requests sent after session expiry processing should succeed.
+    */
+  @Test
+  def testSessionExpiry(): Unit = {
+    val maxInflightRequests = 2
+    val responseExecutor = Executors.newSingleThreadExecutor
+    val sendSemaphore = new Semaphore(0)
+    val sendCompleteSemaphore = new Semaphore(0)
+    val sendSize = maxInflightRequests * 5
+    @volatile var resultCodes: Seq[Code] = null
+    val stateChanges = new ConcurrentLinkedQueue[String]()
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
maxInflightRequests,
+      time, "testGroupType", "testGroupName") {
+      override def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response
=> Unit): Unit = {
+        super.send(request)( response => {
+          responseExecutor.submit(new Runnable {
+            override def run(): Unit = {
+              sendCompleteSemaphore.release()
+              sendSemaphore.acquire()
+              processResponse(response)
+            }
+          })
+        })
+      }
+    }
+    try {
+      zooKeeperClient.registerStateChangeHandler(new StateChangeHandler {
+        override val name: String ="test-state-change-handler"
+        override def afterInitializingSession(): Unit = {
+          verifyHandlerThread()
+          stateChanges.add("afterInitializingSession")
+        }
+        override def beforeInitializingSession(): Unit = {
+          verifyHandlerThread()
+          stateChanges.add("beforeInitializingSession")
+          sendSemaphore.release(sendSize) // Resume remaining sends
+        }
+        private def verifyHandlerThread(): Unit = {
+          val threadName = Thread.currentThread.getName
+          assertTrue(s"Unexpected thread + $threadName", threadName.startsWith(zooKeeperClient.expiryScheduler.threadNamePrefix))
+        }
+      })
+
+      val requestThread = new Thread {
+        override def run(): Unit = {
+          val requests = (1 to sendSize).map(i => GetDataRequest(s"/$i"))
+          resultCodes = zooKeeperClient.handleRequests(requests).map(_.resultCode)
+        }
+      }
+      requestThread.start()
+      sendCompleteSemaphore.acquire() // Wait for request thread to start processing requests
+
+      // Trigger session expiry by reusing the session id in another client
+      val dummyWatcher = new Watcher {
+        override def process(event: WatchedEvent): Unit = {}
+      }
+      val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
+        zooKeeperClient.currentZooKeeper.getSessionId,
+        zooKeeperClient.currentZooKeeper.getSessionPasswd)
+      assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
+      sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail
+      anotherZkClient.close()
+      sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail
+
+      requestThread.join(10000)
+      if (requestThread.isAlive) {
+        requestThread.interrupt()
+        fail("Request thread did not complete")
+      }
+      assertEquals(Seq("beforeInitializingSession", "afterInitializingSession"), stateChanges.asScala.toSeq)
+
+      assertEquals(resultCodes.size, sendSize)
+      val connectionLostCount = resultCodes.count(_ == Code.CONNECTIONLOSS)
+      assertTrue(s"Unexpected connection lost requests $resultCodes", connectionLostCount
<= maxInflightRequests)
+      val expiredCount = resultCodes.count(_ == Code.SESSIONEXPIRED)
+      assertTrue(s"Unexpected session expired requests $resultCodes", expiredCount <=
maxInflightRequests)
+      assertTrue(s"No connection lost or expired requests $resultCodes", connectionLostCount
+ expiredCount > 0)
+      assertEquals(Code.NONODE, resultCodes.head)
+      assertEquals(Code.NONODE, resultCodes.last)
+      assertTrue(s"Unexpected result code $resultCodes",
+        resultCodes.filterNot(Set(Code.NONODE, Code.SESSIONEXPIRED, Code.CONNECTIONLOSS).contains).isEmpty)
+
+    } finally {
+      zooKeeperClient.close()
+      responseExecutor.shutdownNow()
+    }
+    assertFalse("Expiry executor not shutdown", zooKeeperClient.expiryScheduler.isStarted)
+  }
+
   def isExpectedMetricName(metricName: MetricName, name: String): Boolean =
     metricName.getName == name && metricName.getGroup == "testMetricGroup" &&
metricName.getType == "testMetricType"
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ZooKeeperClient holds a lock while waiting for responses, blocking shutdown
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6517
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6517
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> Stack traces from a local test run that deadlocked because shutdown couldn't acquire the lock:
>  # kafka-scheduler-7: acquired the read lock in kafka.zookeeper.ZooKeeperClient.handleRequests and holds it while waiting for responses
>  # Test worker-EventThread: waiting for the write lock to process SessionExpired in kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process
>  # ForkJoinPool-1-worker-11: processing KafkaServer.shutdown, queued behind thread 2 while waiting to acquire the read lock in kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler
> Stack traces of the relevant threads:
> {quote}
> "kafka-scheduler-7" daemon prio=5 tid=0x00007fade918d800 nid=0xd317 waiting on condition
[0x000070000b371000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000007e4c6e698> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
>         at kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:146)
>         at kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:126)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
>         at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:125)
>         at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1432)
>         at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1425)
>         at kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:583)
>         at kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33)
>         at kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:665)
>         at kafka.cluster.Partition$$anonfun$4.apply$mcZ$sp(Partition.scala:509)
>         at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
>         at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>         at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:499)
>         at kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)
>         at kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)
> ......
> "Test worker-EventThread" daemon prio=5 tid=0x00007fade90cf800 nid=0xef13 waiting on
condition [0x000070000a23f000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x0000000781847620> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
>         at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>         at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:355)
>         at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>         at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
>  
> "ForkJoinPool-1-worker-11" daemon prio=5 tid=0x00007fade9a83000 nid=0x17907 waiting on
condition [0x0000700011eaf000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x0000000781847620> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
>         at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
>         at kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler(ZooKeeperClient.scala:295)
>         at kafka.zk.KafkaZkClient.unregisterStateChangeHandler(KafkaZkClient.scala:1217)
>         at kafka.common.ZkNodeChangeNotificationListener.close(ZkNodeChangeNotificationListener.scala:68)
>         at kafka.server.DynamicConfigManager.shutdown(DynamicConfigManager.scala:181)
>         at kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:552)
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:552)
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message