kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7006 - remove duplicate Scala ResourceNameType in preference to… (#5152)
Date Fri, 08 Jun 2018 15:14:02 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0b3989f  KAFKA-7006 - remove duplicate Scala ResourceNameType in preference to… (#5152)
0b3989f is described below

commit 0b3989fd72acc6e989c495c63cb8ca6aa5850811
Author: Andy Coates <8012398+big-andy-coates@users.noreply.github.com>
AuthorDate: Fri Jun 8 16:13:44 2018 +0100

    KAFKA-7006 - remove duplicate Scala ResourceNameType in preference to… (#5152)
    
    Remove the duplicate Scala ResourceNameType in preference to the Java ResourceNameType.
    
    This is follow-on work for KIP-290 and PR #5117, which saw the Scala ResourceNameType class introduced.
    
    I've added tests to ensure AclBindings can't be created with ResourceNameType.ANY or UNKNOWN.
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Jun Rao <junrao@gmail.com>
---
 .../apache/kafka/common/acl/AclBindingTest.java    | 56 +++++++++++-----
 core/src/main/scala/kafka/admin/AclCommand.scala   | 22 +++----
 .../main/scala/kafka/security/SecurityUtils.scala  |  7 +-
 .../scala/kafka/security/auth/Authorizer.scala     | 34 ++++++----
 .../main/scala/kafka/security/auth/Resource.scala  | 16 +++--
 .../kafka/security/auth/ResourceNameType.scala     | 49 --------------
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 16 +++--
 core/src/main/scala/kafka/server/KafkaApis.scala   | 74 +++++++++++-----------
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  3 +-
 core/src/main/scala/kafka/zk/ZkData.scala          | 12 ++--
 .../kafka/api/AuthorizerIntegrationTest.scala      | 43 +++++++------
 .../kafka/api/EndToEndAuthorizationTest.scala      | 15 +++--
 .../scala/kafka/security/auth/ResourceTest.scala   | 17 ++---
 .../scala/unit/kafka/admin/AclCommandTest.scala    | 19 +++---
 .../ZkNodeChangeNotificationListenerTest.scala     | 13 ++--
 .../security/auth/SimpleAclAuthorizerTest.scala    | 55 ++++++++--------
 .../delegation/DelegationTokenManagerTest.scala    |  7 +-
 17 files changed, 229 insertions(+), 229 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
index a35faca..4e41f98 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -56,29 +58,29 @@ public class AclBindingTest {
         new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
 
     @Test
-    public void testMatching() throws Exception {
-        assertTrue(ACL1.equals(ACL1));
+    public void testMatching() {
+        assertEquals(ACL1, ACL1);
         final AclBinding acl1Copy = new AclBinding(
             new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
-        assertTrue(ACL1.equals(acl1Copy));
-        assertTrue(acl1Copy.equals(ACL1));
-        assertTrue(ACL2.equals(ACL2));
-        assertFalse(ACL1.equals(ACL2));
-        assertFalse(ACL2.equals(ACL1));
+        assertEquals(ACL1, acl1Copy);
+        assertEquals(acl1Copy, ACL1);
+        assertEquals(ACL2, ACL2);
+        assertNotEquals(ACL1, ACL2);
+        assertNotEquals(ACL2, ACL1);
         assertTrue(AclBindingFilter.ANY.matches(ACL1));
-        assertFalse(AclBindingFilter.ANY.equals(ACL1));
+        assertNotEquals(AclBindingFilter.ANY, ACL1);
         assertTrue(AclBindingFilter.ANY.matches(ACL2));
-        assertFalse(AclBindingFilter.ANY.equals(ACL2));
+        assertNotEquals(AclBindingFilter.ANY, ACL2);
         assertTrue(AclBindingFilter.ANY.matches(ACL3));
-        assertFalse(AclBindingFilter.ANY.equals(ACL3));
-        assertTrue(AclBindingFilter.ANY.equals(AclBindingFilter.ANY));
+        assertNotEquals(AclBindingFilter.ANY, ACL3);
+        assertEquals(AclBindingFilter.ANY, AclBindingFilter.ANY);
         assertTrue(ANY_ANONYMOUS.matches(ACL1));
-        assertFalse(ANY_ANONYMOUS.equals(ACL1));
+        assertNotEquals(ANY_ANONYMOUS, ACL1);
         assertFalse(ANY_ANONYMOUS.matches(ACL2));
-        assertFalse(ANY_ANONYMOUS.equals(ACL2));
+        assertNotEquals(ANY_ANONYMOUS, ACL2);
         assertTrue(ANY_ANONYMOUS.matches(ACL3));
-        assertFalse(ANY_ANONYMOUS.equals(ACL3));
+        assertNotEquals(ANY_ANONYMOUS, ACL3);
         assertFalse(ANY_DENY.matches(ACL1));
         assertFalse(ANY_DENY.matches(ACL2));
         assertTrue(ANY_DENY.matches(ACL3));
@@ -87,12 +89,12 @@ public class AclBindingTest {
         assertFalse(ANY_MYTOPIC.matches(ACL3));
         assertTrue(ANY_ANONYMOUS.matches(UNKNOWN_ACL));
         assertTrue(ANY_DENY.matches(UNKNOWN_ACL));
-        assertTrue(UNKNOWN_ACL.equals(UNKNOWN_ACL));
+        assertEquals(UNKNOWN_ACL, UNKNOWN_ACL);
         assertFalse(ANY_MYTOPIC.matches(UNKNOWN_ACL));
     }
 
     @Test
-    public void testUnknowns() throws Exception {
+    public void testUnknowns() {
         assertFalse(ACL1.isUnknown());
         assertFalse(ACL2.isUnknown());
         assertFalse(ACL3.isUnknown());
@@ -103,7 +105,7 @@ public class AclBindingTest {
     }
 
     @Test
-    public void testMatchesAtMostOne() throws Exception {
+    public void testMatchesAtMostOne() {
         assertNull(ACL1.toFilter().findIndefiniteField());
         assertNull(ACL2.toFilter().findIndefiniteField());
         assertNull(ACL3.toFilter().findIndefiniteField());
@@ -111,4 +113,24 @@ public class AclBindingTest {
         assertFalse(ANY_DENY.matchesAtMostOne());
         assertFalse(ANY_MYTOPIC.matchesAtMostOne());
     }
+
+    @Test
+    public void shouldNotThrowOnUnknownResourceNameType() {
+        new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.UNKNOWN), ACL1.entry());
+    }
+
+    @Test
+    public void shouldNotThrowOnUnknownResourceType() {
+        new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "foo", ResourceNameType.LITERAL), ACL1.entry());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowOnAnyResourceNameType() {
+        new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.ANY), ACL1.entry());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowOnAnyResourceType() {
+        new AclBinding(new ResourcePattern(ResourceType.ANY, "foo", ResourceNameType.LITERAL), ACL1.entry());
+    }
 }
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index d55e886..d223945 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -25,13 +25,13 @@ import kafka.utils._
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.resource.{ResourcePatternFilter, ResourceNameType => JResourceNameType, ResourceType => JResourceType, Resource => JResource}
+import org.apache.kafka.common.resource.{ResourcePatternFilter, ResourceNameType, ResourceType => JResourceType, Resource => JResource}
 
 import scala.collection.JavaConverters._
 
 object AclCommand extends Logging {
 
-  val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, JResourceNameType.LITERAL)
+  val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, ResourceNameType.LITERAL)
 
   private val Newline = scala.util.Properties.lineSeparator
 
@@ -87,13 +87,13 @@ object AclCommand extends Logging {
   }
 
   private def addAcl(opts: AclCommandOptions) {
-    if (opts.options.valueOf(opts.resourceNameType) == JResourceNameType.ANY)
+    if (opts.options.valueOf(opts.resourceNameType) == ResourceNameType.ANY)
       CommandLineUtils.printUsageAndDie(opts.parser, "A '--resource-name-type' value of 'Any' is not valid when adding acls.")
 
     withAuthorizer(opts) { authorizer =>
       val resourceToAcl = getResourceFilterToAcls(opts).map {
         case (filter, acls) =>
-          Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), ResourceNameType.fromJava(filter.nameType())) -> acls
+          Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.nameType()) -> acls
       }
 
       if (resourceToAcl.values.exists(_.isEmpty))
@@ -262,13 +262,13 @@ object AclCommand extends Logging {
   }
 
   private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = {
-    val resourceNameType: JResourceNameType = opts.options.valueOf(opts.resourceNameType)
+    val resourceNameType: ResourceNameType = opts.options.valueOf(opts.resourceNameType)
 
     var resourceFilters = Set.empty[ResourcePatternFilter]
     if (opts.options.has(opts.topicOpt))
       opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resourceFilters += new ResourcePatternFilter(JResourceType.TOPIC, topic.trim, resourceNameType))
 
-    if (resourceNameType == JResourceNameType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
+    if (resourceNameType == ResourceNameType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
       resourceFilters += ClusterResourceFilter
 
     if (opts.options.has(opts.groupOpt))
@@ -349,7 +349,7 @@ object AclCommand extends Logging {
       .withRequiredArg()
       .ofType(classOf[String])
       .withValuesConvertedBy(new ResourceNameTypeConverter())
-      .defaultsTo(JResourceNameType.LITERAL)
+      .defaultsTo(ResourceNameType.LITERAL)
 
     val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
     val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
@@ -429,9 +429,9 @@ object AclCommand extends Logging {
 
 }
 
-class ResourceNameTypeConverter extends EnumConverter[JResourceNameType](classOf[JResourceNameType]) {
+class ResourceNameTypeConverter extends EnumConverter[ResourceNameType](classOf[ResourceNameType]) {
 
-  override def convert(value: String): JResourceNameType = {
+  override def convert(value: String): ResourceNameType = {
     val nameType = super.convert(value)
     if (nameType.isUnknown)
       throw new ValueConversionException("Unknown resourceNameType: " + value)
@@ -439,7 +439,7 @@ class ResourceNameTypeConverter extends EnumConverter[JResourceNameType](classOf
     nameType
   }
 
-  override def valuePattern: String = JResourceNameType.values
-    .filter(_ != JResourceNameType.UNKNOWN)
+  override def valuePattern: String = ResourceNameType.values
+    .filter(_ != ResourceNameType.UNKNOWN)
     .mkString("|")
 }
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
index 7489a3e..3d0f52e 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -17,7 +17,7 @@
 
 package kafka.security
 
-import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceNameType, ResourceType}
+import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ApiError
@@ -32,11 +32,10 @@ object SecurityUtils {
   def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
     (for {
       resourceType <- Try(ResourceType.fromJava(filter.patternFilter.resourceType))
-      resourceNameType <- Try(ResourceNameType.fromJava(filter.patternFilter.nameType))
       principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
       operation <- Try(Operation.fromJava(filter.entryFilter.operation))
       permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
-      resource = Resource(resourceType, filter.patternFilter.name, resourceNameType)
+      resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.nameType)
       acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
     } yield (resource, acl)) match {
       case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
@@ -45,7 +44,7 @@ object SecurityUtils {
   }
 
   def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
-    val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType.toJava)
+    val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType)
     val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
       acl.operation.toJava, acl.permissionType.toJava)
     new AclBinding(resourcePattern, entry)
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 4f4ddcf..6875dc6 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -48,17 +48,19 @@ trait Authorizer extends Configurable {
    *
    * {code}
    * // The following will add ACLs to the literal resource path 'foo', which will only affect the topic named 'foo':
-   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", Literal))
+   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
    *
    * // The following will add ACLs to the special literal topic resource path '*', which affects all topics:
-   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", Literal))
+   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
    *
    * // The following will add ACLs to the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
-   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", Prefixed))
+   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
    * {code}
    *
    * @param acls set of acls to add to existing acls
-   * @param resource the resource path to which these acls should be attached
+   * @param resource the resource path to which these acls should be attached.
+   *                the supplied resource will have a specific resource name type,
+   *                i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
    */
   def addAcls(acls: Set[Acl], resource: Resource): Unit
 
@@ -67,17 +69,19 @@ trait Authorizer extends Configurable {
    *
    * {code}
    * // The following will remove ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
-   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", Literal))
+   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
    *
    * // The following will remove ACLs from the special literal topic resource path '*', which affects all topics:
-   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", Literal))
+   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
    *
    * // The following will remove ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
-   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", Prefixed))
+   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
    * {code}
    *
    * @param acls set of acls to be removed.
    * @param resource resource path from which the acls should be removed.
+   *                 the supplied resource will have a specific resource name type,
+   *                 i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
    * @return true if some acl got removed, false if no acl was removed.
    */
   def removeAcls(acls: Set[Acl], resource: Resource): Boolean
@@ -87,16 +91,18 @@ trait Authorizer extends Configurable {
    *
    * {code}
    * // The following will remove all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
-   * authorizer.removeAcls(Resource(Topic, "foo", Literal))
+   * authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
    *
    * // The following will remove all ACLs from the special literal topic resource path '*', which affects all topics:
-   * authorizer.removeAcls(Resource(Topic, "*", Literal))
+   * authorizer.removeAcls(Resource(Topic, "*", LITERAL))
    *
    * // The following will remove all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
-   * authorizer.removeAcls(Resource(Topic, "foo", Prefixed))
+   * authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
    * {code}
    *
    * @param resource the resource path from which these acls should be removed.
+   *                 the supplied resource will have a specific resource name type,
+   *                 i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
    * @return
    */
   def removeAcls(resource: Resource): Boolean
@@ -106,16 +112,18 @@ trait Authorizer extends Configurable {
    *
    * {code}
    * // The following will get all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
-   * authorizer.removeAcls(Resource(Topic, "foo", Literal))
+   * authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
    *
    * // The following will get all ACLs from the special literal topic resource path '*', which affects all topics:
-   * authorizer.removeAcls(Resource(Topic, "*", Literal))
+   * authorizer.removeAcls(Resource(Topic, "*", LITERAL))
    *
    * // The following will get all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
-   * authorizer.removeAcls(Resource(Topic, "foo", Prefixed))
+   * authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
    * {code}
    *
    * @param resource the resource path to which the acls belong.
+   *                 the supplied resource will have a specific resource name type,
+   *                 i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
    * @return empty set if no acls are found, otherwise the acls for the resource.
    */
   def getAcls(resource: Resource): Set[Acl]
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index c9b5727..f07a11c 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -16,12 +16,12 @@
  */
 package kafka.security.auth
 
-import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern}
 
 object Resource {
   val Separator = ":"
   val ClusterResourceName = "kafka-cluster"
-  val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName, Literal)
+  val ClusterResource = Resource(Cluster, Resource.ClusterResourceName, ResourceNameType.LITERAL)
   val ProducerIdResourceName = "producer-id"
   val WildCardResource = "*"
 
@@ -34,7 +34,7 @@ object Resource {
         }
       case _ =>
         str.split(Separator, 2) match {
-          case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, Literal)
+          case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, ResourceNameType.LITERAL)
           case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
         }
     }
@@ -50,6 +50,12 @@ object Resource {
  */
 case class Resource(resourceType: ResourceType, name: String, nameType: ResourceNameType) {
 
+  if (nameType == ResourceNameType.ANY)
+    throw new IllegalArgumentException("nameType must not be ANY")
+
+  if (nameType == ResourceNameType.UNKNOWN)
+    throw new IllegalArgumentException("nameType must not be UNKNOWN")
+
   /**
     * Create an instance of this class with the provided parameters.
     * Resource name type would default to ResourceNameType.LITERAL.
@@ -60,11 +66,11 @@ case class Resource(resourceType: ResourceType, name: String, nameType: Resource
     */
   @deprecated("Use Resource(ResourceType, String, ResourceNameType")
   def this(resourceType: ResourceType, name: String) {
-    this(resourceType, name, Literal)
+    this(resourceType, name, ResourceNameType.LITERAL)
   }
 
   def toPattern: ResourcePattern = {
-    new ResourcePattern(resourceType.toJava, name, nameType.toJava)
+    new ResourcePattern(resourceType.toJava, name, nameType)
   }
 
   override def toString: String = {
diff --git a/core/src/main/scala/kafka/security/auth/ResourceNameType.scala b/core/src/main/scala/kafka/security/auth/ResourceNameType.scala
deleted file mode 100644
index 21b10a1..0000000
--- a/core/src/main/scala/kafka/security/auth/ResourceNameType.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import kafka.common.{BaseEnum, KafkaException}
-import org.apache.kafka.common.resource.{ResourceNameType => JResourceNameType}
-
-sealed trait ResourceNameType extends BaseEnum  with Ordered[ ResourceNameType ] {
-  def toJava: JResourceNameType
-
-  override def compare(that: ResourceNameType): Int = this.name compare that.name
-}
-
-case object Literal extends ResourceNameType {
-  val name = "Literal"
-  val toJava = JResourceNameType.LITERAL
-}
-
-case object Prefixed extends ResourceNameType {
-  val name = "Prefixed"
-  val toJava = JResourceNameType.PREFIXED
-}
-
-object ResourceNameType {
-
-  def fromString(resourceNameType: String): ResourceNameType = {
-    val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceNameType))
-    rType.getOrElse(throw new KafkaException(resourceNameType + " not a valid resourceNameType name. The valid names are " + values.mkString(",")))
-  }
-
-  def values: Seq[ResourceNameType] = List(Literal, Prefixed)
-
-  def fromJava(nameType: JResourceNameType): ResourceNameType = fromString(nameType.toString)
-}
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 0cb2fae..601b5be 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -27,6 +27,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.{AclChangeNotificationSequenceZNode, KafkaZkClient, ZkAclStore}
+import org.apache.kafka.common.resource.ResourceNameType
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 
@@ -101,7 +102,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
-    if (resource.nameType != Literal) {
+    if (resource.nameType != ResourceNameType.LITERAL) {
       throw new IllegalArgumentException("Only literal resources are supported. Got: " + resource.nameType)
     }
 
@@ -203,15 +204,18 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   def getMatchingAcls(resourceType: ResourceType, resourceName: String): Set[Acl] = {
     inReadLock(lock) {
-      val wildcard = aclCache.get(Resource(resourceType, Acl.WildCardResource, Literal))
+      val wildcard = aclCache.get(Resource(resourceType, Acl.WildCardResource, ResourceNameType.LITERAL))
         .map(_.acls)
         .getOrElse(Set.empty[Acl])
 
-      val literal = aclCache.get(Resource(resourceType, resourceName, Literal))
+      val literal = aclCache.get(Resource(resourceType, resourceName, ResourceNameType.LITERAL))
         .map(_.acls)
         .getOrElse(Set.empty[Acl])
 
-      val prefixed = aclCache.range(Resource(resourceType, resourceName, Prefixed), Resource(resourceType, resourceName.substring(0, 1), Prefixed))
+      val prefixed = aclCache.range(
+        Resource(resourceType, resourceName, ResourceNameType.PREFIXED),
+        Resource(resourceType, resourceName.substring(0, 1), ResourceNameType.PREFIXED)
+      )
         .filterKeys(resource => resourceName.startsWith(resource.name))
         .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
         .toSet
@@ -222,7 +226,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   override def getAcls(): Map[Resource, Set[Acl]] = {
     inReadLock(lock) {
-      aclCache.mapValues(_.acls).toMap
+      aclCache.mapValues(_.acls)
     }
   }
 
@@ -365,7 +369,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       if (rt != 0)
         rt
       else {
-        val rnt = a.nameType compare b.nameType
+        val rnt = a.nameType compareTo b.nameType
         if (rnt != 0)
           rnt
         else
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6d9e3d1..7a39c12 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -52,6 +52,7 @@ import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, A
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
+import org.apache.kafka.common.resource.ResourceNameType.LITERAL
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -273,7 +274,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
     // reject the request if not authorized to the group
-    if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId, Literal))) {
+    if (!authorize(request.session, Read, Resource(Group, offsetCommitRequest.groupId, LITERAL))) {
       val error = Errors.GROUP_AUTHORIZATION_FAILED
       val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
         (topicPartition, error)
@@ -286,7 +287,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequest.PartitionData]
 
       for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) {
-        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic, Literal)))
+        if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL)))
           unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
         else if (!metadataCache.contains(topicPartition))
           nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -384,7 +385,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes
 
     if (produceRequest.isTransactional) {
-      if (!authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId, Literal))) {
+      if (!authorize(request.session, Write, Resource(TransactionalId, produceRequest.transactionalId, LITERAL))) {
         sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
@@ -400,7 +401,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
 
     for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
-      if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic, Literal)))
+      if (!authorize(request.session, Write, Resource(Topic, topicPartition.topic, LITERAL)))
         unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
       else if (!metadataCache.contains(topicPartition))
         nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -529,7 +530,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       // Regular Kafka consumers need READ permission on each partition they are fetching.
       fetchContext.foreachPartition((topicPartition, data) => {
-        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic, Literal)))
+        if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL)))
           erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
             FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
             FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
