kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5292; Fix authorization checks in AdminClient
Date Thu, 08 Jun 2017 01:03:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 79db393ff -> 2fc91afba


KAFKA-5292; Fix authorization checks in AdminClient

* NetworkClient.java: when trace logging is enabled, show AbstractResponse Struct objects, rather than just a memory address of the AbstractResponse.
* AclOperation.java: add documentation of what ACLs imply other ACLs.
* Resource.java: add CLUSTER, CLUSTER_NAME constants.
* Reconcile the Java and Scala classes for ResourceType, OperationType, etc.  Add unit tests to ensure they can be converted to each other.
* AclCommand.scala: we should be able to apply ACLs containing Alter and Describe operations to Cluster resources.
* SimpleAclAuthorizer: update the authorizer to handle the ACL inheritance rules described in AclOperation.java.
* KafkaApis.scala: update createAcls and deleteAcls to use ALTER on CLUSTER, as described in the KIP.  describeAcls should use DESCRIBE on CLUSTER.  Use      fromJava methods instead of fromString methods to convert from Java objects to Scala ones.
* SaslSslAdminClientIntegrationTest.scala: do not use AllowEveryoneIfNoAclIsFound.  Add a configureSecurityBeforeServerStart hook which installs the ACLs     necessary for the tests.  Add a test of ACL authorization ALLOW and DENY functionality.

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3240 from cmccabe/KAFKA-5292


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fc91afb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fc91afb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fc91afb

Branch: refs/heads/trunk
Commit: 2fc91afbad2229ef2f8ef6fbeebc257ab7019746
Parents: 79db393
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Thu Jun 8 00:26:14 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Jun 8 01:56:08 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |  23 +-
 .../apache/kafka/common/acl/AclOperation.java   |  13 ++
 .../kafka/common/requests/AbstractResponse.java |   3 +
 .../apache/kafka/common/resource/Resource.java  |  10 +
 .../kafka/common/resource/ResourceType.java     |   4 +-
 .../kafka/common/acl/AclOperationTest.java      |   1 +
 .../kafka/common/acl/AclPermissionTypeTest.java |   1 +
 .../kafka/common/resource/ResourceTypeTest.java |   3 +-
 .../src/main/scala/kafka/admin/AclCommand.scala |   2 +-
 .../controller/ControllerChannelManager.scala   |   5 +-
 .../scala/kafka/security/auth/Operation.scala   |   8 +-
 .../kafka/security/auth/PermissionType.scala    |   8 +-
 .../kafka/security/auth/ResourceType.scala      |  22 +-
 .../security/auth/SimpleAclAuthorizer.scala     |  19 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  54 +++--
 .../api/SaslSslAdminClientIntegrationTest.scala | 225 +++++++++++++++----
 .../kafka/security/auth/OperationTest.scala     |  28 +--
 .../security/auth/PermissionTypeTest.scala      |  20 +-
 .../kafka/security/auth/ResourceTypeTest.scala  |  20 +-
 .../security/auth/SimpleAclAuthorizerTest.scala |  52 ++++-
 20 files changed, 397 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 6e2968a..a0730ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -547,11 +547,12 @@ public class NetworkClient implements KafkaClient {
     }
 
     public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
-        return parseResponseMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0);
+        return createResponse(parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader,
+                null, 0), requestHeader);
     }
 
-    private static AbstractResponse parseResponseMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,
-            Sensor throttleTimeSensor, long now) {
+    private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,
+                                                                    Sensor throttleTimeSensor, long now) {
         ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
         // Always expect the response version id to be the same as the request version id
         ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
@@ -559,7 +560,12 @@ public class NetworkClient implements KafkaClient {
         correlate(requestHeader, responseHeader);
         if (throttleTimeSensor != null && responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME))
             throttleTimeSensor.record(responseBody.getInt(AbstractResponse.THROTTLE_TIME_KEY_NAME), now);
-        return AbstractResponse.getResponse(apiKey, responseBody);
+        return responseBody;
+    }
+
+    private static AbstractResponse createResponse(Struct responseStruct, RequestHeader requestHeader) {
+        ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
+        return AbstractResponse.getResponse(apiKey, responseStruct);
     }
 
     /**
@@ -646,8 +652,13 @@ public class NetworkClient implements KafkaClient {
         for (NetworkReceive receive : this.selector.completedReceives()) {
             String source = receive.source();
             InFlightRequest req = inFlightRequests.completeNext(source);
-            AbstractResponse body = parseResponseMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, throttleTimeSensor, now);
-            log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body);
+            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
+                throttleTimeSensor, now);
+            if (log.isTraceEnabled()) {
+                log.trace("Completed receive from node {}, for key {}, received {}", req.destination,
+                    req.header.apiKey(), responseStruct.toString());
+            }
+            AbstractResponse body = createResponse(responseStruct, req.header);
             if (req.isInternalRequest && body instanceof MetadataResponse)
                 metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
             else if (req.isInternalRequest && body instanceof ApiVersionsResponse)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
index c63320d..7b17e4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
@@ -22,6 +22,19 @@ import java.util.Locale;
 
 /**
  * Represents an operation which an ACL grants or denies permission to perform.
+ *
+ * Some operations imply other operations.
+ *
+ * ALLOW ALL implies ALLOW everything
+ * DENY ALL implies DENY everything
+ *
+ * ALLOW READ implies ALLOW DESCRIBE
+ * ALLOW WRITE implies ALLOW DESCRIBE
+ * ALLOW DELETE implies ALLOW DESCRIBE
+ *
+ * ALLOW ALTER implies ALLOW DESCRIBE
+ *
+ * ALLOW ALTER_CONFIGS implies ALLOW DESCRIBE_CONFIGS
  */
 public enum AclOperation {
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 99b35e8..1686976 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -125,4 +125,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
         }
     }
 
