kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7117: Support AdminClient API in AclCommand (KIP-332) (#5463)
Date Sat, 08 Sep 2018 00:41:09 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f348f10  KAFKA-7117: Support AdminClient API in AclCommand (KIP-332) (#5463)
f348f10 is described below

commit f348f10ef87925081fdf9455ace6d2a86179b483
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Sat Sep 8 06:10:59 2018 +0530

    KAFKA-7117: Support AdminClient API in AclCommand (KIP-332) (#5463)
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/admin/AclCommand.scala   | 280 ++++++++++++++++-----
 .../main/scala/kafka/security/SecurityUtils.scala  |   5 +-
 .../scala/unit/kafka/admin/AclCommandTest.scala    |  89 +++++--
 docs/security.html                                 |  24 +-
 4 files changed, 307 insertions(+), 91 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 31e6c53..c2dda33 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -17,17 +17,22 @@
 
 package kafka.admin
 
+import java.util.Properties
+
 import joptsimple._
 import joptsimple.util.EnumConverter
 import kafka.security.auth._
 import kafka.server.KafkaConfig
 import kafka.utils._
+import org.apache.kafka.clients.admin.{AdminClientConfig, AdminClient => JAdminClient}
+import org.apache.kafka.common.acl._
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
+import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 object AclCommand extends Logging {
 
@@ -52,13 +57,21 @@ object AclCommand extends Logging {
 
     opts.checkArgs()
 
+    val aclCommandService = {
+      if (opts.options.has(opts.bootstrapServerOpt)) {
+        new AdminClientService(opts)
+      } else {
+        new AuthorizerService(opts)
+      }
+    }
+
     try {
       if (opts.options.has(opts.addOpt))
-        addAcl(opts)
+        aclCommandService.addAcls()
       else if (opts.options.has(opts.removeOpt))
-        removeAcl(opts)
+        aclCommandService.removeAcls()
       else if (opts.options.has(opts.listOpt))
-        listAcl(opts)
+        aclCommandService.listAcls()
     } catch {
       case e: Throwable =>
         println(s"Error while executing ACL command: ${e.getMessage}")
@@ -67,91 +80,202 @@ object AclCommand extends Logging {
     }
   }
 
-  def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) {
-    val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSecurityEnabled)
-    val authorizerProperties =
-      if (opts.options.has(opts.authorizerPropertiesOpt)) {
-        val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
-        defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = false).asScala
-      } else {
-        defaultProps
+  sealed trait AclCommandService {
+    def addAcls(): Unit
+    def removeAcls(): Unit
+    def listAcls(): Unit
+  }
+
+  class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging {
+
+    private def withAdminClient(opts: AclCommandOptions)(f: JAdminClient => Unit) {
+      val props = if (opts.options.has(opts.commandConfigOpt))
+        Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+      else
+        new Properties()
+      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+      val adminClient = JAdminClient.create(props)
+
+      try {
+        f(adminClient)
+      } finally {
+        adminClient.close()
       }
+    }
 
-    val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
-    val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
-    try {
-      authZ.configure(authorizerProperties.asJava)
-      f(authZ)
+    def addAcls(): Unit = {
+      val resourceToAcl = getResourceToAcls(opts)
+      withAdminClient(opts) { adminClient =>
+        for ((resource, acls) <- resourceToAcl) {
+          val resourcePattern = resource.toPattern
+          println(s"Adding ACLs for resource `$resourcePattern`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+          val aclBindings = acls.map(acl => new AclBinding(resourcePattern, getAccessControlEntry(acl))).asJavaCollection
+          adminClient.createAcls(aclBindings).all().get()
+        }
+
+        listAcls()
+      }
     }
-    finally CoreUtils.swallow(authZ.close(), this)
-  }
 
-  private def addAcl(opts: AclCommandOptions) {
-    val patternType: PatternType = opts.options.valueOf(opts.resourcePatternType)
-    if (!patternType.isSpecific)
-      CommandLineUtils.printUsageAndDie(opts.parser, s"A '--resource-pattern-type' value
of '$patternType' is not valid when adding acls.")
+    def removeAcls(): Unit = {
+      withAdminClient(opts) { adminClient =>
+        val filterToAcl = getResourceFilterToAcls(opts)
+
+        for ((filter, acls) <- filterToAcl) {
+          if (acls.isEmpty) {
+            if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)"))
+              removeAcls(adminClient, acls, filter)
+          } else {
+            if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t"
+ _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)"))
+              removeAcls(adminClient, acls, filter)
+          }
+        }
+
+        listAcls()
+      }
+    }
 
-    withAuthorizer(opts) { authorizer =>
-      val resourceToAcl = getResourceFilterToAcls(opts).map {
-        case (filter, acls) =>
-          Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.patternType())
-> acls
+    def listAcls(): Unit = {
+      withAdminClient(opts) { adminClient =>
+        val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+        val resourceToAcls = getAcls(adminClient, filters)
+
+        for ((resource, acls) <- resourceToAcls)
+          println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
       }
+    }
 
-      if (resourceToAcl.values.exists(_.isEmpty))
-        CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal,
--deny-principal when trying to add ACLs.")
+    private def getAccessControlEntry(acl: Acl): AccessControlEntry = {
+      new AccessControlEntry(acl.principal.toString, acl.host, acl.operation.toJava, acl.permissionType.toJava)
+    }
 
-      for ((resource, acls) <- resourceToAcl) {
-        println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)}
$Newline")
-        authorizer.addAcls(acls, resource)
+    private def removeAcls(adminClient: JAdminClient, acls: Set[Acl], filter: ResourcePatternFilter): Unit = {
+      if (acls.isEmpty)
+        adminClient.deleteAcls(List(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asJava).all().get()
+      else {
+        val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter, getAccessControlEntryFilter(acl))).toList.asJava
+        adminClient.deleteAcls(aclBindingFilters).all().get()
       }
+    }
+
+    private def getAccessControlEntryFilter(acl: Acl): AccessControlEntryFilter = {
+      new AccessControlEntryFilter(acl.principal.toString, acl.host, acl.operation.toJava, acl.permissionType.toJava)
+    }
 
-      listAcl(opts)
+    private def getAcls(adminClient: JAdminClient, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
+      val aclBindings =
+        if (filters.isEmpty) adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
+        else {
+          val results = for (filter <- filters) yield {
+            adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get().asScala.toList
+          }
+          results.reduceLeft(_ ++ _)
+        }
+
+      val resourceToAcls = mutable.Map[ResourcePattern, Set[AccessControlEntry]]().withDefaultValue(Set())
+
+      aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) = resourceToAcls(aclBinding.pattern()) + aclBinding.entry())
+      resourceToAcls.toMap
     }
   }
 
-  private def removeAcl(opts: AclCommandOptions) {
-    withAuthorizer(opts) { authorizer =>
-      val filterToAcl = getResourceFilterToAcls(opts)
+  class AuthorizerService(val opts: AclCommandOptions) extends AclCommandService with Logging {
 
-      for ((filter, acls) <- filterToAcl) {
-        if (acls.isEmpty) {
-          if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource
filter `$filter`? (y/n)"))
-            removeAcls(authorizer, acls, filter)
+    private def withAuthorizer()(f: Authorizer => Unit) {
+      val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSecurityEnabled)
+      val authorizerProperties =
+        if (opts.options.has(opts.authorizerPropertiesOpt)) {
+          val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
+          defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = false).asScala
         } else {
-          if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t"
+ _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)"))
-            removeAcls(authorizer, acls, filter)
+          defaultProps
         }
+
+      val authorizerClass = if (opts.options.has(opts.authorizerOpt))
+        opts.options.valueOf(opts.authorizerOpt)
+      else
+        classOf[SimpleAclAuthorizer].getName
+
+      val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
+      try {
+        authZ.configure(authorizerProperties.asJava)
+        f(authZ)
       }
+      finally CoreUtils.swallow(authZ.close(), this)
+    }
+
+    def addAcls(): Unit = {
+      val resourceToAcl = getResourceToAcls(opts)
+      withAuthorizer() { authorizer =>
+        for ((resource, acls) <- resourceToAcl) {
+          println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)}
$Newline")
+          authorizer.addAcls(acls, resource)
+        }
 
-      listAcl(opts)
+        listAcls()
+      }
     }
-  }
 
-  private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourcePatternFilter)
{
-    getAcls(authorizer, filter)
-      .keys
-      .foreach(resource =>
-        if (acls.isEmpty) authorizer.removeAcls(resource)
-        else authorizer.removeAcls(acls, resource)
-      )
-  }
+    def removeAcls(): Unit = {
+      withAuthorizer() { authorizer =>
+        val filterToAcl = getResourceFilterToAcls(opts)
+
+        for ((filter, acls) <- filterToAcl) {
+          if (acls.isEmpty) {
+            if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource
filter `$filter`? (y/n)"))
+              removeAcls(authorizer, acls, filter)
+          } else {
+            if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t"
+ _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)"))
+              removeAcls(authorizer, acls, filter)
+          }
+        }
 
-  private def listAcl(opts: AclCommandOptions) {
-    withAuthorizer(opts) { authorizer =>
-      val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+        listAcls()
+      }
+    }
+
+    def listAcls(): Unit = {
+      withAuthorizer() { authorizer =>
+        val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+
+        val resourceToAcls: Iterable[(Resource, Set[Acl])] =
+          if (filters.isEmpty) authorizer.getAcls()
+          else filters.flatMap(filter => getAcls(authorizer, filter))
 
-      val resourceToAcls: Iterable[(Resource, Set[Acl])] =
-        if (filters.isEmpty) authorizer.getAcls()
-        else filters.flatMap(filter => getAcls(authorizer, filter))
+        for ((resource, acls) <- resourceToAcls)
+          println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)}
$Newline")
+      }
+    }
 