@@ -741,7 +742,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetRequest = request.body[ListOffsetRequest]
 
     val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.offsetData.asScala.partition {
-      case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic, Literal))
+      case (topicPartition, _) => authorize(request.session, Describe, Resource(Topic, topicPartition.topic, LITERAL))
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
@@ -794,7 +795,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetRequest = request.body[ListOffsetRequest]
 
     val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition {
-      case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic, Literal))
+      case (topicPartition, _) => authorize(request.session, Describe, Resource(Topic, topicPartition.topic, LITERAL))
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
@@ -1033,7 +1034,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
     var (authorizedTopics, unauthorizedForDescribeTopics) =
-      topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic, Literal)))
+      topics.partition(topic => authorize(request.session, Describe, Resource(Topic, topic, LITERAL)))
 
     var unauthorizedForCreateTopics = Set[String]()
 
@@ -1097,12 +1098,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetFetchRequest = request.body[OffsetFetchRequest]
 
     def authorizeTopicDescribe(partition: TopicPartition) =
-      authorize(request.session, Describe, new Resource(Topic, partition.topic, Literal))
+      authorize(request.session, Describe, Resource(Topic, partition.topic, LITERAL))
 
     def createResponse(requestThrottleMs: Int): AbstractResponse = {
       val offsetFetchResponse =
         // reject the request if not authorized to the group
-        if (!authorize(request.session, Describe, new Resource(Group, offsetFetchRequest.groupId, Literal)))
+        if (!authorize(request.session, Describe, Resource(Group, offsetFetchRequest.groupId, LITERAL)))
           offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
         else {
           if (header.apiVersion == 0) {
@@ -1170,10 +1171,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     val findCoordinatorRequest = request.body[FindCoordinatorRequest]
 
     if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
-        !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey, Literal)))
+        !authorize(request.session, Describe, Resource(Group, findCoordinatorRequest.coordinatorKey, LITERAL)))
       sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
     else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION &&
