kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6517: Avoid deadlock in ZooKeeperClient during session expiry (#4551)
Date Thu, 15 Feb 2018 20:47:11 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new a2d554f  KAFKA-6517: Avoid deadlock in ZooKeeperClient during session expiry (#4551)
a2d554f is described below

commit a2d554f14d9095b6c50a119b13c4d53b4d6089b4
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Thu Feb 15 20:21:12 2018 +0000

    KAFKA-6517: Avoid deadlock in ZooKeeperClient during session expiry (#4551)
    
    ZooKeeperClient acquires initializationLock#writeLock to establish a new connection while
    processing session expiry WatchEvent. ZooKeeperClient#handleRequests acquires initializationLock#readLock,
    allowing multiple batches of requests to be processed concurrently, but preventing reconnections
    while processing requests. At the moment, handleRequests holds onto the readLock throughout
    the method, even while waiting for responses and inflight requests to complete. But responses
    cannot be [...]
    
    This commit reduces locking in ZooKeeperClient#handleRequests to just the non-blocking
    send, so that session expiry handling doesn't get blocked when a send is blocked waiting for
    responses. Also moves session expiry handling to a separate thread so that Kafka controller
    doesn't block the event handler thread when processing session expiry.
---
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    |  39 ++++---
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 114 ++++++++++++++++++++-
 2 files changed, 137 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 9a1d162..3934fd0 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLat
 import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
-import kafka.utils.Logging
+import kafka.utils.{KafkaScheduler, Logging}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback}
 import org.apache.zookeeper.KeeperException.Code
@@ -59,6 +59,7 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala
+  private[zookeeper] val expiryScheduler = new KafkaScheduler(0, "zk-session-expiry-handler")
 
   private val metricNames = Set[String]()
 
@@ -90,6 +91,7 @@ class ZooKeeperClient(connectString: String,
 
   metricNames += "SessionState"
 
+  expiryScheduler.startup()
   waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
 
   override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
@@ -122,7 +124,7 @@ class ZooKeeperClient(connectString: String,
    * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype
    * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
    */
-  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) {
+  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
     if (requests.isEmpty)
       Seq.empty
     else {
@@ -132,10 +134,12 @@ class ZooKeeperClient(connectString: String,
       requests.foreach { request =>
         inFlightRequests.acquire()
         try {
-          send(request) { response =>
-            responseQueue.add(response)
-            inFlightRequests.release()
-            countDownLatch.countDown()
+          inReadLock(initializationLock) {
+            send(request) { response =>
+              responseQueue.add(response)
+              inFlightRequests.release()
+              countDownLatch.countDown()
+            }
           }
         } catch {
           case e: Throwable =>
@@ -148,7 +152,8 @@ class ZooKeeperClient(connectString: String,
     }
   }
 
-  private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
+  // Visibility to override for testing
+  private[zookeeper] def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
     // Safe to cast as we always create a response of the right type
     def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response])
 
@@ -303,12 +308,18 @@ class ZooKeeperClient(connectString: String,
     stateChangeHandlers.clear()
     zooKeeper.close()
     metricNames.foreach(removeMetric(_))
+    expiryScheduler.shutdown()
     info("Closed.")
   }
 
   def sessionId: Long = inReadLock(initializationLock) {
     zooKeeper.getSessionId
   }
+
+  // Only for testing
+  private[zookeeper] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) {
+    zooKeeper
+  }
   
   private def initialize(): Unit = {
     if (!connectionState.isAlive) {
@@ -352,12 +363,14 @@ class ZooKeeperClient(connectString: String,
             error("Auth failed.")
             stateChangeHandlers.values.foreach(_.onAuthFailure())
           } else if (state == KeeperState.Expired) {
-            inWriteLock(initializationLock) {
-              info("Session expired.")
-              stateChangeHandlers.values.foreach(_.beforeInitializingSession())
-              initialize()
-              stateChangeHandlers.values.foreach(_.afterInitializingSession())
-            }
+            expiryScheduler.schedule("zk-session-expired", () => {
+              inWriteLock(initializationLock) {
+                info("Session expired.")
+                stateChangeHandlers.values.foreach(_.beforeInitializingSession())
+                initialize()
+                stateChangeHandlers.values.foreach(_.afterInitializingSession())
+              }
+            }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
           }
         case Some(path) =>
           (event.getType: @unchecked) match {
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index c8ebaa9..f1c09d7 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -19,7 +19,7 @@ package kafka.zookeeper
 import java.nio.charset.StandardCharsets
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, Executors, Semaphore, TimeUnit}
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Gauge, Meter, MetricName}
@@ -29,8 +29,8 @@ import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
 import org.apache.zookeeper.ZooKeeper.States
-import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
-import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertNull, assertTrue}
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
@@ -385,6 +385,114 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     } finally zooKeeperClient.close()
   }
 
+  /**
+    * Tests that if session expiry notification is received while a thread is processing requests,
+    * session expiry is handled and the request thread completes with responses to all requests,
+    * even though some requests may fail due to session expiry or disconnection.
+    *
+    * Sequence of events on different threads:
+    *   Request thread:
+    *       - Sends `maxInflightRequests` requests (these may complete before session is expired)
+    *   Main thread:
+    *       - Waits for at least one request to be processed (this should succeed)
+    *       - Expires session by creating new client with same session id
+    *       - Unblocks another `maxInflightRequests` requests before and after new client is closed (these may fail)
+    *   ZooKeeperClient Event thread:
+    *       - Delivers responses and session expiry (no ordering guarantee between these, both are processed asynchronously)
+    *   Response executor thread:
+    *       - Blocks subsequent sends by delaying response until session expiry is processed
+    *   ZooKeeperClient Session Expiry Handler:
+    *       - Unblocks subsequent sends
+    *   Main thread:
+    *       - Waits for all sends to complete. The requests sent after session expiry processing should succeed.
+    */
+  @Test
+  def testSessionExpiry(): Unit = {
+    val maxInflightRequests = 2
+    val responseExecutor = Executors.newSingleThreadExecutor
+    val sendSemaphore = new Semaphore(0)
+    val sendCompleteSemaphore = new Semaphore(0)
+    val sendSize = maxInflightRequests * 5
+    @volatile var resultCodes: Seq[Code] = null
+    val stateChanges = new ConcurrentLinkedQueue[String]()
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, maxInflightRequests,
+      time, "testGroupType", "testGroupName") {
+      override def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
+        super.send(request)( response => {
+          responseExecutor.submit(new Runnable {
+            override def run(): Unit = {
+              sendCompleteSemaphore.release()
+              sendSemaphore.acquire()
+              processResponse(response)
+            }
+          })
+        })
+      }
+    }
+    try {
+      zooKeeperClient.registerStateChangeHandler(new StateChangeHandler {
+        override val name: String ="test-state-change-handler"
+        override def afterInitializingSession(): Unit = {
+          verifyHandlerThread()
+          stateChanges.add("afterInitializingSession")
+        }
+        override def beforeInitializingSession(): Unit = {
+          verifyHandlerThread()
+          stateChanges.add("beforeInitializingSession")
+          sendSemaphore.release(sendSize) // Resume remaining sends
+        }
+        private def verifyHandlerThread(): Unit = {
+          val threadName = Thread.currentThread.getName
+          assertTrue(s"Unexpected thread + $threadName", threadName.startsWith(zooKeeperClient.expiryScheduler.threadNamePrefix))
+        }
+      })
+
+      val requestThread = new Thread {
+        override def run(): Unit = {
+          val requests = (1 to sendSize).map(i => GetDataRequest(s"/$i"))
+          resultCodes = zooKeeperClient.handleRequests(requests).map(_.resultCode)
+        }
+      }
+      requestThread.start()
+      sendCompleteSemaphore.acquire() // Wait for request thread to start processing requests
+
+      // Trigger session expiry by reusing the session id in another client
+      val dummyWatcher = new Watcher {
+        override def process(event: WatchedEvent): Unit = {}
+      }
+      val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
+        zooKeeperClient.currentZooKeeper.getSessionId,
+        zooKeeperClient.currentZooKeeper.getSessionPasswd)
+      assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
+      sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail
+      anotherZkClient.close()
+      sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail
+
+      requestThread.join(10000)
+      if (requestThread.isAlive) {
+        requestThread.interrupt()
+        fail("Request thread did not complete")
+      }
+      assertEquals(Seq("beforeInitializingSession", "afterInitializingSession"), stateChanges.asScala.toSeq)
+
+      assertEquals(resultCodes.size, sendSize)
+      val connectionLostCount = resultCodes.count(_ == Code.CONNECTIONLOSS)
+      assertTrue(s"Unexpected connection lost requests $resultCodes", connectionLostCount <= maxInflightRequests)
+      val expiredCount = resultCodes.count(_ == Code.SESSIONEXPIRED)
+      assertTrue(s"Unexpected session expired requests $resultCodes", expiredCount <= maxInflightRequests)
+      assertTrue(s"No connection lost or expired requests $resultCodes", connectionLostCount + expiredCount > 0)
+      assertEquals(Code.NONODE, resultCodes.head)
+      assertEquals(Code.NONODE, resultCodes.last)
+      assertTrue(s"Unexpected result code $resultCodes",
+        resultCodes.filterNot(Set(Code.NONODE, Code.SESSIONEXPIRED, Code.CONNECTIONLOSS).contains).isEmpty)
+
+    } finally {
+      zooKeeperClient.close()
+      responseExecutor.shutdownNow()
+    }
+    assertFalse("Expiry executor not shutdown", zooKeeperClient.expiryScheduler.isStarted)
+  }
+
   def isExpectedMetricName(metricName: MetricName, name: String): Boolean =
     metricName.getName == name && metricName.getGroup == "testMetricGroup" && metricName.getType == "testMetricType"
 

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message