-      for ((resource, acls) <- resourceToAcls)
-        println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)}
$Newline")
+    private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourcePatternFilter)
{
+      getAcls(authorizer, filter)
+        .keys
+        .foreach(resource =>
+          if (acls.isEmpty) authorizer.removeAcls(resource)
+          else authorizer.removeAcls(acls, resource)
+        )
     }
+
+    private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] =
+      authorizer.getAcls()
+        .filter { case (resource, acl) => filter.matches(resource.toPattern) }
   }
 
-  private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource,
Set[Acl]] =
-    authorizer.getAcls()
-      .filter { case (resource, acl) => filter.matches(resource.toPattern) }
+  private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
+    val patternType: PatternType = opts.options.valueOf(opts.resourcePatternType)
+    if (!patternType.isSpecific)
+      CommandLineUtils.printUsageAndDie(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.")
+
+    val resourceToAcl = getResourceFilterToAcls(opts).map {
+      case (filter, acls) =>
+        Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.patternType())
-> acls
+    }
+
+    if (resourceToAcl.values.exists(_.isEmpty))
+      CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
+
+    resourceToAcl
+  }
 
   private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[Acl]] = {
     var resourceToAcls = Map.empty[ResourcePatternFilter, Set[Acl]]
@@ -257,7 +381,7 @@ object AclCommand extends Logging {
 
   private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]):