-        !authorize(request.session, Describe, new Resource(TransactionalId, findCoordinatorRequest.coordinatorKey, Literal)))
+        !authorize(request.session, Describe, Resource(TransactionalId, findCoordinatorRequest.coordinatorKey, LITERAL)))
       sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
       // get metadata (and create the topic if necessary)
@@ -1220,7 +1221,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val describeRequest = request.body[DescribeGroupsRequest]
 
     val groups = describeRequest.groupIds.asScala.map { groupId =>
-      if (!authorize(request.session, Describe, new Resource(Group, groupId, Literal))) {
+      if (!authorize(request.session, Describe, Resource(Group, groupId, LITERAL))) {
         groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
       } else {
         val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
@@ -1266,7 +1267,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, createResponse)
     }
 
-    if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId(), Literal))) {
+    if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.groupId(), LITERAL))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new JoinGroupResponse(
           requestThrottleMs,
@@ -1302,7 +1303,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         new SyncGroupResponse(requestThrottleMs, error, ByteBuffer.wrap(memberState)))
     }
 
-    if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId(), Literal))) {
+    if (!authorize(request.session, Read, Resource(Group, syncGroupRequest.groupId(), LITERAL))) {
       sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)
     } else {
       groupCoordinator.handleSyncGroup(
@@ -1320,7 +1321,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     var groups = deleteGroupsRequest.groups.asScala.toSet
 
     val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
-      authorize(request.session, Delete, new Resource(Group, group, Literal))
+      authorize(request.session, Delete, Resource(Group, group, LITERAL))
     }
 
     val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++
@@ -1344,7 +1345,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, createResponse)
     }
 
