From commits-return-9392-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Wed Apr 18 11:20:44 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id B2EC718064E for ; Wed, 18 Apr 2018 11:20:43 +0200 (CEST) Received: (qmail 26800 invoked by uid 500); 18 Apr 2018 09:20:42 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 26791 invoked by uid 99); 18 Apr 2018 09:20:42 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Apr 2018 09:20:42 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id B62AF82F78; Wed, 18 Apr 2018 09:20:41 +0000 (UTC) Date: Wed, 18 Apr 2018 09:20:41 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-6772: Load credentials from ZK before accepting connections (#4867) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152404324118.1363.15498776836281970318@gitbox.apache.org> From: rsivaram@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: cae42215b7597cc39afc682cb0399782cf42fe23 X-Git-Newrev: 98bb75a58fc241b400ed7535d9e9723f678b4803 X-Git-Rev: 98bb75a58fc241b400ed7535d9e9723f678b4803 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 98bb75a KAFKA-6772: Load credentials from ZK before accepting connections (#4867) 98bb75a is described below commit 98bb75a58fc241b400ed7535d9e9723f678b4803 Author: Rajini Sivaram AuthorDate: Wed Apr 18 10:20:28 2018 +0100 KAFKA-6772: Load credentials from ZK before accepting connections (#4867) Start processing client connections only after completing KafkaServer initialization to ensure that credentials are loaded from ZK into cache before authentications are processed. Acceptors are started earlier so that bound port is known for registering in ZK. Reviewers: Manikumar Reddy , Jun Rao , Ismael Juma --- .../main/scala/kafka/network/SocketServer.scala | 47 ++++++++++++-- core/src/main/scala/kafka/server/KafkaServer.scala | 7 ++- .../kafka/server/ScramServerStartupTest.scala | 73 ++++++++++++++++++++++ 3 files changed, 120 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index d2ee084..27d89b4 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -76,12 +76,24 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private var stoppedProcessingRequests = false /** - * Start the socket server + * Start the socket server. Acceptors for all the listeners are started. Processors + * are started if `startupProcessors` is true. If not, processors are only started when + * [[kafka.network.SocketServer#startProcessors()]] is invoked. Delayed starting of processors + * is used to delay processing client connections until server is fully initialized, e.g. + * to ensure that all credentials have been loaded before authentications are performed. + * Acceptors are always started during `startup` so that the bound port is known when this + * method completes even when ephemeral ports are used. Incoming connections on this server + * are processed when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]]. + * + * @param startupProcessors Flag indicating whether `Processor`s must be started. */ - def startup() { + def startup(startupProcessors: Boolean = true) { this.synchronized { connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) createAcceptorAndProcessors(config.numNetworkThreads, config.listeners) + if (startupProcessors) { + startProcessors() + } } newGauge("NetworkProcessorAvgIdlePercent", @@ -110,6 +122,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time info("Started " + acceptors.size + " acceptor threads") } + /** + * Starts processors of all the acceptors of this server if they have not already been started. + * This method is used for delayed starting of processors if [[kafka.network.SocketServer#startup]] + * was invoked with `startupProcessors=false`. + */ + def startProcessors(): Unit = synchronized { + acceptors.values.asScala.foreach { _.startProcessors() } + info(s"Started processors for ${acceptors.size} acceptors") + } + private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap private def createAcceptorAndProcessors(processorsPerListener: Int, @@ -196,6 +218,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized { info(s"Adding listeners for endpoints $listenersAdded") createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded) + startProcessors() } def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized { @@ -307,13 +330,25 @@ private[kafka] class Acceptor(val endPoint: EndPoint, private val nioSelector = NSelector.open() val serverChannel = openServerSocket(endPoint.host, endPoint.port) private val processors = new ArrayBuffer[Processor]() + private val processorsStarted = new AtomicBoolean private[network] def addProcessors(newProcessors: Buffer[Processor]): Unit = synchronized { - newProcessors.foreach { processor => + processors ++= newProcessors + if (processorsStarted.get) + startProcessors(newProcessors) + } + + private[network] def startProcessors(): Unit = synchronized { + if (!processorsStarted.getAndSet(true)) { + startProcessors(processors) + } + } + + private def startProcessors(processors: Seq[Processor]): Unit = synchronized { + processors.foreach { processor => KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", processor).start() } - processors ++= newProcessors } private[network] def removeProcessors(removeCount: Int, requestChannel: RequestChannel): Unit = synchronized { @@ -328,7 +363,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint, override def shutdown(): Unit = { super.shutdown() - processors.foreach(_.shutdown()) + synchronized { + processors.foreach(_.shutdown()) + } } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a0d2c79..c729c8c 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -243,8 +243,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) + // Create and start the socket server acceptor threads so that the bound port is known. + // Delay starting processors until the end of the initialization sequence to ensure + // that credentials have been loaded before processing authentications. socketServer = new SocketServer(config, metrics, time, credentialProvider) - socketServer.startup() + socketServer.startup(startupProcessors = false) /* start replica manager */ replicaManager = createReplicaManager(isShuttingDown) @@ -310,7 +313,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers) dynamicConfigManager.startup() - + socketServer.startProcessors() brokerState.newState(RunningAsBroker) shutdownLatch = new CountDownLatch(1) startupComplete.set(true) diff --git a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala new file mode 100644 index 0000000..18b4f8e --- /dev/null +++ b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server +import java.util.Collections + +import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup} +import kafka.utils._ +import kafka.zk.ConfigEntityChangeNotificationZNode +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.JavaConverters._ + +/** + * Tests that there are no failed authentications during broker startup. This is to verify + * that SCRAM credentials are loaded by brokers before client connections can be made. + * For simplicity of testing, this test verifies authentications of controller connections. + */ +class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup { + + override val producerCount = 0 + override val consumerCount = 0 + override val serverCount = 1 + + private val kafkaClientSaslMechanism = "SCRAM-SHA-256" + private val kafkaServerSaslMechanisms = Collections.singletonList("SCRAM-SHA-256").asScala + + override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT + + override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) + + override def configureSecurityBeforeServersStart() { + super.configureSecurityBeforeServersStart() + zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) + // Create credentials before starting brokers + createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) + + startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), KafkaSasl)) + } + + @Test + def testAuthentications(): Unit = { + val successfulAuths = totalAuthentications("successful-authentication-total") + assertTrue("No successful authentications", successfulAuths > 0) + val failedAuths = totalAuthentications("failed-authentication-total") + assertEquals(0, failedAuths) + } + + private def totalAuthentications(metricName: String): Int = { + val allMetrics = servers.head.metrics.metrics + val totalAuthCount = allMetrics.values().asScala.filter(_.metricName().name() == metricName) + .foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double]) + totalAuthCount.toInt + } +} -- To stop receiving notification emails like this one, please contact rsivaram@apache.org.