Set[KafkaPrincipal] = {
     if (opts.options.has(principalOptionSpec))
-      opts.options.valuesOf(principalOptionSpec).asScala.map(s => KafkaPrincipal.fromString(s.trim)).toSet
+      opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
     else
       Set.empty[KafkaPrincipal]
   }
@@ -305,11 +429,23 @@ object AclCommand extends Logging {
 
   class AclCommandOptions(args: Array[String]) {
     val parser = new OptionParser(false)
+    val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
+
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of host/port pairs
to use for establishing the connection to the Kafka cluster." +
+      " This list should be in the form host1:port1,host2:port2,... This config is required
for acl management using admin client API.")
+      .withRequiredArg
+      .describedAs("server to connect to")
+      .ofType(classOf[String])
+
+    val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+      .withOptionalArg()
+      .describedAs("command-config")
+      .ofType(classOf[String])
+
     val authorizerOpt = parser.accepts("authorizer", "Fully qualified class name of the authorizer,
defaults to kafka.security.auth.SimpleAclAuthorizer.")
       .withRequiredArg
       .describedAs("authorizer")
       .ofType(classOf[String])
-      .defaultsTo(classOf[SimpleAclAuthorizer].getName)
 
     val authorizerPropertiesOpt = parser.accepts("authorizer-properties", "REQUIRED: properties
required to configure an instance of Authorizer. " +
       "These are key=val pairs. For the default authorizer the example values are: zookeeper.connect=localhost:2181")
@@ -410,7 +546,17 @@ object AclCommand extends Logging {
     val options = parser.parse(args: _*)
 
     def checkArgs() {
-      CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)
+      if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
+        CommandLineUtils.printUsageAndDie(parser, "Only one of --bootstrap-server or --authorizer must be specified")
+
+      if (!options.has(bootstrapServerOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)
+
+      if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt))
+        CommandLineUtils.printUsageAndDie(parser, "The --command-config option can only be used with --bootstrap-server option")
+
+      if (options.has(authorizerPropertiesOpt) && options.has(bootstrapServerOpt))
+        CommandLineUtils.printUsageAndDie(parser, "The --authorizer-properties option can only be used with --authorizer option")
 
       val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
       if (actions != 1)
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
index 5d42871..311e195 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -22,8 +22,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFi
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ApiError
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-
+import org.apache.kafka.common.utils.SecurityUtils._
 import scala.util.{Failure, Success, Try}
 
 
@@ -32,7 +31,7 @@ object SecurityUtils {
   def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)]
= {
     (for {
       resourceType <- Try(ResourceType.fromJava(filter.patternFilter.resourceType))
-      principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
+      principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal))
       operation <- Try(Operation.fromJava(filter.entryFilter.operation))
       permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
       resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.patternType)
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 05f6189..d5535a5 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -20,20 +20,24 @@ import java.util.Properties
 
 import kafka.admin.AclCommand.AclCommandOptions
 import kafka.security.auth._
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{Exit, Logging, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.resource.PatternType
+import org.apache.kafka.common.network.ListenerName
+
 import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.{Before, Test}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.SecurityUtils
+import org.junit.{After, Before, Test}
 
 class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
-  private val principal: KafkaPrincipal = KafkaPrincipal.fromString("User:test2")
-  private val Users = Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
-    principal,
-    KafkaPrincipal.fromString("""User:CN=\#User with special chars in CN : (\, \+ \" \\ \<
\> \; ')"""))
+  var servers: Seq[KafkaServer] = Seq()
+
+  private val principal: KafkaPrincipal = SecurityUtils.parseKafkaPrincipal("User:test2")
+  private val Users = Set(SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
+    principal, SecurityUtils.parseKafkaPrincipal("""User:CN=\#User with special chars in
CN : (\, \+ \" \\ \< \> \; ')"""))
   private val Hosts = Set("host1", "host2")
   private val AllowHostCommand = Array("--allow-host", "host1", "--allow-host", "host2")
   private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", "host2")
@@ -87,6 +91,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
   private var brokerProps: Properties = _
   private var zkArgs: Array[String] = _
+  private var adminArgs: Array[String] = _
 
   @Before
   override def setUp(): Unit = {
@@ -94,33 +99,66 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
     brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
     brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
+    brokerProps.put(SimpleAclAuthorizer.SuperUsersProp, "User:ANONYMOUS")
 
     zkArgs = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
   }
 
+  @After
+  override def tearDown() {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
   @Test
-  def testAclCli() {
+  def testAclCliWithAuthorizer(): Unit = {
+    testAclCli(zkArgs)
+  }
+
+  @Test
+  def testAclCliWithAdminAPI(): Unit = {
+    createServer()
+    testAclCli(adminArgs)
+  }
+
+  private def createServer(): Unit = {
+    servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps)))
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    adminArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName))
+  }
+
+  private def testAclCli(cmdArgs: Array[String]) {
     for ((resources, resourceCmd) <- ResourceToCommand) {
       for (permissionType <- PermissionType.values) {
         val operationToCmd = ResourceToOperations(resources)
         val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
-          AclCommand.main(zkArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
+          AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
           for (resource <- resources) {
             withAuthorizer() { authorizer =>
               TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
             }
           }
 
-          testRemove(resources, resourceCmd, brokerProps)
+          testRemove(cmdArgs, resources, resourceCmd)
       }
     }
   }
 
   @Test
-  def testProducerConsumerCli() {
+  def testProducerConsumerCliWithAuthorizer(): Unit = {
+    testProducerConsumerCli(zkArgs)
+  }
+
+  @Test
+  def testProducerConsumerCliWithAdminAPI(): Unit = {
+    createServer()
+    testProducerConsumerCli(adminArgs)
+  }
+
+  private def testProducerConsumerCli(cmdArgs: Array[String]) {
     for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
       val resourceCommand: Array[String] = resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_
++ _)
-      AclCommand.main(zkArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
+      AclCommand.main(cmdArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
       for ((resources, acls) <- resourcesToAcls) {
         for (resource <- resources) {
           withAuthorizer() { authorizer =>
@@ -128,15 +166,25 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
           }
         }
       }
-      testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, brokerProps)
+      testRemove(cmdArgs, resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd)
     }
   }
 
   @Test
-  def testAclsOnPrefixedResources(): Unit = {
+  def testAclsOnPrefixedResourcesWithAuthorizer(): Unit = {
+    testAclsOnPrefixedResources(zkArgs)
+  }
+
+  @Test
+  def testAclsOnPrefixedResourcesWithAdminAPI(): Unit = {
+    createServer()
+    testAclsOnPrefixedResources(adminArgs)
+  }
+
+  private def testAclsOnPrefixedResources(cmdArgs: Array[String]): Unit = {
     val cmd = Array("--allow-principal", principal.toString, "--producer", "--topic", "Test-",
"--resource-pattern-type", "Prefixed")
 
-    AclCommand.main(zkArgs ++ cmd :+ "--add")
+    AclCommand.main(cmdArgs ++ cmd :+ "--add")
 
     withAuthorizer() { authorizer =>
       val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write)
@@ -145,7 +193,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
       TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), authorizer, Resource(Topic,
"Test-", PREFIXED))
     }
 