-    if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId, Literal))) {
+    if (!authorize(request.session, Read, Resource(Group, heartbeatRequest.groupId, LITERAL))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new HeartbeatResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
     } else {
@@ -1371,7 +1372,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, createResponse)
     }
 
-    if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId, Literal))) {
+    if (!authorize(request.session, Read, Resource(Group, leaveGroupRequest.groupId, LITERAL))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new LeaveGroupResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
     } else {
@@ -1491,7 +1492,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val dupes = createPartitionsRequest.duplicates.asScala
       val notDuped = createPartitionsRequest.newPartitions.asScala -- dupes
       val (authorized, unauthorized) = notDuped.partition { case (topic, _) =>
-        authorize(request.session, Alter, new Resource(Topic, topic, Literal))
+        authorize(request.session, Alter, Resource(Topic, topic, LITERAL))
       }
 
       val (queuedForDeletion, valid) = authorized.partition { case (topic, _) =>
@@ -1515,7 +1516,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val authorizedForDeleteTopics =  mutable.Set[String]()
 
     for (topic <- deleteTopicRequest.topics.asScala) {
-      if (!authorize(request.session, Delete, new Resource(Topic, topic, Literal)))
+      if (!authorize(request.session, Delete, Resource(Topic, topic, LITERAL)))
         unauthorizedTopicErrors += topic -> Errors.TOPIC_AUTHORIZATION_FAILED
       else if (!metadataCache.contains(topic))
         nonExistingTopicErrors += topic -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -1560,7 +1561,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]()
 
     for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) {
-      if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic, Literal)))
+      if (!authorize(request.session, Delete, Resource(Topic, topicPartition.topic, LITERAL)))
         unauthorizedTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse(
           DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)
       else if (!metadataCache.contains(topicPartition))
