kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4428; Kafka does not exit if port is already bound
Date Wed, 28 Dec 2016 18:13:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 88a439ed8 -> dc550259e


KAFKA-4428; Kafka does not exit if port is already bound

During Acceptor initialization, if an "Address already in use" error is encountered,
the shutdown latch in each Processor is never counted down. Consequently,
the Kafka server hangs when `Processor.shutdown` is called.

Author: huxi <huxi@zhenrongbao.com>
Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2156 from amethystic/kafka-4428_Kafka_noexit_for_port_already_use


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc550259
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc550259
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc550259

Branch: refs/heads/trunk
Commit: dc550259ea04011b22501c82dd6510a29b5a9b0f
Parents: 88a439e
Author: huxi <huxi@zhenrongbao.com>
Authored: Wed Dec 28 13:13:17 2016 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Dec 28 17:34:31 2016 +0000

----------------------------------------------------------------------
 .../main/scala/kafka/network/SocketServer.scala | 21 ++++++++++------
 .../unit/kafka/server/ServerStartupTest.scala   | 26 +++++++++++++++++---
 2 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dc550259/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 55061ed..95c5fdf 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -166,10 +166,15 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
 
   private val startupLatch = new CountDownLatch(1)
-  private val shutdownLatch = new CountDownLatch(1)
+
+  // `shutdown()` is invoked before `startupComplete` and `shutdownComplete` if an exception is thrown in the constructor
+  // (e.g. if the address is already in use). We want `shutdown` to proceed in such cases, so we first assign an open
+  // latch and then replace it in `startupComplete()`.
+  @volatile private var shutdownLatch = new CountDownLatch(0)
+
   private val alive = new AtomicBoolean(true)
 
-  def wakeup()
+  def wakeup(): Unit
 
   /**
    * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
@@ -188,24 +193,26 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
   /**
    * Record that the thread startup is complete
    */
-  protected def startupComplete() = {
+  protected def startupComplete(): Unit = {
+    // Replace the open latch with a closed one
+    shutdownLatch = new CountDownLatch(1)
     startupLatch.countDown()
   }
 
   /**
    * Record that the thread shutdown is complete
    */
-  protected def shutdownComplete() = shutdownLatch.countDown()
+  protected def shutdownComplete(): Unit = shutdownLatch.countDown()
 
   /**
    * Is the server still running?
    */
-  protected def isRunning = alive.get
+  protected def isRunning: Boolean = alive.get
 
   /**
    * Close the connection identified by `connectionId` and decrement the connection count.
    */
-  def close(selector: KSelector, connectionId: String) {
+  def close(selector: KSelector, connectionId: String): Unit = {
     val channel = selector.channel(connectionId)
     if (channel != null) {
       debug(s"Closing selector connection $connectionId")
@@ -219,7 +226,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
   /**
    * Close `channel` and decrement the connection count.
    */
-  def close(channel: SocketChannel) {
+  def close(channel: SocketChannel): Unit = {
     if (channel != null) {
       debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
       connectionQuotas.dec(channel.socket.getInetAddress)

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc550259/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 86b167e..92c6a9b 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -17,9 +17,7 @@
 
 package kafka.server
 
-import kafka.utils.ZkUtils
-import kafka.utils.CoreUtils
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.easymock.EasyMock
 import org.junit.Assert._
@@ -44,6 +42,28 @@ class ServerStartupTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testConflictBrokerStartupWithSamePort {
+    // Create and start first broker
+    val brokerId1 = 0
+    val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect)
+    val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
+    val port = server1.boundPort()
+
+    // Create a second broker with same port
+    val brokerId2 = 1
+    val props2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, port = port)
+    try {
+      TestUtils.createServer(KafkaConfig.fromProps(props2))
+      fail("Starting a broker with the same port should fail")
+    } catch {
+      case _: RuntimeException => // expected
+    } finally {
+      server1.shutdown()
+      CoreUtils.delete(server1.config.logDirs)
+    }
+  }
+
+  @Test
   def testConflictBrokerRegistration {
     // Try starting a broker with the a conflicting broker id.
     // This shouldn't affect the existing broker registration.


Mime
View raw message