-    AclCommand.main(zkArgs ++ cmd :+ "--remove" :+ "--force")
+    AclCommand.main(cmdArgs ++ cmd :+ "--remove" :+ "--force")
 
     withAuthorizer() { authorizer =>
       TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Cluster, "kafka-cluster",
LITERAL))
@@ -156,7 +204,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
   @Test(expected = classOf[IllegalArgumentException])
   def testInvalidAuthorizerProperty() {
     val args = Array("--authorizer-properties", "zookeeper.connect " + zkConnect)
-    AclCommand.withAuthorizer(new AclCommandOptions(args))(null)
+    val aclCommandService = new AclCommand.AuthorizerService(new AclCommandOptions(args))
+    aclCommandService.listAcls()
   }
 
   @Test
@@ -188,9 +237,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
     }
   }
 
-  private def testRemove(resources: Set[Resource], resourceCmd: Array[String], brokerProps:
Properties) {
+  private def testRemove(cmdArgs: Array[String], resources: Set[Resource], resourceCmd: Array[String])
{
     for (resource <- resources) {
-      AclCommand.main(zkArgs ++ resourceCmd :+ "--remove" :+ "--force")
+      AclCommand.main(cmdArgs ++ resourceCmd :+ "--remove" :+ "--force")
       withAuthorizer() { authorizer =>
         TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
       }
@@ -208,7 +257,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
     Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, user.toString))
   }
 