@@ -1603,7 +1604,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val transactionalId = initProducerIdRequest.transactionalId
 
     if (transactionalId != null) {
-      if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal))) {
+      if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL))) {
         sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
@@ -1628,7 +1629,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val endTxnRequest = request.body[EndTxnRequest]
     val transactionalId = endTxnRequest.transactionalId
 
-    if (authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal))) {
+    if (authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL))) {
       def sendResponseCallback(error: Errors) {
         def createResponse(requestThrottleMs: Int): AbstractResponse = {
           val responseBody = new EndTxnResponse(requestThrottleMs, error)
@@ -1763,7 +1764,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
     val transactionalId = addPartitionsToTxnRequest.transactionalId
     val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-    if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal)))
+    if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
     else {
@@ -1773,7 +1774,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       for (topicPartition <- partitionsToAdd) {
         if (org.apache.kafka.common.internals.Topic.isInternal(topicPartition.topic) ||
-            !authorize(request.session, Write, new Resource(Topic, topicPartition.topic, Literal)))
+            !authorize(request.session, Write, Resource(Topic, topicPartition.topic, LITERAL)))
           unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
         else if (!metadataCache.contains(topicPartition))
           nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -1817,10 +1818,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groupId = addOffsetsToTxnRequest.consumerGroupId
     val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
-    if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal)))
+    if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
-    else if (!authorize(request.session, Read, new Resource(Group, groupId, Literal)))
+    else if (!authorize(request.session, Read, Resource(Group, groupId, LITERAL)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
     else {
@@ -1849,9 +1850,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization
     // since it is implied by transactionalId authorization
-    if (!authorize(request.session, Write, new Resource(TransactionalId, txnOffsetCommitRequest.transactionalId, Literal)))
+    if (!authorize(request.session, Write, Resource(TransactionalId, txnOffsetCommitRequest.transactionalId, LITERAL)))
       sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
-    else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId, Literal)))
+    else if (!authorize(request.session, Read, Resource(Group, txnOffsetCommitRequest.consumerGroupId, LITERAL)))
       sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
     else {
       val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
@@ -1859,7 +1860,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
 
       for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
-        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic, Literal)))
+        if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL)))
           unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
         else if (!metadataCache.contains(topicPartition))
           nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -1920,10 +1921,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         val filter = describeAclsRequest.filter()
         val returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) =>
           acls.flatMap { acl =>
-            val fixture = new AclBinding(new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType.toJava),
+            val fixture = new AclBinding(new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType),
                 new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava))
-            if (filter.matches(fixture)) Some(fixture)
-            else None
+            Some(fixture).filter(filter.matches)
           }
         }
         sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -1994,7 +1994,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val filtersWithIndex = filters.zipWithIndex
           for ((resource, acls) <- aclMap; acl <- acls) {
             val binding = new AclBinding(
-              new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType.toJava),
+              new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType),
               new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
                 acl.permissionType.toJava))
 
@@ -2042,7 +2042,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RResourceType.BROKER =>
           authorize(request.session, AlterConfigs, Resource.ClusterResource)
         case RResourceType.TOPIC =>
-          authorize(request.session, AlterConfigs, new Resource(Topic, resource.name, Literal))
+          authorize(request.session, AlterConfigs, Resource(Topic, resource.name, LITERAL))
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
@@ -2069,7 +2069,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       resource.`type` match {
         case RResourceType.BROKER => authorize(request.session, DescribeConfigs, Resource.ClusterResource)
         case RResourceType.TOPIC =>
-          authorize(request.session, DescribeConfigs, new Resource(Topic, resource.name, Literal))
+          authorize(request.session, DescribeConfigs, Resource(Topic, resource.name, LITERAL))
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
       }
     }
@@ -2216,7 +2216,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       else {
         val owners = if (describeTokenRequest.owners == null) None else Some(describeTokenRequest.owners.asScala.toList)
-        def authorizeToken(tokenId: String) = authorize(request.session, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId, Literal))
+        def authorizeToken(tokenId: String) = authorize(request.session, Describe, Resource(kafka.security.auth.DelegationToken, tokenId, LITERAL))
         def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, owners, token, authorizeToken)
         val tokens =  tokenManager.getTokens(eligible)
         sendResponseCallback(Errors.NONE, tokens)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 20e4b83..6ec8e30 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -26,11 +26,12 @@ import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
-import kafka.security.auth.{Acl, Resource, ResourceNameType, ResourceType}
+import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.ConfigType
 import kafka.utils.Logging
 import kafka.zookeeper._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.resource.ResourceNameType
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 6121035..d4470ab 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -26,11 +26,12 @@ import kafka.cluster.{Broker, EndPoint}
 import kafka.common.KafkaException
 import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