+    public String toString(short version) {
+        return toStruct(version).toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
index 2883a03..93a0cd1 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
@@ -26,6 +26,16 @@ public class Resource {
     private final ResourceType resourceType;
     private final String name;
 
+    /**
+     * The name of the CLUSTER resource.
+     */
+    public final static String CLUSTER_NAME = "kafka-cluster";
+
+    /**
+     * A resource representing the whole cluster.
+     */
+    public final static Resource CLUSTER = new Resource(ResourceType.CLUSTER, CLUSTER_NAME);
+
     public Resource(ResourceType resourceType, String name) {
         Objects.requireNonNull(resourceType);
         this.resourceType = resourceType;

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
index a1b7b2b..f85e2c4 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
@@ -51,9 +51,9 @@ public enum ResourceType {
     CLUSTER((byte) 4),
 
     /**
-     * A broker.
+     * A transactional ID.
      */
-    BROKER((byte) 5);
+    TRANSACTIONAL_ID((byte) 5);
 
     private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
index 5f5a87c..ba09499 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
@@ -61,6 +61,7 @@ public class AclOperationTest {
 
     @Test
     public void testCode() throws Exception {
+        assertEquals(AclOperation.values().length, INFOS.length);
         for (AclOperationTestInfo info : INFOS) {
             assertEquals(info.operation + " was supposed to have code == " + info.code,
                 info.code, info.operation.code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
index 8e7fdc7..15b9068 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
@@ -53,6 +53,7 @@ public class AclPermissionTypeTest {
 
     @Test
     public void testCode() throws Exception {
+        assertEquals(AclPermissionType.values().length, INFOS.length);
         for (AclPermissionTypeTestInfo info : INFOS) {
             assertEquals(info.ty + " was supposed to have code == " + info.code,
                 info.code, info.ty.code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
index 4dc4cac..9adade1 100644
--- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
@@ -41,7 +41,7 @@ public class ResourceTypeTest {
         new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false),
         new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false),
         new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false),
-        new AclResourceTypeTestInfo(ResourceType.BROKER, 5, "broker", false)
+        new AclResourceTypeTestInfo(ResourceType.TRANSACTIONAL_ID, 5, "transactional_id", false)
     };
 
     @Test
@@ -54,6 +54,7 @@ public class ResourceTypeTest {
 
     @Test
     public void testCode() throws Exception {
+        assertEquals(ResourceType.values().length, INFOS.length);
         for (AclResourceTypeTestInfo info : INFOS) {
             assertEquals(info.resourceType + " was supposed to have code == " + info.code,
                 info.code, info.resourceType.code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 8cbd8a6..4522135 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -33,7 +33,7 @@ object AclCommand {
   val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
     Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All),
     Group -> Set(Read, Describe, All),
-    Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, All),
+    Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
     TransactionalId -> Set(Describe, Write, All)
   )
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 8f98a8c..e5d12e8 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -237,14 +237,15 @@ class RequestSendThread(val controllerId: Int,
         }
       }
       if (clientResponse != null) {
-        val api = ApiKeys.forId(clientResponse.requestHeader.apiKey)
+        val requestHeader = clientResponse.requestHeader
+        val api = ApiKeys.forId(requestHeader.apiKey)
         if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY)
           throw new KafkaException(s"Unexpected apiKey received: $apiKey")
 
         val response = clientResponse.responseBody
 
         stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
-          .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))
+          .format(controllerId, controllerContext.epoch, response.toString(requestHeader.apiVersion), brokerNode.toString))
 
         if (callback != null) {
           callback(response)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/main/scala/kafka/security/auth/Operation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
index d3a25b5..a13345a 100644
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -81,13 +81,7 @@ object Operation {
     op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(",")))
   }
 
-  def fromJava(operation: AclOperation): Try[Operation] = {
-    try {
-      Success(fromString(operation.toString))
-    } catch {
-      case throwable: Throwable => Failure(throwable)
-    }
-  }
+  def fromJava(operation: AclOperation): Operation = fromString(operation.toString.replaceAll("_", ""))
 
   def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs,
      DescribeConfigs, IdempotentWrite, All)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/main/scala/kafka/security/auth/PermissionType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
index ec99ae4..686c60b 100644
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala
@@ -46,13 +46,7 @@ object PermissionType {
     pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(",")))
   }
 
-  def fromJava(permissionType: AclPermissionType): Try[PermissionType] = {
-    try {
-      Success(fromString(permissionType.toString))
-    } catch {
-      case throwable: Throwable => Failure(throwable)
-    }
-  }
+  def fromJava(permissionType: AclPermissionType): PermissionType = fromString(permissionType.toString)
 
   def values: Seq[PermissionType] = List(Allow, Deny)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index 4deb23b..b046ddd 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -18,27 +18,35 @@ package kafka.security.auth
 
 import kafka.common.{BaseEnum, KafkaException}
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.resource.{ResourceType => JResourceType}
 
-sealed trait ResourceType extends BaseEnum { def error: Errors }
-
-case object Cluster extends ResourceType {
-  val name = "Cluster"
-  val error = Errors.CLUSTER_AUTHORIZATION_FAILED
+sealed trait ResourceType extends BaseEnum {
+  def error: Errors
+  def toJava: JResourceType
 }
 
 case object Topic extends ResourceType {
   val name = "Topic"
   val error = Errors.TOPIC_AUTHORIZATION_FAILED
+  val toJava = JResourceType.TOPIC
 }
 
 case object Group extends ResourceType {
   val name = "Group"
   val error = Errors.GROUP_AUTHORIZATION_FAILED
+  val toJava = JResourceType.GROUP
+}
+
+case object Cluster extends ResourceType {
+  val name = "Cluster"
+  val error = Errors.CLUSTER_AUTHORIZATION_FAILED
+  val toJava = JResourceType.CLUSTER
 }
 
 case object TransactionalId extends ResourceType {
   val name = "TransactionalId"
   val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
+  val toJava = JResourceType.TRANSACTIONAL_ID
 }
 
 object ResourceType {
@@ -48,5 +56,7 @@ object ResourceType {
     rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
   }
 
-  def values: Seq[ResourceType] = List(Cluster, Topic, Group, TransactionalId)
+  def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId)
+
+  def fromJava(operation: JResourceType): ResourceType = fromString(operation.toString.replaceAll("_", ""))
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 0e78f52..03eb9e3 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -124,17 +124,18 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     val host = session.clientAddress.getHostAddress
     val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
 
-    //check if there is any Deny acl match that would disallow this operation.
+    // Check if there is any Deny acl match that would disallow this operation.
     val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)
 
-    //if principal is allowed to read, write or delete we allow describe by default, the reverse does not apply to Deny.
-    val ops = if (Describe == operation)
-      Set[Operation](operation, Read, Write, Delete)
-    else
-      Set[Operation](operation)
-
-    //now check if there is any allow acl that will allow this operation.
-    val allowMatch = ops.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
+    // Check if there are any Allow ACLs which would allow this operation.
+    // Allowing read, write, delete, or alter implies allowing describe.
+    // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
+    val allowOps = operation match {
+      case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
+      case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
+      case _ => Set[Operation](operation)
+    }
+    val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
 
     //we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users
     //when no acls are found or if no deny acls are found and at least one allow acls matches.

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6cff0e6..5fb13a6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1747,7 +1747,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDescribeAcls(request: RequestChannel.Request): Unit = {
-    authorizeClusterAction(request)
+    authorizeClusterDescribe(request)
     val describeAclsRequest = request.body[DescribeAclsRequest]
     authorizer match {
       case None =>
@@ -1786,15 +1786,13 @@ class KafkaApis(val requestChannel: RequestChannel,
       case AdminResourceType.ANY => return Failure(new InvalidRequestException("Invalid ANY resource type"))
       case _ => {}
     }
-    var resourceType: ResourceType = null
-    try {
-      resourceType = ResourceType.fromString(filter.resourceFilter().resourceType().toString)
+    val resourceType: ResourceType = try {
+      ResourceType.fromJava(filter.resourceFilter.resourceType)
     } catch {
       case throwable: Throwable => return Failure(new InvalidRequestException("Invalid resource type"))
     }
-    var principal: KafkaPrincipal = null
-    try {
-      principal = KafkaPrincipal.fromString(filter.entryFilter().principal())
+    val principal: KafkaPrincipal = try {
+      KafkaPrincipal.fromString(filter.entryFilter.principal)
     } catch {
       case throwable: Throwable => return Failure(new InvalidRequestException("Invalid principal"))
     }
@@ -1803,18 +1801,20 @@ class KafkaApis(val requestChannel: RequestChannel,
       case AclOperation.ANY => return Failure(new InvalidRequestException("Invalid ANY operation type"))
       case _ => {}
     }
-    val operation = Operation.fromJava(filter.entryFilter().operation()) match {
-      case Failure(throwable) => return Failure(new InvalidRequestException(throwable.getMessage))
-      case Success(op) => op
+    val operation: Operation = try {
+      Operation.fromJava(filter.entryFilter.operation)
+    } catch {
+      case throwable: Throwable => return Failure(new InvalidRequestException(throwable.getMessage))
     }
     filter.entryFilter().permissionType() match {
       case AclPermissionType.UNKNOWN => new InvalidRequestException("Invalid UNKNOWN permission type")
       case AclPermissionType.ANY => new InvalidRequestException("Invalid ANY permission type")
       case _ => {}
     }
-    val permissionType = PermissionType.fromJava(filter.entryFilter.permissionType) match {
-      case Failure(throwable) => return Failure(new InvalidRequestException(throwable.getMessage))
-      case Success(perm) => perm
+    val permissionType: PermissionType = try {
+      PermissionType.fromJava(filter.entryFilter.permissionType)
+    } catch {
+      case throwable: Throwable => return Failure(new InvalidRequestException(throwable.getMessage))
     }
     return Success((Resource(resourceType, filter.resourceFilter().name()), Acl(principal, permissionType,
                    filter.entryFilter().host(), operation)))
@@ -1837,7 +1837,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateAcls(request: RequestChannel.Request): Unit = {
-    authorizeClusterAction(request)
+    authorizeClusterAlter(request)
     val createAclsRequest = request.body[CreateAclsRequest]
     authorizer match {
       case None =>
@@ -1846,7 +1846,6 @@ class KafkaApis(val requestChannel: RequestChannel,
             new SecurityDisabledException("No Authorizer is configured on the broker.")))
       case Some(auth) =>
         val errors = mutable.HashMap[Int, Throwable]()
-        val creations = ListBuffer[(Resource, Acl)]()
         for (i <- 0 until createAclsRequest.aclCreations.size) {
           val result = toScala(createAclsRequest.aclCreations.get(i).acl.toFilter)
           result match {
@@ -1859,8 +1858,13 @@ class KafkaApis(val requestChannel: RequestChannel,
                 if (resource.name.isEmpty)
                   throw new InvalidRequestException("Invalid empty resource name")
                 auth.addAcls(immutable.Set(acl), resource)
+                if (logger.isDebugEnabled)
+                  logger.debug(s"Added acl $acl to $resource")
               } catch {
-                case throwable : Throwable => errors.put(i, throwable)
+                case throwable : Throwable => if (logger.isDebugEnabled) {
+                    logger.debug(s"Failed to add acl $acl to $resource", throwable)
+                  }
+                  errors.put(i, throwable)
               }
           }
         }
@@ -1877,7 +1881,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDeleteAcls(request: RequestChannel.Request): Unit = {
-    authorizeClusterAction(request)
+    authorizeClusterAlter(request)
     val deleteAclsRequest = request.body[DeleteAclsRequest]
     authorizer match {
       case None =>
@@ -1997,8 +2001,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RResourceType.BROKER =>
           authorize(request.session, AlterConfigs, Resource.ClusterResource)
         case RResourceType.TOPIC =>
-          authorize(request.session, AlterConfigs, new Resource(Topic, resource.name)) ||
-            authorize(request.session, AlterConfigs, Resource.ClusterResource)
+          authorize(request.session, AlterConfigs, new Resource(Topic, resource.name))
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
@@ -2030,8 +2033,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       resource.`type` match {
         case RResourceType.BROKER => authorize(request.session, DescribeConfigs, Resource.ClusterResource)
         case RResourceType.TOPIC =>
-          authorize(request.session, DescribeConfigs, new Resource(Topic, resource.name)) ||
-            authorize(request.session, DescribeConfigs, Resource.ClusterResource)
+          authorize(request.session, DescribeConfigs, new Resource(Topic, resource.name))
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
       }
     }
@@ -2052,6 +2054,16 @@ class KafkaApis(val requestChannel: RequestChannel,
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
 
+  def authorizeClusterAlter(request: RequestChannel.Request): Unit = {
+    if (!authorize(request.session, Alter, Resource.ClusterResource))
+      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
+  }
+
+  def authorizeClusterDescribe(request: RequestChannel.Request): Unit = {
+    if (!authorize(request.session, Describe, Resource.ClusterResource))
+      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
+  }
+
   private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) {
     sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs =>
       sendResponse(request, createResponse(requestThrottleMs))

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 9cd86c3..815d3a4 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -14,72 +14,103 @@ package kafka.api
 
 import java.io.File
 
-import kafka.security.auth.SimpleAclAuthorizer
+import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
-import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
+import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions, KafkaAdminClient}
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
-import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
 import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceType}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert.assertEquals
 import org.junit.{After, Assert, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
 
 class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup {
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName())
-  this.serverConfig.setProperty(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 
+  override def configureSecurityBeforeServersStart() {
+    val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName())
+    authorizer.configure(this.configs.head.originals())
+    authorizer.addAcls(Set(new AuthAcl(AuthAcl.WildCardPrincipal, Allow,
+                            AuthAcl.WildCardHost, All)), new AuthResource(Topic, "*"))
+    authorizer.addAcls(Set(clusterAcl(Allow, Create),
+                           clusterAcl(Allow, Delete),
+                           clusterAcl(Allow, ClusterAction),
+                           clusterAcl(Allow, AlterConfigs),
+                           clusterAcl(Allow, Alter)),
+                       AuthResource.ClusterResource)
+  }
+
   @Before
   override def setUp(): Unit = {
     startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
     super.setUp()
   }
 
+  private def clusterAcl(permissionType: PermissionType, operation: Operation): AuthAcl = {
+    new AuthAcl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"), permissionType,
+      AuthAcl.WildCardHost, operation)
+  }
+
+  private def addClusterAcl(permissionType: PermissionType, operation: Operation): Unit = {
+    val acls = Set(clusterAcl(permissionType, operation))
+    val authorizer = servers.head.apis.authorizer.get
+    val prevAcls = authorizer.getAcls(AuthResource.ClusterResource)
+    authorizer.addAcls(acls, AuthResource.ClusterResource)
+    TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, AuthResource.ClusterResource)
+  }
+
+  private def removeClusterAcl(permissionType: PermissionType, operation: Operation): Unit = {
+    val acls = Set(clusterAcl(permissionType, operation))
+    val authorizer = servers.head.apis.authorizer.get
+    val prevAcls = authorizer.getAcls(AuthResource.ClusterResource)
+    Assert.assertTrue(authorizer.removeAcls(acls, AuthResource.ClusterResource))
+    TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, AuthResource.ClusterResource)
+  }
+
   @After
   override def tearDown(): Unit = {
     super.tearDown()
     closeSasl()
   }
 
-  val ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2"),
-    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
-  val ACL3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
-    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW));
-  val ACL_UNKNOWN = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
-    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW));
-
-  /**
-    * Test that ACL operations are not possible when the authorizer is disabled.
-    * Also see {@link kafka.api.KafkaAdminClientSecureIntegrationTest} for tests of ACL operations
-    * when the authorizer is enabled.
-    */
+  val acl2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2"),
+    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
+  val acl3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
+  val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"),
+    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
+
   @Test
   override def testAclOperations(): Unit = {
     client = AdminClient.create(createConfig())
-    assertEquals(0, client.describeAcls(AclBindingFilter.ANY).all().get().size())
-    val results = client.createAcls(List(ACL2, ACL3).asJava)
-    assertEquals(Set(ACL2, ACL3), results.results().keySet().asScala)
-    results.results().values().asScala.foreach(value => value.get)
-    val results2 = client.createAcls(List(ACL_UNKNOWN).asJava)
-    assertEquals(Set(ACL_UNKNOWN), results2.results().keySet().asScala)
-    assertFutureExceptionTypeEquals(results2.all(), classOf[InvalidRequestException])
-    val results3 = client.deleteAcls(List(ACL1.toFilter, ACL2.toFilter, ACL3.toFilter).asJava)
-    assertEquals(Set(ACL1.toFilter, ACL2.toFilter, ACL3.toFilter), results3.results().keySet().asScala)
-    assertEquals(0, results3.results.get(ACL1.toFilter).get.acls.size())
-    assertEquals(Set(ACL2), results3.results.get(ACL2.toFilter).get.acls.asScala.map(result => result.acl()).toSet)
-    assertEquals(Set(ACL3), results3.results.get(ACL3.toFilter).get.acls.asScala.map(result => result.acl()).toSet)
-    client.close()
+    assertEquals(6, client.describeAcls(AclBindingFilter.ANY).all.get().size)
+    val results = client.createAcls(List(acl2, acl3).asJava)
+    assertEquals(Set(acl2, acl3), results.results.keySet().asScala)
+    results.results.values().asScala.foreach(value => value.get)
+    val aclUnknown = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW))
+    val results2 = client.createAcls(List(aclUnknown).asJava)
+    assertEquals(Set(aclUnknown), results2.results.keySet().asScala)
+    assertFutureExceptionTypeEquals(results2.all, classOf[InvalidRequestException])
+    val results3 = client.deleteAcls(List(ACL1.toFilter, acl2.toFilter, acl3.toFilter).asJava).results
+    assertEquals(Set(ACL1.toFilter, acl2.toFilter, acl3.toFilter), results3.keySet.asScala)
+    assertEquals(0, results3.get(ACL1.toFilter).get.acls.size())
+    assertEquals(Set(acl2), results3.get(acl2.toFilter).get.acls.asScala.map(_.acl).toSet)
+    assertEquals(Set(acl3), results3.get(acl3.toFilter).get.acls.asScala.map(_.acl).toSet)
   }
 
   def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val results = client.describeAcls(filter).all().get()
+      val results = client.describeAcls(filter).all.get()
       acls == results.asScala.toSet
     }, "timed out waiting for ACLs")
   }
