kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7496: Handle invalid filters gracefully in KafkaAdminClient#describeAcls (#5774)
Date Tue, 16 Oct 2018 11:11:08 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 1d14d7a  KAFKA-7496: Handle invalid filters gracefully in KafkaAdminClient#describeAcls
 (#5774)
1d14d7a is described below

commit 1d14d7a4ab2ce98a1a41cac3fd78a7c1a9e9dc5c
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Tue Oct 16 04:07:49 2018 -0700

    KAFKA-7496: Handle invalid filters gracefully in KafkaAdminClient#describeAcls  (#5774)
---
 .../java/org/apache/kafka/clients/admin/KafkaAdminClient.java     | 6 ++++++
 .../java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java | 8 ++++++++
 2 files changed, 14 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index b0120e0..15021bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1548,6 +1548,12 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAclsOptions
options) {
+        if (filter.isUnknown()) {
+            KafkaFutureImpl<Collection<AclBinding>> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new InvalidRequestException("The AclBindingFilter
" +
+                    "must not contain UNKNOWN elements."));
+            return new DescribeAclsResult(future);
+        }
         final long now = time.milliseconds();
         final KafkaFutureImpl<Collection<AclBinding>> future = new KafkaFutureImpl<>();
         runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.timeoutMs()),
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 34432c3..dc55ea2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -561,6 +562,9 @@ public class KafkaAdminClientTest {
         new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY));
     private static final AclBindingFilter FILTER2 = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY,
null, PatternType.LITERAL),
         new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY));
+    private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(
+        new ResourcePatternFilter(ResourceType.UNKNOWN, null, PatternType.LITERAL),
+        new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY));
 
     @Test
     public void testDescribeAcls() throws Exception {
@@ -582,6 +586,10 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new DescribeAclsResponse(0,
                 new ApiError(Errors.SECURITY_DISABLED, "Security is disabled"), Collections.<AclBinding>emptySet()));
             TestUtils.assertFutureError(env.adminClient().describeAcls(FILTER2).values(),
SecurityDisabledException.class);
+
+            // Test a call where we supply an invalid filter.
+            TestUtils.assertFutureError(env.adminClient().describeAcls(UNKNOWN_FILTER).values(),
+                InvalidRequestException.class);
         }
     }
 


Mime
View raw message