-import kafka.security.auth.{Acl, Literal, Prefixed, Resource, ResourceNameType, ResourceType}
+import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.{ConfigType, DelegationTokenManager}
 import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.resource.ResourceNameType
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.Time
@@ -458,14 +459,14 @@ object StateChangeHandlers {
   */
 case class ZkAclStore(nameType: ResourceNameType) {
   val aclPath: String = nameType match {
-    case Literal => "/kafka-acl"
-    case Prefixed => "/kafka-prefixed-acl"
+    case ResourceNameType.LITERAL => "/kafka-acl"
+    case ResourceNameType.PREFIXED => "/kafka-prefixed-acl"
     case _ => throw new IllegalArgumentException("Unknown name type:" + nameType)
   }
 
   val aclChangePath: String = nameType match {
-    case Literal => "/kafka-acl-changes"
-    case Prefixed => "/kafka-prefixed-acl-changes"
+    case ResourceNameType.LITERAL => "/kafka-acl-changes"
+    case ResourceNameType.PREFIXED => "/kafka-prefixed-acl-changes"
     case _ => throw new IllegalArgumentException("Unknown name type:" + nameType)
   }
 
@@ -480,6 +481,7 @@ case class ZkAclStore(nameType: ResourceNameType) {
 
 object ZkAclStore {
   val stores: Seq[ZkAclStore] = ResourceNameType.values
+    .filter(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN)
     .map(nameType => ZkAclStore(nameType))
 
   val securePaths: Seq[String] = stores
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a3b3233..b48a349 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -40,9 +40,10 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records,
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
+import org.apache.kafka.common.resource.ResourceNameType.LITERAL
 import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.{KafkaException, Node, TopicPartition, acl, requests, resource}
+import org.apache.kafka.common.{KafkaException, Node, TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
 
@@ -70,11 +71,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val deleteRecordsPartition = new TopicPartition(deleteTopic, part)
   val topicAndPartition = TopicAndPartition(topic, part)
   val group = "my-group"
-  val topicResource = new Resource(Topic, topic, Literal)
-  val groupResource = new Resource(Group, group, Literal)
-  val deleteTopicResource = new Resource(Topic, deleteTopic, Literal)
-  val transactionalIdResource = new Resource(TransactionalId, transactionalId, Literal)
-  val createTopicResource = new Resource(Topic, createTopic, Literal)
+  val topicResource = Resource(Topic, topic, LITERAL)
+  val groupResource = Resource(Group, group, LITERAL)
+  val deleteTopicResource = Resource(Topic, deleteTopic, LITERAL)
+  val transactionalIdResource = Resource(TransactionalId, transactionalId, LITERAL)
+  val createTopicResource = Resource(Topic, createTopic, LITERAL)
 
   val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
   val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
@@ -378,12 +379,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createAclsRequest = new CreateAclsRequest.Builder(
     Collections.singletonList(new AclCreation(new AclBinding(
-      new ResourcePattern(AdminResourceType.TOPIC, "mytopic", resource.ResourceNameType.LITERAL),
+      new ResourcePattern(AdminResourceType.TOPIC, "mytopic", LITERAL),
       new AccessControlEntry(userPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.DENY))))).build()
 
   private def deleteAclsRequest = new DeleteAclsRequest.Builder(
     Collections.singletonList(new AclBindingFilter(
-      new ResourcePatternFilter(AdminResourceType.TOPIC, null, resource.ResourceNameType.LITERAL),
+      new ResourcePatternFilter(AdminResourceType.TOPIC, null, LITERAL),
       new AccessControlEntryFilter(userPrincipal.toString, "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
 
   private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
@@ -577,13 +578,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: ResourceType) {
     val topicPartition = new TopicPartition(createTopic, 0)
-    val newTopicResource = new Resource(Topic, createTopic, Literal)
+    val newTopicResource = Resource(Topic, createTopic, LITERAL)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource)
     try {
       sendRecords(numRecords, topicPartition)
       Assert.fail("should have thrown exception")
     } catch {
-      case e: TopicAuthorizationException => 
+      case e: TopicAuthorizationException =>
         assertEquals(Collections.singleton(createTopic), e.unauthorizedTopics())
     }
 
@@ -733,7 +734,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     // create an unmatched topic
     val unmatchedTopic = "unmatched"
     createTopic(unmatchedTopic)
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)),  new Resource(Topic, unmatchedTopic, Literal))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)),  Resource(Topic, unmatchedTopic, LITERAL))
     sendRecords(1, new TopicPartition(unmatchedTopic, part))
     removeAllAcls()
 
@@ -745,8 +746,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     // set the subscription pattern to an internal topic that the consumer has read permission to. Since
     // internal topics are not included, we should not be assigned any partitions from this topic
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)),  new Resource(Topic,
-      GROUP_METADATA_TOPIC_NAME, Literal))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)),  Resource(Topic,
+      GROUP_METADATA_TOPIC_NAME, LITERAL))
     consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
     consumer.poll(0)
     assertTrue(consumer.subscription().isEmpty)
@@ -774,7 +775,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
       // now authorize the user for the internal topic and verify that we can subscribe
       addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Topic,
-        GROUP_METADATA_TOPIC_NAME, Literal))
+        GROUP_METADATA_TOPIC_NAME, LITERAL))
       consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
       consumer.poll(0)
       assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala)
@@ -789,7 +790,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
-    val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME, Literal)
+    val internalTopicResource = Resource(Topic, GROUP_METADATA_TOPIC_NAME, LITERAL)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
 
     val consumerConfig = new Properties
@@ -836,13 +837,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testCreatePermissionOnClusterToReadFromNonExistentTopic() {
     testCreatePermissionNeededToReadFromNonExistentTopic("newTopic",
-      Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), 
+      Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)),
       Cluster)
   }
 
   private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) {
     val topicPartition = new TopicPartition(newTopic, 0)
-    val newTopicResource = new Resource(Topic, newTopic, Literal)
+    val newTopicResource = Resource(Topic, newTopic, LITERAL)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource)
     addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
     this.consumers.head.assign(List(topicPartition).asJava)
@@ -1045,7 +1046,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteTopicsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*", Literal))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), Resource(Topic, "*", LITERAL))
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
     val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
@@ -1072,7 +1073,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteRecordsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*", Literal))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), Resource(Topic, "*", LITERAL))
     val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
     val version = ApiKeys.DELETE_RECORDS.latestVersion
     val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
@@ -1090,7 +1091,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testCreatePartitionsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*", Literal))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), Resource(Topic, "*", LITERAL))
     val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
     val version = ApiKeys.CREATE_PARTITIONS.latestVersion
     val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
@@ -1283,7 +1284,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): Unit = {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic, Literal))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), Resource(Topic, deleteTopic, LITERAL))
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index b809686..1f89ea3 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.{GroupAuthorizationException, TimeoutException, TopicAuthorizationException}
+import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -78,13 +79,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
 
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 
-  val topicResource = new Resource(Topic, topic, Literal)
-  val groupResource = new Resource(Group, group, Literal)
+  val topicResource = Resource(Topic, topic, LITERAL)
+  val groupResource = Resource(Group, group, LITERAL)
   val clusterResource = Resource.ClusterResource
-  val prefixedTopicResource = new Resource(Topic, topicPrefix, Prefixed)
-  val prefixedGroupResource = new Resource(Group, groupPrefix, Prefixed)
-  val wildcardTopicResource = new Resource(Topic, wildcard, Literal)
-  val wildcardGroupResource = new Resource(Group, wildcard, Literal)
+  val prefixedTopicResource = Resource(Topic, topicPrefix, PREFIXED)
+  val prefixedGroupResource = Resource(Group, groupPrefix, PREFIXED)
+  val wildcardTopicResource = Resource(Topic, wildcard, LITERAL)
+  val wildcardGroupResource = Resource(Group, wildcard, LITERAL)
 
   // Arguments to AclCommand to set ACLs.
   def clusterActionArgs: Array[String] = Array("--authorizer-properties",
@@ -182,7 +183,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     super.setUp()
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource)
-      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*", Literal))
+      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, Resource(Topic, "*", LITERAL))
     }
     // create the test topic with all the brokers as replicas
     createTopic(topic, 1, 3)
diff --git a/core/src/test/scala/kafka/security/auth/ResourceTest.scala b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
index 66049b4..2924cff 100644
--- a/core/src/test/scala/kafka/security/auth/ResourceTest.scala
+++ b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
@@ -18,6 +18,7 @@
 package kafka.security.auth
 
 import kafka.common.KafkaException
+import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
 import org.junit.Test
 import org.junit.Assert._
 