-  def withAuthorizer()(f: Authorizer => Unit) {
+  private def withAuthorizer()(f: Authorizer => Unit) {
     val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false)
     val authZ = new SimpleAclAuthorizer
     try {
diff --git a/docs/security.html b/docs/security.html
index d7859e0..e856a7e 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1076,6 +1076,18 @@
             <td>Configuration</td>
         </tr>
         <tr>
+            <td>--bootstrap-server</td>
+            <td>A list of host/port pairs to use for establishing the connection to
the Kafka cluster. Only one of --bootstrap-server or --authorizer option must be specified.</td>
+            <td></td>
+            <td>Configuration</td>
+        </tr>
+        <tr>
+            <td>--command-config</td>
+            <td>A property file containing configs to be passed to Admin Client. This
option can only be used with --bootstrap-server option.</td>
+            <td></td>
+            <td>Configuration</td>
+        </tr>
+        <tr>
             <td>--cluster</td>
             <td>Indicates to the script that the user is trying to interact with acls
on the singular cluster resource.</td>
             <td></td>
@@ -1199,7 +1211,17 @@
             <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181
--add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </pre>
                 Note that for consumer option we must also specify the consumer group.
                 In order to remove a principal from producer or consumer role we just need
to pass --remove option. </li>
-        </ul>
+
+        <li><b>AdminClient API based acl management</b><br>
+            Users having Alter permission on ClusterResource can use AdminClient API for
ACL management. kafka-acls.sh script supports AdminClient API to manage ACLs without interacting
with zookeeper/authorizer directly.
+            All the above examples can be executed by using <b>--bootstrap-server</b>
option. For example:
+
+            <pre class="brush: bash;">
+            bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf
--add --allow-principal User:Bob --producer --topic Test-topic
+            bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf
--add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
+            bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf
--list --topic Test-topic</pre></li>
+
+    </ul>
 
     <h3><a id="security_rolling_upgrade" href="#security_rolling_upgrade">7.5
Incorporating Security Features in a Running Cluster</a></h3>
         You can secure a running cluster via one or more of the supported protocols discussed
previously. This is done in phases:


Mime
View raw message