Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2D628200C68 for ; Wed, 3 May 2017 22:09:03 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2BF28160BB5; Wed, 3 May 2017 20:09:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A2DC3160BA1 for ; Wed, 3 May 2017 22:09:01 +0200 (CEST) Received: (qmail 51330 invoked by uid 500); 3 May 2017 20:09:00 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 51321 invoked by uid 99); 3 May 2017 20:09:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 May 2017 20:09:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B1D31DF9A3; Wed, 3 May 2017 20:09:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rsivaram@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-4703: Test with two SASL_SSL listeners with different JAAS contexts Date: Wed, 3 May 2017 20:09:00 +0000 (UTC) archived-at: Wed, 03 May 2017 20:09:03 -0000 Repository: kafka Updated Branches: refs/heads/trunk 8d7492016 -> a7671c7f3 KAFKA-4703: Test with two SASL_SSL listeners with different JAAS contexts Tests broker with multiple SASL mechanisms with different endpoints for different mechanisms. Each endpoint uses its own JAAS context. 
Author: Balint Molnar Reviewers: Rajini Sivaram, Ismael Juma Closes #2506 from baluchicken/KAFKA-4703 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a7671c7f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a7671c7f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a7671c7f Branch: refs/heads/trunk Commit: a7671c7f3723113e716b99471e7be3499fde1b15 Parents: 8d74920 Author: Balint Molnar Authored: Wed May 3 21:08:30 2017 +0100 Committer: Rajini Sivaram Committed: Wed May 3 21:08:30 2017 +0100 ---------------------------------------------------------------------- .../api/SaslEndToEndAuthorizationTest.scala | 2 +- .../scala/integration/kafka/api/SaslSetup.scala | 47 ++++-- ...ListenersWithAdditionalJaasContextTest.scala | 47 ++++++ ...pleListenersWithDefaultJaasContextTest.scala | 37 +++++ ...tenersWithSameSecurityProtocolBaseTest.scala | 158 +++++++++++++++++++ ...eListenersWithSameSecurityProtocolTest.scala | 132 ---------------- .../security/auth/ZkAuthorizationTest.scala | 2 +- .../scala/unit/kafka/utils/JaasTestUtils.scala | 32 +--- .../scala/unit/kafka/zk/ZKEphemeralTest.scala | 2 +- 9 files changed, 283 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index dd91627..d4c417c 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -22,7 +22,7 @@ import kafka.utils.TestUtils import org.apache.kafka.common.protocol.SecurityProtocol import 
org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.errors.GroupAuthorizationException -import org.junit.{Before,Test} +import org.junit.{Before, Test} import scala.collection.immutable.List import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/api/SaslSetup.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 29aea61..13ed2e2 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -20,8 +20,10 @@ package kafka.api import java.io.File import java.util.Properties import javax.security.auth.login.Configuration + import kafka.security.minikdc.MiniKdc import kafka.server.KafkaConfig +import kafka.utils.JaasTestUtils.JaasSection import kafka.utils.{JaasTestUtils, TestUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.authenticator.LoginManager @@ -29,12 +31,13 @@ import org.apache.kafka.common.config.SaslConfigs /* * Implements an enumeration for the modes enabled here: - * zk only, kafka only, both. + * zk only, kafka only, both, custom KafkaServer. */ sealed trait SaslSetupMode case object ZkSasl extends SaslSetupMode case object KafkaSasl extends SaslSetupMode case object Both extends SaslSetupMode +case object CustomKafkaServerSasl extends SaslSetupMode /* * Trait used in SaslTestHarness and EndToEndAuthorizationTest to setup keytab and jaas files. 
@@ -43,11 +46,13 @@ trait SaslSetup { private val workDir = TestUtils.tempDir() private val kdcConf = MiniKdc.createConfig private var kdc: MiniKdc = null - private var serverKeytabFile: Option[File] = null - private var clientKeytabFile: Option[File] = null + private var serverKeytabFile: Option[File] = None + private var clientKeytabFile: Option[File] = None + private var jaasContext: Seq[JaasSection] = Seq() def startSasl(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], - mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = JaasTestUtils.KafkaServerContextName) { + mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = JaasTestUtils.KafkaServerContextName, + withDefaultJaasContext: Boolean = true) { // Important if tests leak consumers, producers or brokers LoginManager.closeAll() val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanism == Some("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI")) @@ -60,27 +65,39 @@ trait SaslSetup { kdc.start() kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost") kdc.createPrincipal(clientKeytabFile, JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2) - } else { - this.clientKeytabFile = None - this.serverKeytabFile = None } - setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism) + if (withDefaultJaasContext) { + setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism) + writeJaasConfigurationToFile() + } else + setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism) if (mode == Both || mode == ZkSasl) System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") } protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerEntryName: String, 
kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) { - val jaasFile = mode match { - case ZkSasl => JaasTestUtils.writeZkFile() - case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerEntryName, kafkaServerSaslMechanisms, - kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) - case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerEntryName, kafkaServerSaslMechanisms, - kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) + val jaasSection = mode match { + case ZkSasl => JaasTestUtils.zkSections + case KafkaSasl => + Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms, serverKeytabFile), + JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, clientKeytabFile)) + case CustomKafkaServerSasl => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, + kafkaServerSaslMechanisms, serverKeytabFile)) + case Both => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms, serverKeytabFile), + JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, clientKeytabFile)) ++ JaasTestUtils.zkSections } + jaasContext = jaasContext ++ jaasSection + } + + protected def writeJaasConfigurationToFile() { // This will cause a reload of the Configuration singleton when `getConfiguration` is called Configuration.setConfiguration(null) - System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile) + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, JaasTestUtils.writeJaasContextsToFile(jaasContext)) + } + + protected def removeJaasSection(context: String) { + jaasContext = jaasContext.filter(_.contextName != context) } def closeSasl() { http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala new file mode 100644 index 0000000..3251be0 --- /dev/null +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import java.util.Properties + +import kafka.api.CustomKafkaServerSasl +import org.apache.kafka.common.network.ListenerName + + +class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest{ + + import MultipleListenersWithSameSecurityProtocolBaseTest._ + + override def setSaslProperties(listenerName: ListenerName): Option[Properties] = { + + val gssapiSaslProperties = kafkaClientSaslProperties(GssApi, dynamicJaasConfig = true) + val plainSaslProperties = kafkaClientSaslProperties(Plain, dynamicJaasConfig = true) + + listenerName.value match { + case SecureInternal => Some(plainSaslProperties) + case SecureExternal => Some(gssapiSaslProperties) + case _ => None + } + } + + override def addJaasSection(): Unit = { + setJaasConfiguration(CustomKafkaServerSasl, "secure_external.KafkaServer", List(GssApi), None) + setJaasConfiguration(CustomKafkaServerSasl, "secure_internal.KafkaServer", List(Plain), None) + removeJaasSection("KafkaServer") + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala new file mode 100644 index 0000000..8291d82 --- /dev/null +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util.Properties + +import org.apache.kafka.common.network.ListenerName + + +class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest { + + import MultipleListenersWithSameSecurityProtocolBaseTest._ + + override def setSaslProperties(listenerName: ListenerName): Option[Properties] = { + val plainSaslProperties = kafkaClientSaslProperties(Plain, dynamicJaasConfig = true) + + listenerName.value match { + case SecureExternal | SecureInternal => Some(plainSaslProperties) + case _ => None + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala new file mode 100644 index 0000000..9765279 --- /dev/null +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.io.File +import java.util.{Collections, Properties} +import java.util.concurrent.TimeUnit + +import kafka.api.{Both, SaslSetup} +import kafka.common.Topic +import kafka.coordinator.group.OffsetConfig +import kafka.utils.{CoreUtils, TestUtils} +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.network.{ListenerName, Mode} +import org.apache.kafka.common.protocol.SecurityProtocol +import org.junit.Assert.assertEquals +import org.junit.{After, Before, Test} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ + +object MultipleListenersWithSameSecurityProtocolBaseTest { + val SecureInternal = "SECURE_INTERNAL" + val SecureExternal = "SECURE_EXTERNAL" + val Internal = "INTERNAL" + val External = "EXTERNAL" + val GssApi = "GSSAPI" + val Plain = "PLAIN" +} + +abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeeperTestHarness with SaslSetup{ + + import MultipleListenersWithSameSecurityProtocolBaseTest._ + + private val trustStoreFile = 
File.createTempFile("truststore", ".jks") + private val servers = new ArrayBuffer[KafkaServer] + private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]() + private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]() + private val kafkaClientSaslMechanism = Plain + private val kafkaServerSaslMechanisms = List(GssApi, Plain) + + protected def setSaslProperties(listenerName: ListenerName): Option[Properties] + protected def addJaasSection(): Unit = {} + + @Before + override def setUp(): Unit = { + startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, withDefaultJaasContext = false) + addJaasSection() + writeJaasConfigurationToFile() + super.setUp() + // 2 brokers so that we can test that the data propagates correctly via UpdateMetadataRequest + val numServers = 2 + + (0 until numServers).foreach { brokerId => + + val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile)) + // Ensure that we can support multiple listeners per security protocol and multiple security protocols + props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $Internal://localhost:0, " + + s"$SecureExternal://localhost:0, $External://localhost:0") + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," + + s"$External:PLAINTEXT, $SecureExternal:SASL_SSL") + props.put(KafkaConfig.InterBrokerListenerNameProp, Internal) + props.put(KafkaConfig.ZkEnableSecureAclsProp, "true") + props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism) + props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(",")) + props.put(KafkaConfig.SaslKerberosServiceNameProp, "kafka") + + props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")) + + // set listener-specific configs and set an invalid path for the global config to verify
that the overrides work + Seq(SecureInternal, SecureExternal).foreach { listenerName => + props.put(new ListenerName(listenerName).configPrefix + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + } + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path") + + servers += TestUtils.createServer(KafkaConfig.fromProps(props)) + } + + val serverConfig = servers.head.config + assertEquals(4, serverConfig.listeners.size) + + TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, OffsetConfig.DefaultOffsetsTopicNumPartitions, + replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs) + + serverConfig.listeners.foreach { endPoint => + val listenerName = endPoint.listenerName + + TestUtils.createTopic(zkUtils, listenerName.value, 2, 2, servers) + + val trustStoreFile = + if (endPoint.securityProtocol == SecurityProtocol.SASL_SSL) Some(this.trustStoreFile) + else None + + val saslProperties = setSaslProperties(listenerName) + + val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName) + + producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1, + securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProperties) + + consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value, + securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProperties) + } + } + + @After + override def tearDown() { + producers.values.foreach(_.close()) + consumers.values.foreach(_.close()) + servers.foreach { s => + s.shutdown() + CoreUtils.delete(s.config.logDirs) + } + super.tearDown() + } + + /** + * Tests that we can produce and consume to/from all broker-defined listeners and security protocols. We produce + * with acks=-1 to ensure that replication is also working. 
+ */ + @Test + def testProduceConsume(): Unit = { + producers.foreach { case (listenerName, producer) => + val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes, + s"value$i".getBytes)) + producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS)) + + val consumer = consumers(listenerName) + consumer.subscribe(Collections.singleton(listenerName.value)) + val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]] + TestUtils.waitUntilTrue(() => { + records ++= consumer.poll(50).asScala + records.size == producerRecords.size + }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records") + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala deleted file mode 100644 index ccc118c..0000000 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import java.io.File -import java.util.Collections -import java.util.concurrent.TimeUnit - -import kafka.common.Topic -import kafka.coordinator.group.OffsetConfig -import kafka.utils.{CoreUtils, TestUtils} -import kafka.zk.ZooKeeperTestHarness -import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.config.SslConfigs -import org.apache.kafka.common.network.{ListenerName, Mode} -import org.apache.kafka.common.protocol.SecurityProtocol -import org.junit.Assert.assertEquals -import org.junit.{After, Before, Test} - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConverters._ - -class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness { - - private val trustStoreFile = File.createTempFile("truststore", ".jks") - private val servers = new ArrayBuffer[KafkaServer] - private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]() - private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]() - - @Before - override def setUp(): Unit = { - super.setUp() - // 2 brokers so that we can test that the data propagates correctly via UpdateMetadadaRequest - val numServers = 2 - - (0 until numServers).foreach { brokerId => - - val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile)) - // Ensure that we can support multiple listeners 
per security protocol and multiple security protocols - props.put(KafkaConfig.ListenersProp, "SECURE_INTERNAL://localhost:0, INTERNAL://localhost:0, " + - "SECURE_EXTERNAL://localhost:0, EXTERNAL://localhost:0") - props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "INTERNAL:PLAINTEXT, SECURE_INTERNAL:SSL," + - "EXTERNAL:PLAINTEXT, SECURE_EXTERNAL:SSL") - props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL") - props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")) - - // set listener-specific configs and set an invalid path for the global config to verify that the overrides work - Seq("SECURE_INTERNAL", "SECURE_EXTERNAL").foreach { listenerName => - props.put(new ListenerName(listenerName).configPrefix + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, - props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) - } - props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path") - - servers += TestUtils.createServer(KafkaConfig.fromProps(props)) - } - - val serverConfig = servers.head.config - assertEquals(4, serverConfig.listeners.size) - - TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, OffsetConfig.DefaultOffsetsTopicNumPartitions, - replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs) - - serverConfig.listeners.foreach { endPoint => - val listenerName = endPoint.listenerName - - TestUtils.createTopic(zkUtils, listenerName.value, 2, 2, servers) - - val trustStoreFile = - if (endPoint.securityProtocol == SecurityProtocol.SSL) Some(this.trustStoreFile) - else None - - val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName) - - producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1, - securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile) - - consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value, - securityProtocol = endPoint.securityProtocol, trustStoreFile = 
trustStoreFile) - } - } - - @After - override def tearDown() { - producers.values.foreach(_.close()) - consumers.values.foreach(_.close()) - servers.foreach { s => - s.shutdown() - CoreUtils.delete(s.config.logDirs) - } - super.tearDown() - } - - /** - * Tests that we can produce and consume to/from all broker-defined listeners and security protocols. We produce - * with acks=-1 to ensure that replication is also working. - */ - @Test - def testProduceConsume(): Unit = { - producers.foreach { case (listenerName, producer) => - val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes, - s"value$i".getBytes)) - producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS)) - - val consumer = consumers(listenerName) - consumer.subscribe(Collections.singleton(listenerName.value)) - val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]] - TestUtils.waitUntilTrue(() => { - records ++= consumer.poll(50).asScala - records.size == producerRecords.size - }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records") - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index 3e7fce4..5ea92aa 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -30,7 +30,7 @@ import scala.util.{Try, Success, Failure} import javax.security.auth.login.Configuration class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { - val jaasFile = kafka.utils.JaasTestUtils.writeZkFile + val jaasFile = 
kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections) val authProvider = "zookeeper.authProvider.1" @Before http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index 7b90abf..3ae680c 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -82,7 +82,7 @@ object JaasTestUtils { } } - class JaasSection(contextName: String, + case class JaasSection(contextName: String, jaasModule: Seq[JaasModule]) { override def toString: String = { s"""|$contextName { @@ -122,29 +122,9 @@ object JaasTestUtils { val KafkaScramAdmin = "scram-admin" val KafkaScramAdminPassword = "scram-admin-secret" - def writeZkFile(): String = { + def writeJaasContextsToFile(jaasContexts: Seq[JaasSection]): String = { val jaasFile = TestUtils.tempFile() - writeToFile(jaasFile, zkSections) - jaasFile.getCanonicalPath - } - - def writeKafkaFile(serverEntryName: String, kafkaServerSaslMechanisms: List[String], - kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], - clientKeyTabLocation: Option[File]): String = { - val jaasFile = TestUtils.tempFile() - val kafkaSections = Seq(kafkaServerSection(serverEntryName, kafkaServerSaslMechanisms, serverKeyTabLocation), - kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) - writeToFile(jaasFile, kafkaSections) - jaasFile.getCanonicalPath - } - - def writeZkAndKafkaFiles(serverEntryName: String, kafkaServerSaslMechanisms: List[String], - kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], - clientKeyTabLocation: Option[File]): String = { - val jaasFile = TestUtils.tempFile() - val kafkaSections = 
Seq(kafkaServerSection(serverEntryName, kafkaServerSaslMechanisms, serverKeyTabLocation), - kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) - writeToFile(jaasFile, kafkaSections ++ zkSections) + writeToFile(jaasFile,jaasContexts) jaasFile.getCanonicalPath } @@ -152,12 +132,12 @@ object JaasTestUtils { def clientLoginModule(mechanism: String, keytabLocation: Option[File]): String = kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString - private def zkSections: Seq[JaasSection] = Seq( + def zkSections: Seq[JaasSection] = Seq( new JaasSection(ZkServerContextName, Seq(JaasModule(ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))), new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword)))) ) - private def kafkaServerSection(contextName: String, mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { + def kafkaServerSection(contextName: String, mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { val modules = mechanisms.map { case "GSSAPI" => Krb5LoginModule( @@ -215,7 +195,7 @@ object JaasTestUtils { /* * Used for the static JAAS configuration and it uses the credentials for client#2 */ - private def kafkaClientSection(mechanism: Option[String], keytabLocation: Option[File]): JaasSection = { + def kafkaClientSection(mechanism: Option[String], keytabLocation: Option[File]): JaasSection = { new JaasSection(KafkaClientContextName, mechanism.map(m => kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2)).toSeq) } http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala index 4d57ed9..c9076b5 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala @@ -47,7 +47,7 @@ object ZKEphemeralTest { @RunWith(value = classOf[Parameterized]) class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness { - val jaasFile = kafka.utils.JaasTestUtils.writeZkFile() + val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections) val authProvider = "zookeeper.authProvider.1" var zkSessionTimeoutMs = 1000