@@ -29,30 +30,30 @@ class ResourceTest {
 
   @Test
   def shouldParseOldTwoPartString(): Unit = {
-    assertEquals(Resource(Group, "fred", Literal), Resource.fromString("Group:fred"))
-    assertEquals(Resource(Topic, "t", Literal), Resource.fromString("Topic:t"))
+    assertEquals(Resource(Group, "fred", LITERAL), Resource.fromString("Group:fred"))
+    assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:t"))
   }
 
   @Test
   def shouldParseOldTwoPartWithEmbeddedSeparators(): Unit = {
-    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", Literal), Resource.fromString("Group::This:is:a:weird:group:name:"))
+    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group::This:is:a:weird:group:name:"))
   }
 
   @Test
   def shouldParseThreePartString(): Unit = {
-    assertEquals(Resource(Group, "fred", Prefixed), Resource.fromString("Prefixed:Group:fred"))
-    assertEquals(Resource(Topic, "t", Literal), Resource.fromString("Literal:Topic:t"))
+    assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("PREFIXED:Group:fred"))
+    assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("LITERAL:Topic:t"))
   }
 
   @Test
   def shouldParseThreePartWithEmbeddedSeparators(): Unit = {
-    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", Prefixed), Resource.fromString("Prefixed:Group::This:is:a:weird:group:name:"))
-    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", Literal), Resource.fromString("Literal:Group::This:is:a:weird:group:name:"))
+    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("PREFIXED:Group::This:is:a:weird:group:name:"))
+    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("LITERAL:Group::This:is:a:weird:group:name:"))
   }
 
   @Test
   def shouldRoundTripViaString(): Unit = {
-    val expected = Resource(Group, "fred", Prefixed)
+    val expected = Resource(Group, "fred", PREFIXED)
 
     val actual = Resource.fromString(expected.toString)
 
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 71754ba..76cf787 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -23,8 +23,9 @@ import kafka.security.auth._
 import kafka.server.KafkaConfig
 import kafka.utils.{Logging, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.{After, Before, Test}
+import org.junit.{Before, Test}
 
 class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
@@ -36,10 +37,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
   private val AllowHostCommand = Array("--allow-host", "host1", "--allow-host", "host2")
   private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", "host2")
 
-  private val TopicResources = Set(Resource(Topic, "test-1", Literal), Resource(Topic, "test-2", Literal))
-  private val GroupResources = Set(Resource(Group, "testGroup-1", Literal), Resource(Group, "testGroup-2", Literal))
-  private val TransactionalIdResources = Set(Resource(TransactionalId, "t0", Literal), Resource(TransactionalId, "t1", Literal))
-  private val TokenResources = Set(Resource(DelegationToken, "token1", Literal), Resource(DelegationToken, "token2", Literal))
+  private val TopicResources = Set(Resource(Topic, "test-1", LITERAL), Resource(Topic, "test-2", LITERAL))
+  private val GroupResources = Set(Resource(Group, "testGroup-1", LITERAL), Resource(Group, "testGroup-2", LITERAL))
+  private val TransactionalIdResources = Set(Resource(TransactionalId, "t0", LITERAL), Resource(TransactionalId, "t1", LITERAL))
+  private val TokenResources = Set(Resource(DelegationToken, "token1", LITERAL), Resource(DelegationToken, "token2", LITERAL))
 
   private val ResourceToCommand = Map[Set[Resource], Array[String]](
     TopicResources -> Array("--topic", "test-1", "--topic", "test-2"),
@@ -64,7 +65,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
   private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]](
     TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe, Create), Hosts),
     TransactionalIdResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts),
-    Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, 
+    Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow,
       Set(if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts)
   )
 
@@ -140,14 +141,14 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
       val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write)
       val describeAcl = Acl(principal, Allow, Acl.WildCardHost, Describe)
       val createAcl = Acl(principal, Allow, Acl.WildCardHost, Create)
-      TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), authorizer, Resource(Topic, "Test-", Prefixed))
+      TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), authorizer, Resource(Topic, "Test-", PREFIXED))
     }
 
     AclCommand.main(zkArgs ++ cmd :+ "--remove" :+ "--force")
 
     withAuthorizer() { authorizer =>
-      TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Cluster, "kafka-cluster", Literal))
-      TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Topic, "Test-", Prefixed))
+      TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Cluster, "kafka-cluster", LITERAL))
+      TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Topic, "Test-", PREFIXED))
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index cee0bd6..02918d6 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -16,9 +16,10 @@
  */
 package kafka.common
 
-import kafka.security.auth.{Group, Literal, Resource}
+import kafka.security.auth.{Group, Resource}
 import kafka.utils.TestUtils
 import kafka.zk.{AclChangeNotificationSequenceZNode, ZkAclStore, ZooKeeperTestHarness}
+import org.apache.kafka.common.resource.ResourceNameType.LITERAL
 import org.junit.{After, Test}
 
 class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
@@ -38,17 +39,17 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
     @volatile var invocationCount = 0
     val notificationHandler = new NotificationHandler {
       override def processNotification(notificationMessage: Array[Byte]): Unit = {
-        notification = AclChangeNotificationSequenceZNode.decode(Literal, notificationMessage)
+        notification = AclChangeNotificationSequenceZNode.decode(LITERAL, notificationMessage)
         invocationCount += 1
       }
     }
 
     zkClient.createAclPaths()
-    val notificationMessage1 = Resource(Group, "messageA", Literal)
-    val notificationMessage2 = Resource(Group, "messageB", Literal)
+    val notificationMessage1 = Resource(Group, "messageA", LITERAL)
+    val notificationMessage2 = Resource(Group, "messageB", LITERAL)
     val changeExpirationMs = 1000
 
-    notificationListener = new ZkNodeChangeNotificationListener(zkClient,  ZkAclStore(Literal).aclChangePath,
+    notificationListener = new ZkNodeChangeNotificationListener(zkClient,  ZkAclStore(LITERAL).aclChangePath,
       AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs)
     notificationListener.init()
 
@@ -68,7 +69,7 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2,
       "Failed to send/process notification message in the timeout period.")
 
-    (3 to 10).foreach(i => zkClient.createAclChangeNotification(Resource(Group, "message" + i, Literal)))
+    (3 to 10).foreach(i => zkClient.createAclChangeNotification(Resource(Group, "message" + i, LITERAL)))
 
     TestUtils.waitUntilTrue(() => invocationCount == 10 ,
       s"Expected 10 invocations of processNotifications, but there were $invocationCount")
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 3e7f6a8..05a433c 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -24,6 +24,7 @@ import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -34,8 +35,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   val allowWriteAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
   val denyReadAcl = Acl(Acl.WildCardPrincipal, Deny, WildCardHost, Read)
 
