kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6772: Load credentials from ZK before accepting connections (#4867)
Date Wed, 18 Apr 2018 09:20:41 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 98bb75a  KAFKA-6772: Load credentials from ZK before accepting connections (#4867)
98bb75a is described below

commit 98bb75a58fc241b400ed7535d9e9723f678b4803
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Wed Apr 18 10:20:28 2018 +0100

    KAFKA-6772: Load credentials from ZK before accepting connections (#4867)
    
    Start processing client connections only after completing KafkaServer initialization to
    ensure that credentials are loaded from ZK into cache before authentications are processed.
    Acceptors are started earlier so that bound port is known for registering in ZK.
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../main/scala/kafka/network/SocketServer.scala    | 47 ++++++++++++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  7 ++-
 .../kafka/server/ScramServerStartupTest.scala      | 73 ++++++++++++++++++++++
 3 files changed, 120 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d2ee084..27d89b4 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -76,12 +76,24 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private var stoppedProcessingRequests = false
 
   /**
-   * Start the socket server
+   * Start the socket server. Acceptors for all the listeners are started. Processors
+   * are started if `startupProcessors` is true. If not, processors are only started when
+   * [[kafka.network.SocketServer#startProcessors()]] is invoked. Delayed starting of processors
+   * is used to delay processing client connections until server is fully initialized, e.g.
+   * to ensure that all credentials have been loaded before authentications are performed.
+   * Acceptors are always started during `startup` so that the bound port is known when this
+   * method completes even when ephemeral ports are used. Incoming connections on this server
+   * are processed when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
+   *
+   * @param startupProcessors Flag indicating whether `Processor`s must be started.
    */
-  def startup() {
+  def startup(startupProcessors: Boolean = true) {
     this.synchronized {
       connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
       createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
+      if (startupProcessors) {
+        startProcessors()
+      }
     }
 
     newGauge("NetworkProcessorAvgIdlePercent",
@@ -110,6 +122,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     info("Started " + acceptors.size + " acceptor threads")
   }
 
+  /**
+   * Starts processors of all the acceptors of this server if they have not already been started.
+   * This method is used for delayed starting of processors if [[kafka.network.SocketServer#startup]]
+   * was invoked with `startupProcessors=false`.
+   */
+  def startProcessors(): Unit = synchronized {
+    acceptors.values.asScala.foreach { _.startProcessors() }
+    info(s"Started processors for ${acceptors.size} acceptors")
+  }
+
   private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
 
   private def createAcceptorAndProcessors(processorsPerListener: Int,
@@ -196,6 +218,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
     info(s"Adding listeners for endpoints $listenersAdded")
     createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded)
+    startProcessors()
   }
 
   def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
@@ -307,13 +330,25 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
   private val nioSelector = NSelector.open()
   val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   private val processors = new ArrayBuffer[Processor]()
+  private val processorsStarted = new AtomicBoolean
 
   private[network] def addProcessors(newProcessors: Buffer[Processor]): Unit = synchronized {
-    newProcessors.foreach { processor =>
+    processors ++= newProcessors
+    if (processorsStarted.get)
+      startProcessors(newProcessors)
+  }
+
+  private[network] def startProcessors(): Unit = synchronized {
+    if (!processorsStarted.getAndSet(true)) {
+      startProcessors(processors)
+    }
+  }
+
+  private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
+    processors.foreach { processor =>
       KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
         processor).start()
     }
-    processors ++= newProcessors
   }
 
   private[network] def removeProcessors(removeCount: Int, requestChannel: RequestChannel): Unit = synchronized {
@@ -328,7 +363,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
   override def shutdown(): Unit = {
     super.shutdown()
-    processors.foreach(_.shutdown())
+    synchronized {
+      processors.foreach(_.shutdown())
+    }
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a0d2c79..c729c8c 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -243,8 +243,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
         credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
+        // Create and start the socket server acceptor threads so that the bound port is known.
+        // Delay starting processors until the end of the initialization sequence to ensure
+        // that credentials have been loaded before processing authentications.
         socketServer = new SocketServer(config, metrics, time, credentialProvider)
-        socketServer.startup()
+        socketServer.startup(startupProcessors = false)
 
         /* start replica manager */
         replicaManager = createReplicaManager(isShuttingDown)
@@ -310,7 +313,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
-
+        socketServer.startProcessors()
         brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
diff --git a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
new file mode 100644
index 0000000..18b4f8e
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
@@ -0,0 +1,73 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+import java.util.Collections
+
+import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup}
+import kafka.utils._
+import kafka.zk.ConfigEntityChangeNotificationZNode
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+ * Tests that there are no failed authentications during broker startup. This is to verify
+ * that SCRAM credentials are loaded by brokers before client connections can be made.
+ * For simplicity of testing, this test verifies authentications of controller connections.
+ */
+class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup {
+
+  override val producerCount = 0
+  override val consumerCount = 0
+  override val serverCount = 1
+
+  private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
+  private val kafkaServerSaslMechanisms = Collections.singletonList("SCRAM-SHA-256").asScala
+
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+
+  override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+
+  override def configureSecurityBeforeServersStart() {
+    super.configureSecurityBeforeServersStart()
+    zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+    // Create credentials before starting brokers
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
+
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), KafkaSasl))
+  }
+
+  @Test
+  def testAuthentications(): Unit = {
+    val successfulAuths = totalAuthentications("successful-authentication-total")
+    assertTrue("No successful authentications", successfulAuths > 0)
+    val failedAuths = totalAuthentications("failed-authentication-total")
+    assertEquals(0, failedAuths)
+  }
+
+  private def totalAuthentications(metricName: String): Int = {
+    val allMetrics = servers.head.metrics.metrics
+    val totalAuthCount = allMetrics.values().asScala.filter(_.metricName().name() == metricName)
+      .foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double])
+    totalAuthCount.toInt
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message