@@ -87,24 +118,22 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
   @Test
   def testAclOperations2(): Unit = {
     client = AdminClient.create(createConfig())
-    val results = client.createAcls(List(ACL2, ACL2).asJava)
-    assertEquals(Set(ACL2, ACL2), results.results().keySet().asScala)
-    results.all().get()
-    waitForDescribeAcls(client, AclBindingFilter.ANY, Set(ACL2))
+    val results = client.createAcls(List(acl2, acl2).asJava)
+    assertEquals(Set(acl2, acl2), results.results.keySet().asScala)
+    results.all.get()
+    waitForDescribeAcls(client, acl2.toFilter, Set(acl2))
 
-    val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.CLUSTER, null), AccessControlEntryFilter.ANY)
+    val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null), AccessControlEntryFilter.ANY)
     val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY)
 
     waitForDescribeAcls(client, filterA, Set())
 
     val results2 = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions())
-    assertEquals(Set(filterA, filterB), results2.results().keySet().asScala)
-    assertEquals(Set(), results2.results.get(filterA).get.acls.asScala.map(result => result.acl()).toSet)
-    assertEquals(Set(ACL2), results2.results.get(filterB).get.acls.asScala.map(result => result.acl()).toSet)
+    assertEquals(Set(filterA, filterB), results2.results.keySet().asScala)
+    assertEquals(Set(), results2.results.get(filterA).get.acls.asScala.map(_.acl).toSet)
+    assertEquals(Set(acl2), results2.results.get(filterB).get.acls.asScala.map(_.acl).toSet)
 
     waitForDescribeAcls(client, filterB, Set())