-  val wildCardResource = Resource(Topic, WildCardResource, Literal)
-  val prefixedResource = Resource(Topic, "foo", Prefixed)
+  val wildCardResource = Resource(Topic, WildCardResource, LITERAL)
+  val prefixedResource = Resource(Topic, "foo", PREFIXED)
 
   val simpleAclAuthorizer = new SimpleAclAuthorizer
   val simpleAclAuthorizer2 = new SimpleAclAuthorizer
@@ -62,7 +63,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     config = KafkaConfig.fromProps(props)
     simpleAclAuthorizer.configure(config.originals)
     simpleAclAuthorizer2.configure(config.originals)
-    resource = new Resource(Topic, "foo-" + UUID.randomUUID(), Literal)
+    resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL)
   }
 
   @After
@@ -74,7 +75,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
   @Test(expected = classOf[IllegalArgumentException])
   def testAuthorizeThrowsOnNoneLiteralResource() {
-    simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", Prefixed))
+    simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", PREFIXED))
   }
 
   @Test
@@ -234,10 +235,10 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2), "changes not propagated in timeout period")
 
     val resourceToAcls = Map[Resource, Set[Acl]](
-      new Resource(Topic, Resource.WildCardResource, Literal) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
-      new Resource(Cluster, Resource.WildCardResource, Literal) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
-      new Resource(Group, Resource.WildCardResource, Literal) -> acls,
-      new Resource(Group, "test-ConsumerGroup", Literal) -> acls
+      new Resource(Topic, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
+      new Resource(Cluster, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
+      new Resource(Group, Resource.WildCardResource, LITERAL) -> acls,
+      new Resource(Group, "test-ConsumerGroup", LITERAL) -> acls
     )
 
     resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) }
@@ -265,7 +266,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     simpleAclAuthorizer.addAcls(acls, resource)
 
     val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
-    val resource1 = new Resource(Topic, "test-2", Literal)
+    val resource1 = Resource(Topic, "test-2", LITERAL)
     val acl2 = new Acl(user2, Deny, "host3", Read)
     val acls1 = Set[Acl](acl2)
     simpleAclAuthorizer.addAcls(acls1, resource1)
@@ -284,7 +285,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
   @Test
   def testLocalConcurrentModificationOfResourceAcls() {
-    val commonResource = new Resource(Topic, "test", Literal)
+    val commonResource = Resource(Topic, "test", LITERAL)
 
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val acl1 = new Acl(user1, Allow, WildCardHost, Read)
@@ -300,7 +301,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
   @Test
   def testDistributedConcurrentModificationOfResourceAcls() {
-    val commonResource = new Resource(Topic, "test", Literal)
+    val commonResource = Resource(Topic, "test", LITERAL)
 
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val acl1 = new Acl(user1, Allow, WildCardHost, Read)
@@ -330,7 +331,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
   @Test
   def testHighConcurrencyModificationOfResourceAcls() {
-    val commonResource = new Resource(Topic, "test", Literal)
+    val commonResource = Resource(Topic, "test", LITERAL)
 
     val acls = (0 to 50).map { i =>
       val useri = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, i.toString)
@@ -513,18 +514,18 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
   @Test
   def testAuthorizeWithPrefixedResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", Literal))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", Prefixed))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), Prefixed))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), Prefixed))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID() + "-zzz", Prefixed))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fooo-" + UUID.randomUUID(), Prefixed))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fo-" + UUID.randomUUID(), Prefixed))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fop-" + UUID.randomUUID(), Prefixed))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-" + UUID.randomUUID(), Prefixed))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-", Prefixed))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", Prefixed))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", Literal))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", LITERAL))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", PREFIXED))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fooo-" + UUID.randomUUID(), PREFIXED))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fo-" + UUID.randomUUID(), PREFIXED))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fop-" + UUID.randomUUID(), PREFIXED))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-" + UUID.randomUUID(), PREFIXED))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-", PREFIXED))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", LITERAL))
 
     simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
 
@@ -539,16 +540,16 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     simpleAclAuthorizer.addAcls(Set[Acl](acl1), resource)
     assertEquals(1, simpleAclAuthorizer.getAcls(principal).size)
 
-    simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Topic, Acl.WildCardResource, Literal))
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), Resource(Topic, Acl.WildCardResource, LITERAL))
     assertEquals(2, simpleAclAuthorizer.getAcls(principal).size)
 
     val acl2 = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
-    simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Group, "groupA", Literal))
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), Resource(Group, "groupA", LITERAL))
     assertEquals(3, simpleAclAuthorizer.getAcls(principal).size)
 
     // add prefixed principal acl on wildcard group name
     val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName.charAt(0) + WildCardResource), Allow, WildCardHost, Write)
-    simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Group, Acl.WildCardResource, Literal))
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), Resource(Group, Acl.WildCardResource, LITERAL))
     assertEquals(4, simpleAclAuthorizer.getAcls(principal).size)
   }
 
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index eec7175..7df30c9 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -28,6 +28,7 @@ import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaC
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.resource.ResourceNameType.LITERAL
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
@@ -242,7 +243,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
 
     //get all tokens for multiple owners (owner1, renewer4) and with permission
     var acl = new Acl(owner1, Allow, WildCardHost, Describe)
-    simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId3, Literal))
+    simpleAclAuthorizer.addAcls(Set(acl), Resource(kafka.security.auth.DelegationToken, tokenId3, LITERAL))
     tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(owner1, renewer4))
     assert(tokens.size == 3)
 
@@ -257,7 +258,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
     //get all tokens for multiple owners (renewer2, renewer3) which are token renewers principals and with permissions
     hostSession = new Session(renewer2, InetAddress.getByName("192.168.1.1"))
     acl = new Acl(renewer2, Allow, WildCardHost, Describe)
-    simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId2, Literal))
+    simpleAclAuthorizer.addAcls(Set(acl), Resource(kafka.security.auth.DelegationToken, tokenId2, LITERAL))
     tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession,  renewer2, List(renewer2, renewer3))
     assert(tokens.size == 2)
 
@@ -271,7 +272,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
       List()
     }
     else {
-      def authorizeToken(tokenId: String) = simpleAclAuthorizer.authorize(hostSession, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId, Literal))
+      def authorizeToken(tokenId: String) = simpleAclAuthorizer.authorize(hostSession, Describe, Resource(kafka.security.auth.DelegationToken, tokenId, LITERAL))
       def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, Option(requestedOwners), token, authorizeToken)
       tokenManager.getTokens(eligible)
     }

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

Mime
View raw message