-
-    client.close()
   }
 
   @Test
@@ -115,8 +144,116 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, ""),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
     val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions())
-    assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.results().keySet().asScala)
-    assertFutureExceptionTypeEquals(results.results().get(clusterAcl), classOf[InvalidRequestException])
-    assertFutureExceptionTypeEquals(results.results().get(emptyResourceNameAcl), classOf[InvalidRequestException])
+    assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.results.keySet().asScala)
+    assertFutureExceptionTypeEquals(results.results.get(clusterAcl), classOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(results.results.get(emptyResourceNameAcl), classOf[InvalidRequestException])
+  }
+
+  private def verifyCauseIsClusterAuth(e: Throwable): Unit = {
+    if (!e.getCause.isInstanceOf[ClusterAuthorizationException]) {
+      throw e.getCause
+    }
+  }
+
+  private def testAclCreateGetDelete(expectAuth: Boolean): Unit = {
+    TestUtils.waitUntilTrue(() => {
+      val result = client.createAcls(List(fooAcl).asJava, new CreateAclsOptions)
+      if (expectAuth) {
+        Try(result.all.get) match {
+          case Failure(e) =>
+            verifyCauseIsClusterAuth(e)
+            false
+          case Success(_) => true
+        }
+      } else {
+        Try(result.all.get) match {
+          case Failure(e) =>
+            verifyCauseIsClusterAuth(e)
+            true
+          case Success(_) => false
+        }
+      }
+    }, "timed out waiting for createAcls to " + (if (expectAuth) "succeed" else "fail"))
+    if (expectAuth) {
+      waitForDescribeAcls(client, fooAcl.toFilter, Set(fooAcl))
+    }
+    TestUtils.waitUntilTrue(() => {
+      val result = client.deleteAcls(List(fooAcl.toFilter).asJava, new DeleteAclsOptions)
+      if (expectAuth) {
+        Try(result.all.get) match {
+          case Failure(e) =>
+            verifyCauseIsClusterAuth(e)
+            false
+          case Success(_) => true
+        }
+      } else {
+        Try(result.all.get) match {
+          case Failure(e) =>
+            verifyCauseIsClusterAuth(e)
+            true
+          case Success(_) =>
+            assertEquals(Set(fooAcl), result.results.get(fooAcl.toFilter).get.acls.asScala.map(_.acl).toSet)
+            true
+        }
+      }
+    }, "timed out waiting for deleteAcls to " + (if (expectAuth) "succeed" else "fail"))
+    if (expectAuth) {
+      waitForDescribeAcls(client, fooAcl.toFilter, Set())
+    }
+  }
+
+  private def testAclGet(expectAuth: Boolean): Unit = {
+    TestUtils.waitUntilTrue(() => {
+      val userAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*"),
+        new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
+      val results = client.describeAcls(userAcl.toFilter)
+      if (expectAuth) {
+        Try(results.all.get) match {
+          case Failure(e) =>
+            verifyCauseIsClusterAuth(e)
+            false
+          case Success(acls) => Set(userAcl).equals(acls.asScala.toSet)
+        }
+      } else {
+        Try(results.all.get) match {
+          case Failure(e) =>
+            verifyCauseIsClusterAuth(e)
+            true
+          case Success(_) => false
+        }
+      }
+    }, "timed out waiting for describeAcls to " + (if (expectAuth) "succeed" else "fail"))
+  }
+
+  @Test
+  def testAclAuthorizationDenied(): Unit = {
+    client = AdminClient.create(createConfig())
+
+    // Test that we cannot create or delete ACLs when Alter is denied.
+    addClusterAcl(Deny, Alter)
+    testAclGet(expectAuth = true)
+    testAclCreateGetDelete(expectAuth = false)
+
+    // Test that we cannot do anything with ACLs when Describe and Alter are denied.
+    addClusterAcl(Deny, Describe)
+    testAclGet(expectAuth = false)
+    testAclCreateGetDelete(expectAuth = false)
+
+    // Test that we can create, delete, and get ACLs with the default ACLs.
+    removeClusterAcl(Deny, Describe)
+    removeClusterAcl(Deny, Alter)
+    testAclGet(expectAuth = true)
+    testAclCreateGetDelete(expectAuth = true)
+
+    // Test that we can't do anything with ACLs without the Allow Alter ACL in place.
+    removeClusterAcl(Allow, Alter)
+    removeClusterAcl(Allow, Delete)
+    testAclGet(expectAuth = false)
+    testAclCreateGetDelete(expectAuth = false)
+
+    // Test that we can describe, but not alter ACLs, with only the Allow Describe ACL in place.
+    addClusterAcl(Allow, Describe)
+    testAclGet(expectAuth = true)
+    testAclCreateGetDelete(expectAuth = false)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
index 1df34ea..242c768 100644
--- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
@@ -14,25 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package kafka.security.auth
 
-import kafka.common.{KafkaException}
-import org.junit.{Test, Assert}
+import org.apache.kafka.common.acl.AclOperation
+import org.junit.Assert.assertEquals
+import org.junit.Test
 import org.scalatest.junit.JUnitSuite
 
 class OperationTest extends JUnitSuite {
-
+  /**
+    * Test round trip conversions between org.apache.kafka.common.acl.AclOperation and
+    * kafka.security.auth.Operation.
+    */
   @Test
-  def testFromString(): Unit = {
-    val op = Operation.fromString("READ")
-    Assert.assertEquals(Read, op)
-
-    try {
-      Operation.fromString("badName")
-      fail("Expected exception on invalid operation name.")
-    } catch {
-      case _: KafkaException => // expected
+  def testJavaConversions(): Unit = {
+    AclOperation.values.foreach {
+      case AclOperation.UNKNOWN | AclOperation.ANY =>
+      case aclOp =>
+        val op = Operation.fromJava(aclOp)
+        val aclOp2 = op.toJava
+        assertEquals(aclOp, aclOp2)
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
index 1b1c864..0ee66e6 100644
--- a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
@@ -17,7 +17,9 @@
 package kafka.security.auth
 
 import kafka.common.KafkaException
-import org.junit.{Test, Assert}
+import org.apache.kafka.common.acl.AclPermissionType
+import org.junit.Assert.assertEquals
+import org.junit.Test
 import org.scalatest.junit.JUnitSuite
 
 class PermissionTypeTest extends JUnitSuite {
@@ -25,7 +27,7 @@ class PermissionTypeTest extends JUnitSuite {
   @Test
   def testFromString(): Unit = {
     val permissionType = PermissionType.fromString("Allow")
-    Assert.assertEquals(Allow, permissionType)
+    assertEquals(Allow, permissionType)
 
     try {
       PermissionType.fromString("badName")
@@ -35,4 +37,18 @@ class PermissionTypeTest extends JUnitSuite {
     }
   }
 
+  /**
+    * Test round trip conversions between org.apache.kafka.common.acl.AclPermissionType and
+    * kafka.security.auth.PermissionType.
+    */
+  @Test
+  def testJavaConversions(): Unit = {
+    AclPermissionType.values().foreach {
+      case AclPermissionType.UNKNOWN | AclPermissionType.ANY =>
+      case aclPerm =>
+        val perm = PermissionType.fromJava(aclPerm)
+        val aclPerm2 = perm.toJava
+        assertEquals(aclPerm, aclPerm2)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
index 546c92e..0d99378 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
@@ -17,15 +17,17 @@
 package kafka.security.auth
 
 import kafka.common.KafkaException
-import org.junit.{Test, Assert}
+import org.junit.Assert.assertEquals
+import org.junit.Test
 import org.scalatest.junit.JUnitSuite
+import org.apache.kafka.common.resource.{ResourceType => JResourceType}
 
 class ResourceTypeTest extends JUnitSuite {
 
   @Test
   def testFromString(): Unit = {
     val resourceType = ResourceType.fromString("Topic")
-    Assert.assertEquals(Topic, resourceType)
+    assertEquals(Topic, resourceType)
 
     try {
       ResourceType.fromString("badName")
@@ -35,4 +37,18 @@ class ResourceTypeTest extends JUnitSuite {
     }
   }
 
+  /**
+    * Test round trip conversions between org.apache.kafka.common.acl.ResourceType and
+    * kafka.security.auth.ResourceType.
+    */
+  @Test
+  def testJavaConversions(): Unit = {
+    JResourceType.values.foreach {
+      case JResourceType.UNKNOWN | JResourceType.ANY =>
+      case jResourceType =>
+        val resourceType = ResourceType.fromJava(jResourceType)
+        val jResourceType2 = resourceType.toJava
+        assertEquals(jResourceType, jResourceType2)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fc91afb/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 6017a7b..5dbd1a8 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -17,7 +17,7 @@
 package kafka.security.auth
 
 import java.net.InetAddress
-import java.util.{UUID}
+import java.util.UUID
 
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Acl.WildCardHost
@@ -351,6 +351,56 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer2, commonResource)
   }
 
+  /**
+    * Test ACL inheritance, as described in #{org.apache.kafka.common.acl.AclOperation}
+    */
+  @Test
+  def testAclInheritance(): Unit = {
+    testImplicationsOfAllow(All, Set(Read, Write, Create, Delete, Alter, Describe,
+      ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite))
+    testImplicationsOfDeny(All, Set(Read, Write, Create, Delete, Alter, Describe,
+      ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite))
+    testImplicationsOfAllow(Read, Set(Describe))
+    testImplicationsOfAllow(Write, Set(Describe))
+    testImplicationsOfAllow(Delete, Set(Describe))
+    testImplicationsOfAllow(Alter, Set(Describe))
+    testImplicationsOfDeny(Describe, Set())
+    testImplicationsOfAllow(AlterConfigs, Set(DescribeConfigs))
+    testImplicationsOfDeny(DescribeConfigs, Set())
+  }
+
+  private def testImplicationsOfAllow(parentOp: Operation, allowedOps: Set[Operation]): Unit = {
+    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val host = InetAddress.getByName("192.168.3.1")
+    val hostSession = Session(user, host)
+    val acl = Acl(user, Allow, WildCardHost, parentOp)
+    simpleAclAuthorizer.addAcls(Set(acl), Resource.ClusterResource)
+    Operation.values.foreach { op =>
+      val authorized = simpleAclAuthorizer.authorize(hostSession, op, Resource.ClusterResource)
+      if (allowedOps.contains(op) || op == parentOp)
+        assertTrue(s"ALLOW $parentOp should imply ALLOW $op", authorized)
+      else
+        assertFalse(s"ALLOW $parentOp should not imply ALLOW $op", authorized)
+    }
+    simpleAclAuthorizer.removeAcls(Set(acl), Resource.ClusterResource)
+  }
+
+  private def testImplicationsOfDeny(parentOp: Operation, deniedOps: Set[Operation]): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val host1 = InetAddress.getByName("192.168.3.1")
+    val host1Session = Session(user1, host1)
+    val acls = Set(Acl(user1, Deny, WildCardHost, parentOp), Acl(user1, Allow, WildCardHost, All))
+    simpleAclAuthorizer.addAcls(acls, Resource.ClusterResource)
+    Operation.values.foreach { op =>
+      val authorized = simpleAclAuthorizer.authorize(host1Session, op, Resource.ClusterResource)
+      if (deniedOps.contains(op) || op == parentOp)
+        assertFalse(s"DENY $parentOp should imply DENY $op", authorized)
+      else
+        assertTrue(s"DENY $parentOp should not imply DENY $op", authorized)
+    }
+    simpleAclAuthorizer.removeAcls(acls, Resource.ClusterResource)
+  }
+
   @Test
   def testHighConcurrencyDeletionOfResourceAcls() {
     val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)


Mime
View raw message