From commits-return-12936-archive-asf-public=cust-asf.ponee.io@sentry.apache.org Thu Jul 18 17:54:56 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 5C697180684 for ; Thu, 18 Jul 2019 19:54:55 +0200 (CEST) Received: (qmail 95257 invoked by uid 500); 18 Jul 2019 17:54:54 -0000 Mailing-List: contact commits-help@sentry.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sentry.apache.org Delivered-To: mailing list commits@sentry.apache.org Received: (qmail 95238 invoked by uid 99); 18 Jul 2019 17:54:54 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jul 2019 17:54:54 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 550C285E28; Thu, 18 Jul 2019 17:54:54 +0000 (UTC) Date: Thu, 18 Jul 2019 17:54:54 +0000 To: "commits@sentry.apache.org" Subject: [sentry] branch master updated: SENTRY-2276 Sentry-Kafka integration does not support Kafka's Alter/DescribeConfigs and IdempotentWrite operations (Gergo Wilder reviewed by Kalyan Kumar Kalvagadda) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <156347249423.7865.10426728569412950006@gitbox.apache.org> From: kalyan@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: sentry X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 943b3e00ba4771e4005bc7eb25014b8bc640f0c8 X-Git-Newrev: be2e6e8ee17bc9e50bf55f39fea94c3826c64f44 X-Git-Rev: be2e6e8ee17bc9e50bf55f39fea94c3826c64f44 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. kalyan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sentry.git The following commit(s) were added to refs/heads/master by this push: new be2e6e8 SENTRY-2276 Sentry-Kafka integration does not support Kafka's Alter/DescribeConfigs and IdempotentWrite operations (Gergo Wilder reviewed by Kalyan Kumar Kalvagadda) be2e6e8 is described below commit be2e6e8ee17bc9e50bf55f39fea94c3826c64f44 Author: Kalyan Kumar Kalvagadda AuthorDate: Thu Jul 18 12:36:01 2019 -0500 SENTRY-2276 Sentry-Kafka integration does not support Kafka's Alter/DescribeConfigs and IdempotentWrite operations (Gergo Wilder reviewed by Kalyan Kumar Kalvagadda) Change-Id: Ie0b7add60affe9901765339344abaa3944b8fc7a --- .../sentry/kafka/authorizer/ConvertUtilTest.java | 18 ++ .../authorizer/SentryKafkaAuthorizerTest.java | 28 ++- .../kafka/AbstractTestKafkaPolicyEngine.java | 47 ++++- ...TestKafkaAuthorizationProviderGeneralCases.java | 72 +++++++- .../policy/kafka/TestKafkaModelAuthorizables.java | 4 + .../policy/kafka/TestKafkaPrivilegeValidator.java | 21 +++ .../resources/kafka-policy-test-authz-provider.ini | 14 +- .../src/test/resources/sentry-site.xml | 2 +- .../src/test/resources/test-authz-provider.ini | 38 ---- .../core/model/kafka/KafkaActionConstant.java | 3 + .../core/model/kafka/KafkaActionFactory.java | 6 +- .../sentry/core/model/kafka/KafkaAuthorizable.java | 6 +- .../core/model/kafka/KafkaModelAuthorizables.java | 2 + .../core/model/kafka/KafkaPrivilegeModel.java | 2 + .../sentry/core/model/kafka/TransactionalId.java | 61 +++++++ .../sentry/core/model/kafka/TestKafkaAction.java | 197 ++++++++++----------- .../core/model/kafka/TestKafkaAuthorizable.java | 6 + 17 files changed, 374 insertions(+), 153 deletions(-) diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java index e08d442..494e212 100644 --- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java @@ -82,4 +82,22 @@ public class ConvertUtilTest { } Assert.assertEquals(authorizables.size(), 2); } + + @Test + public void testTransactionalId() { + String hostname = "localhost"; + String transactionalId = "t1"; + Resource transactionalIdResource = new Resource(ResourceType$.MODULE$.fromString("transactionalId"), transactionalId); + List authorizables = ConvertUtil.convertResourceToAuthorizable(hostname, transactionalIdResource); + for (Authorizable auth : authorizables) { + if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.TRANSACTIONALID.name())) { + Assert.assertEquals(auth.getName(), transactionalId); + } else if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.name())) { + Assert.assertEquals(auth.getName(), hostname); + } else { + Assert.fail("Unexpected type found: " + auth.getTypeName()); + } + } + Assert.assertEquals(authorizables.size(), 2); + } } diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java index f40d8c2..84aa5b1 100644 --- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java @@ -40,6 +40,7 @@ public class SentryKafkaAuthorizerTest { private String resourceName; private Resource clusterResource; private Resource topic1Resource; + private Resource transactionalIdResource; private KafkaConfig config; public SentryKafkaAuthorizerTest() throws UnknownHostException { @@ -49,6 +50,7 @@ public class SentryKafkaAuthorizerTest { resourceName = Resource$.MODULE$.ClusterResourceName(); clusterResource = new Resource(ResourceType$.MODULE$.fromString("cluster"), resourceName); topic1Resource = new Resource(ResourceType$.MODULE$.fromString("topic"), "t1"); + transactionalIdResource = new Resource(ResourceType$.MODULE$.fromString("transactionalId"), "tid1"); } @Before @@ -66,7 +68,7 @@ public class SentryKafkaAuthorizerTest { @Test public void testAdmin() { - KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin"); + KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin_group"); RequestChannel.Session host1Session = new RequestChannel.Session(admin, testHostName1); RequestChannel.Session host2Session = new RequestChannel.Session(admin, testHostName2); @@ -79,7 +81,12 @@ public class SentryKafkaAuthorizerTest { Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Delete"), topic1Resource)); Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Alter"), topic1Resource)); Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), topic1Resource)); - Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"),topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), clusterResource)); Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), clusterResource)); @@ -91,11 +98,15 @@ public class SentryKafkaAuthorizerTest { Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Alter"), topic1Resource)); Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), topic1Resource)); Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource)); } @Test public void testSubAdmin() { - KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "subadmin"); + KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "subadmin_group2"); RequestChannel.Session host1Session = new RequestChannel.Session(admin, testHostName1); RequestChannel.Session host2Session = new RequestChannel.Session(admin, testHostName2); @@ -108,7 +119,11 @@ public class SentryKafkaAuthorizerTest { Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Delete"), topic1Resource)); Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Alter"), topic1Resource)); Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), topic1Resource)); - Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"),topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource)); Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), clusterResource)); Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), clusterResource)); @@ -120,6 +135,9 @@ public class SentryKafkaAuthorizerTest { Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Alter"), topic1Resource)); Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), topic1Resource)); Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource)); - + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource)); } } diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java index 086b707..1ec2b19 100644 --- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java @@ -43,6 +43,11 @@ public abstract class AbstractTestKafkaPolicyEngine { private static final String PRODUCER_T1_ALL = "host=*->topic=t1->action=write"; private static final String PRODUCER_T1_HOST1 = "host=host1->topic=t1->action=write"; private static final String PRODUCER_T2_HOST2 = "host=host2->topic=t2->action=write"; + private static final String PRODUCER_TI1_HOST1 = "host=host1->transactionalid=ti1->action=write"; + private static final String PRODUCER_TI2_HOST2 = "host=host2->transactionalid=ti2->action=write"; + private static final String PRODUCER_IDEMPOTENTWRITE = "host=host1->cluster=kafka-cluster->action=idempotentwrite"; + private static final String CONFIG_ADMIN_HOST1 = "host=host1->cluster=kafka-cluster->action=describeconfigs"; + private static final String CONFIG_ADMIN_T1_HOST2 = "host=host2->topic=t1->action=alterconfigs"; private static final String CONSUMER_PRODUCER_T1 = "host=host1->topic=t1->action=all"; private PolicyEngine policy; @@ -133,6 +138,46 @@ public abstract class AbstractTestKafkaPolicyEngine { } @Test + public void testProducer3() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_TI1_HOST1)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("producer_group3"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testProducer4() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_TI2_HOST2)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("producer_group4"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testProducer5() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_IDEMPOTENTWRITE)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("producer_group5"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testConfigAdmin1() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(CONFIG_ADMIN_HOST1)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("config_admin_group1"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testConfigAdmin2() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(CONFIG_ADMIN_T1_HOST2)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("config_admin_group2"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test public void testConsumerProducer0() throws Exception { Set expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_PRODUCER_T1)); Assert.assertEquals(expected.toString(), @@ -144,7 +189,7 @@ public abstract class AbstractTestKafkaPolicyEngine { public void testSubAdmin() throws Exception { Set expected = Sets.newTreeSet(Sets.newHashSet(ADMIN_HOST1)); Assert.assertEquals(expected.toString(), - new TreeSet(policy.getPrivileges(set("subadmin_group"), ActiveRoleSet.ALL)) + new TreeSet(policy.getPrivileges(set("subadmin_group1"), ActiveRoleSet.ALL)) .toString()); } diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java index af92659..5ac3b0c 100644 --- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java @@ -38,6 +38,7 @@ import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction; import org.apache.sentry.core.model.kafka.Host; import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel; import org.apache.sentry.core.model.kafka.Topic; +import org.apache.sentry.core.model.kafka.TransactionalId; ; import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider; import org.apache.sentry.provider.common.ResourceAuthorizationProvider; import org.apache.sentry.core.common.utils.PolicyFiles; @@ -60,6 +61,8 @@ public class TestKafkaAuthorizationProviderGeneralCases { private static final Topic topic2 = new Topic("t2"); private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1"); private static final ConsumerGroup cgroup2 = new ConsumerGroup("cg2"); + private static final TransactionalId transactionalId1 = new TransactionalId("ti1"); + private static final TransactionalId transactionalId2 = new TransactionalId("ti2"); private static final KafkaAction ALL = new KafkaAction(KafkaActionConstant.ALL); private static final KafkaAction READ = new KafkaAction(KafkaActionConstant.READ); @@ -68,10 +71,13 @@ public class TestKafkaAuthorizationProviderGeneralCases { private static final KafkaAction DELETE = new KafkaAction(KafkaActionConstant.DELETE); private static final KafkaAction ALTER = new KafkaAction(KafkaActionConstant.ALTER); private static final KafkaAction DESCRIBE = new KafkaAction(KafkaActionConstant.DESCRIBE); - private static final KafkaAction CLUSTER_ACTION = new KafkaAction( - KafkaActionConstant.CLUSTER_ACTION); + private static final KafkaAction CLUSTER_ACTION = new KafkaAction(KafkaActionConstant.CLUSTER_ACTION); + private static final KafkaAction ALTER_CONFIGS = new KafkaAction(KafkaActionConstant.ALTER_CONFIGS); + private static final KafkaAction DESCRIBE_CONFIGS = new KafkaAction(KafkaActionConstant.DESCRIBE_CONFIGS); + private static final KafkaAction IDEMPOTENT_WRITE = new KafkaAction(KafkaActionConstant.IDEMPOTENT_WRITE); - private static final Set allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION); + private static final Set allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, + CLUSTER_ACTION, ALTER_CONFIGS, DESCRIBE_CONFIGS, IDEMPOTENT_WRITE); private static final Subject ADMIN = new Subject("admin1"); private static final Subject SUB_ADMIN = new Subject("subadmin1"); @@ -81,16 +87,27 @@ public class TestKafkaAuthorizationProviderGeneralCases { private static final Subject PRODUCER0 = new Subject("producer0"); private static final Subject PRODUCER1 = new Subject("producer1"); private static final Subject PRODUCER2 = new Subject("producer2"); + private static final Subject PRODUCER3 = new Subject("producer3"); + private static final Subject PRODUCER4 = new Subject("producer4"); + private static final Subject PRODUCER5 = new Subject("producer5"); + private static final Subject CONFIG_ADMIN1 = new Subject("config_admin1"); + private static final Subject CONFIG_ADMIN2 = new Subject("config_admin2"); private static final Subject CONSUMER_PRODUCER0 = new Subject("consumer_producer0"); private static final String ADMIN_GROUP = "admin_group"; - private static final String SUBADMIN_GROUP = "subadmin_group"; + private static final String SUBADMIN_GROUP = "subadmin_group1"; private static final String CONSUMER_GROUP0 = "consumer_group0"; private static final String CONSUMER_GROUP1 = "consumer_group1"; private static final String CONSUMER_GROUP2 = "consumer_group2"; private static final String PRODUCER_GROUP0 = "producer_group0"; private static final String PRODUCER_GROUP1 = "producer_group1"; private static final String PRODUCER_GROUP2 = "producer_group2"; + private static final String PRODUCER_GROUP3 = "producer_group3"; + private static final String PRODUCER_GROUP4 = "producer_group4"; + private static final String PRODUCER_GROUP5 = "producer_group5"; + private static final String CONFIG_ADMIN_GROUP1 = "config_admin_group1"; + private static final String CONFIG_ADMIN_GROUP2 = "config_admin_group2"; + private static final String CONSUMER_PRODUCER_GROUP0 = "consumer_producer_group0"; static { @@ -102,6 +119,11 @@ public class TestKafkaAuthorizationProviderGeneralCases { USER_TO_GROUP_MAP.putAll(PRODUCER0.getName(), Arrays.asList(PRODUCER_GROUP0)); USER_TO_GROUP_MAP.putAll(PRODUCER1.getName(), Arrays.asList(PRODUCER_GROUP1)); USER_TO_GROUP_MAP.putAll(PRODUCER2.getName(), Arrays.asList(PRODUCER_GROUP2)); + USER_TO_GROUP_MAP.putAll(PRODUCER3.getName(), Arrays.asList(PRODUCER_GROUP3)); + USER_TO_GROUP_MAP.putAll(PRODUCER4.getName(), Arrays.asList(PRODUCER_GROUP4)); + USER_TO_GROUP_MAP.putAll(PRODUCER5.getName(), Arrays.asList(PRODUCER_GROUP5)); + USER_TO_GROUP_MAP.putAll(CONFIG_ADMIN1.getName(), Arrays.asList(CONFIG_ADMIN_GROUP1)); + USER_TO_GROUP_MAP.putAll(CONFIG_ADMIN2.getName(), Arrays.asList(CONFIG_ADMIN_GROUP2)); USER_TO_GROUP_MAP.putAll(CONSUMER_PRODUCER0.getName(), Arrays.asList(CONSUMER_PRODUCER_GROUP0)); } @@ -171,12 +193,14 @@ public class TestKafkaAuthorizationProviderGeneralCases { Sets.newHashSet(action), READ.equals(action)); } } + for (KafkaAction action : allActions) { for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { doTestResourceAuthorizationProvider(CONSUMER1, Arrays.asList(host, topic1), Sets.newHashSet(action), HOST_1.equals(host) && READ.equals(action)); } } + for (KafkaAction action : allActions) { for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { doTestResourceAuthorizationProvider(CONSUMER2, Arrays.asList(host, topic2), @@ -193,18 +217,58 @@ public class TestKafkaAuthorizationProviderGeneralCases { Sets.newHashSet(action), WRITE.equals(action)); } } + for (KafkaAction action : allActions) { for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { doTestResourceAuthorizationProvider(PRODUCER1, Arrays.asList(host, topic1), Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action)); } } + for (KafkaAction action : allActions) { for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { doTestResourceAuthorizationProvider(PRODUCER2, Arrays.asList(host, topic2), Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action)); } } + + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(PRODUCER3, Arrays.asList(host, transactionalId1), + Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action)); + } + } + + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(PRODUCER4, Arrays.asList(host, transactionalId2), + Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action)); + } + } + + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(PRODUCER5, Arrays.asList(host, cluster1), + Sets.newHashSet(action), HOST_1.equals(host) && IDEMPOTENT_WRITE.equals(action)); + } + } + } + + @Test + public void testConfigAdmin() throws Exception { + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(CONFIG_ADMIN1, Arrays.asList(host, cluster1), + Sets.newHashSet(action), HOST_1.equals(host) && DESCRIBE_CONFIGS.equals(action)); + } + } + + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(CONFIG_ADMIN2, Arrays.asList(host, topic1), + Sets.newHashSet(action), HOST_2.equals(host) && ALTER_CONFIGS.equals(action)); + } + } } @Test diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java index 62fbea7..2c2e2c6 100644 --- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java @@ -27,6 +27,7 @@ import org.apache.sentry.core.model.kafka.ConsumerGroup; import org.apache.sentry.core.model.kafka.Host; import org.apache.sentry.core.model.kafka.KafkaModelAuthorizables; import org.apache.sentry.core.model.kafka.Topic; +import org.apache.sentry.core.model.kafka.TransactionalId; import org.apache.shiro.config.ConfigurationException; import org.junit.Test; @@ -71,6 +72,9 @@ public class TestKafkaModelAuthorizables { ConsumerGroup consumergroup1 = (ConsumerGroup)KafkaModelAuthorizables.from("ConsumerGroup=CG1"); assertEquals("CG1", consumergroup1.getName()); + + TransactionalId transactionalId1 = (TransactionalId) KafkaModelAuthorizables.from("TransactionalId=tRaNs1"); + assertEquals("tRaNs1", transactionalId1.getName()); } @Test diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java index ba66d43..61d5554 100644 --- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java @@ -71,6 +71,11 @@ public class TestKafkaPrivilegeValidator { } catch (ConfigurationException ex) { Assert.fail("Not expected ConfigurationException"); } + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->transactionalid=t1->action=write")); + } catch (ConfigurationException ex) { + Assert.fail("Not expected ConfigurationException"); + } } @Test @@ -104,6 +109,16 @@ public class TestKafkaPrivilegeValidator { } @Test + public void testInvalidTransactionalIdResource() throws Exception { + KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator(); + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->transationalid=t1->action=write")); + Assert.fail("Expected ConfigurationException"); + } catch (ConfigurationException ex) { + } + } + + @Test public void testInvalidConsumerGroupResource() throws Exception { KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator(); try { @@ -144,6 +159,12 @@ public class TestKafkaPrivilegeValidator { } catch (ConfigurationException ex) { Assert.assertEquals(KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage()); } + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->transactionalid=t1->action=read")); + Assert.fail("Kafka privilege can have one Host authorizable, at most one non Host authorizable and one action."); + } catch (ConfigurationException ex) { + Assert.assertEquals(KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage()); + } } @Test diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini b/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini index 1951aba..1effd42 100644 --- a/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini +++ b/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini @@ -17,22 +17,34 @@ [groups] admin_group = admin_all -subadmin_group = admin_host1 +subadmin_group1 = admin_host1 +subadmin_group2 = admin_host1234 consumer_group0 = consumer_t1_all consumer_group1 = consumer_t1_host1 consumer_group2 = consumer_t2_host2 producer_group0 = producer_t1_all producer_group1 = producer_t1_host1 producer_group2 = producer_t2_host2 +producer_group3 = producer_ti1_host1 +producer_group4 = producer_ti2_host2 +producer_group5 = producer_idempotentwrite_host1 +config_admin_group1 = config_admin_host1 +config_admin_group2 = config_admin_t1_host2 consumer_producer_group0 = consumer_producer_t1 [roles] admin_all = host=*->action=all admin_host1 = host=host1->action=all +admin_host1234 = host=1.2.3.4->action=all consumer_t1_all = host=*->topic=t1->action=read consumer_t1_host1 = host=host1->topic=t1->action=read consumer_t2_host2 = host=host2->topic=t2->action=read producer_t1_all = host=*->topic=t1->action=write producer_t1_host1 = host=host1->topic=t1->action=write producer_t2_host2 = host=host2->topic=t2->action=write +producer_ti1_host1 = host=host1->transactionalid=ti1->action=write +producer_ti2_host2 = host=host2->transactionalid=ti2->action=write +producer_idempotentwrite_host1 = host=host1->cluster=kafka-cluster->action=idempotentwrite +config_admin_host1 = host=host1->cluster=kafka-cluster->action=describeconfigs +config_admin_t1_host2 = host=host2->topic=t1->action=alterconfigs consumer_producer_t1 = host=host1->topic=t1->action=all diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml b/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml index 6383481..adc9239 100644 --- a/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml +++ b/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml @@ -28,7 +28,7 @@ sentry.kafka.provider.resource - classpath:test-authz-provider.ini + classpath:kafka-policy-test-authz-provider.ini sentry.kafka.policy.engine diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini b/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini deleted file mode 100644 index 520e1d0..0000000 --- a/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini +++ /dev/null @@ -1,38 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[groups] -admin = admin_all -subadmin = admin_host1 -consumer0 = consumer_t1_all -consumer1 = consumer_t1_host1 -consumer2 = consumer_t2_host2 -producer0 = producer_t1_all -producer1 = producer_t1_host1 -producer2 = producer_t2_host2 -consumer_producer0 = consumer_producer_t1 - -[roles] -admin_all = host=*->action=all -admin_host1 = host=1.2.3.4->action=all -consumer_t1_all = host=*->topic=t1->action=read -consumer_t1_host1 = host=host1->topic=t1->action=read -consumer_t2_host2 = host=host2->topic=t2->action=read -producer_t1_all = host=*->topic=t1->action=write -producer_t1_host1 = host=host1->topic=t1->action=write -producer_t2_host2 = host=host2->topic=t2->action=write -consumer_producer_t1 = host=host1->topic=t1->action=all diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java index 17d7fb7..a95469b 100644 --- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java +++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java @@ -29,6 +29,9 @@ public class KafkaActionConstant { public static final String ALTER = "alter"; public static final String DESCRIBE = "describe"; public static final String CLUSTER_ACTION = "clusteraction"; + public static final String ALTER_CONFIGS = "alterconfigs"; + public static final String DESCRIBE_CONFIGS = "describeconfigs"; + public static final String IDEMPOTENT_WRITE = "idempotentwrite"; public static final String actionName = "action"; } diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java index a1fec1f..1706057 100644 --- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java +++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java @@ -53,8 +53,12 @@ public class KafkaActionFactory extends BitFieldActionFactory { ALTER(KafkaActionConstant.ALTER, 16), DESCRIBE(KafkaActionConstant.DESCRIBE, 32), CLUSTERACTION(KafkaActionConstant.CLUSTER_ACTION, 64), + ALTERCONFIGS(KafkaActionConstant.ALTER_CONFIGS, 128), + DESCRIBECONFIGS(KafkaActionConstant.DESCRIBE_CONFIGS, 256), + IDEMPOTENTWRITE(KafkaActionConstant.IDEMPOTENT_WRITE, 512), ALL(KafkaActionConstant.ALL, READ.getCode() | WRITE.getCode() | CREATE.getCode() - | DELETE.getCode() | ALTER.getCode()| DESCRIBE.getCode() | CLUSTERACTION.getCode()); + | DELETE.getCode() | ALTER.getCode()| DESCRIBE.getCode() | CLUSTERACTION.getCode() + | ALTERCONFIGS.getCode() | DESCRIBECONFIGS.getCode() | IDEMPOTENTWRITE.getCode()); private String name; private int code; diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java index 52ae614..6a0c6f7 100644 --- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java +++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java @@ -39,6 +39,9 @@ import org.apache.sentry.core.common.Authorizable; * CONSUMERGROUP -> Kafka ConsumerGroup resource, users are required to have access to this resource * in order to perform ConsumerGroup level actions like joining a consumer group, * querying offset for a partition for a particular consumer group. + * + * TRANSACTIONALID -> This resource represents actions related to transactions, such as committing. + * */ public interface KafkaAuthorizable extends Authorizable { /** @@ -48,7 +51,8 @@ public interface KafkaAuthorizable extends Authorizable { CLUSTER, HOST, TOPIC, - CONSUMERGROUP + CONSUMERGROUP, + TRANSACTIONALID }; /** diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java index 45a1148..7a0ecf5 100644 --- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java +++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java @@ -50,6 +50,8 @@ public class KafkaModelAuthorizables { return new Topic(name); case CONSUMERGROUP: return new ConsumerGroup(name); + case TRANSACTIONALID: + return new TransactionalId(name); default: return null; } diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java index e460874..cbf741b 100644 --- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java +++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java @@ -46,6 +46,8 @@ public class KafkaPrivilegeModel implements Model { ImplyMethodType.STRING_CASE_SENSITIVE); implyMethodMap.put(KafkaAuthorizable.AuthorizableType.CONSUMERGROUP.name().toLowerCase(), ImplyMethodType.STRING_CASE_SENSITIVE); + implyMethodMap.put(KafkaAuthorizable.AuthorizableType.TRANSACTIONALID.name().toLowerCase(), + ImplyMethodType.STRING_CASE_SENSITIVE); } @Override diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/TransactionalId.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/TransactionalId.java new file mode 100644 index 0000000..2a91c49 --- /dev/null +++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/TransactionalId.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.core.model.kafka; + +/** + * Represents transactional ID authorizable in Kafka model. + */ +public class TransactionalId implements KafkaAuthorizable { + private String name; + + /** + * Create a transactional ID authorizable for Kafka cluster of a given name. + * + * @param name Name of Kafka transactional ID. + */ + public TransactionalId(String name) { + this.name = name; + } + + /** + * Get type of Kafka's transactional ID authorizable. + * + * @return Type of Kafka's transactional ID authorizable. + */ + @Override + public AuthorizableType getAuthzType() { return AuthorizableType.TRANSACTIONALID; } + + /** + * Get name of Kafka's transactional ID. + * + * @return Name of Kafka's transactional ID. + */ + @Override + public String getName() { + return name; + } + + /** + * Get type name of Kafka's transactional ID authorizable. + * + * @return Type name of Kafka's transactional ID authorizable. + */ + @Override + public String getTypeName() { + return getAuthzType().name(); + } +} \ No newline at end of file diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java index dcab5d5..f450d21 100644 --- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java +++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java @@ -30,101 +30,84 @@ public class TestKafkaAction { private KafkaActionFactory factory = KafkaActionFactory.getInstance(); @Test - public void testImpliesAction() { - KafkaAction readAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.READ); - KafkaAction writeAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.WRITE); - KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE); - KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE); - KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER); - KafkaAction describeAction = - (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE); - KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION); - KafkaAction allAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL); - - assertTrue(allAction.implies(readAction)); - assertTrue(allAction.implies(writeAction)); - assertTrue(allAction.implies(createAction)); - assertTrue(allAction.implies(deleteAction)); - assertTrue(allAction.implies(alterAction)); - assertTrue(allAction.implies(describeAction)); - assertTrue(allAction.implies(adminAction)); - assertTrue(allAction.implies(allAction)); - - assertTrue(readAction.implies(readAction)); - assertFalse(readAction.implies(writeAction)); - assertFalse(readAction.implies(createAction)); - assertFalse(readAction.implies(deleteAction)); - assertFalse(readAction.implies(alterAction)); - assertFalse(readAction.implies(describeAction)); - assertFalse(readAction.implies(adminAction)); - assertFalse(readAction.implies(allAction)); - - assertFalse(writeAction.implies(readAction)); - assertTrue(writeAction.implies(writeAction)); - assertFalse(writeAction.implies(createAction)); - assertFalse(writeAction.implies(deleteAction)); - assertFalse(writeAction.implies(alterAction)); - assertFalse(writeAction.implies(describeAction)); - assertFalse(writeAction.implies(adminAction)); - assertFalse(writeAction.implies(allAction)); - - assertFalse(createAction.implies(readAction)); - assertFalse(createAction.implies(writeAction)); - assertTrue(createAction.implies(createAction)); - assertFalse(createAction.implies(deleteAction)); - assertFalse(createAction.implies(alterAction)); - assertFalse(createAction.implies(describeAction)); - assertFalse(createAction.implies(adminAction)); - assertFalse(createAction.implies(allAction)); - - assertFalse(deleteAction.implies(readAction)); - assertFalse(deleteAction.implies(writeAction)); - assertFalse(deleteAction.implies(createAction)); - assertTrue(deleteAction.implies(deleteAction)); - assertFalse(deleteAction.implies(alterAction)); - assertFalse(deleteAction.implies(describeAction)); - assertFalse(deleteAction.implies(adminAction)); - assertFalse(deleteAction.implies(allAction)); - - assertFalse(alterAction.implies(readAction)); - assertFalse(alterAction.implies(writeAction)); - assertFalse(alterAction.implies(createAction)); - assertFalse(alterAction.implies(deleteAction)); - assertTrue(alterAction.implies(alterAction)); - assertFalse(alterAction.implies(describeAction)); - assertFalse(alterAction.implies(adminAction)); - assertFalse(alterAction.implies(allAction)); - - assertFalse(describeAction.implies(readAction)); - assertFalse(describeAction.implies(writeAction)); - assertFalse(describeAction.implies(createAction)); - assertFalse(describeAction.implies(deleteAction)); - assertFalse(describeAction.implies(alterAction)); - assertTrue(describeAction.implies(describeAction)); - assertFalse(describeAction.implies(adminAction)); - assertFalse(describeAction.implies(allAction)); - - assertFalse(adminAction.implies(readAction)); - assertFalse(adminAction.implies(writeAction)); - assertFalse(adminAction.implies(createAction)); - assertFalse(adminAction.implies(deleteAction)); - assertFalse(adminAction.implies(alterAction)); - assertFalse(adminAction.implies(describeAction)); - assertTrue(adminAction.implies(adminAction)); - assertFalse(adminAction.implies(allAction)); + public void testAllActionImpliesAll() { + KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL); + + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.READ))); + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.WRITE))); + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.CREATE))); + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.DELETE))); + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.ALTER))); + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.DESCRIBE))); + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION))); + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS))); + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS))); + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE))); + assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.ALL))); + } + + @Test + public void testActionImpliesSelf() { + KafkaAction[] actions = new KafkaAction[]{ + factory.getActionByName(KafkaActionConstant.READ), + factory.getActionByName(KafkaActionConstant.WRITE), + factory.getActionByName(KafkaActionConstant.CREATE), + factory.getActionByName(KafkaActionConstant.DELETE), + factory.getActionByName(KafkaActionConstant.ALTER), + factory.getActionByName(KafkaActionConstant.DESCRIBE), + factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION), + factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS), + factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS), + factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE), + factory.getActionByName(KafkaActionConstant.ALL) + }; + + for(KafkaAction action : actions){ + assertTrue(action.implies(action)); + } + } + + @Test + public void testNonAllActionDoesNotImplyOthers() { + KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL); + + KafkaAction[] actions = new KafkaAction[]{ + factory.getActionByName(KafkaActionConstant.READ), + factory.getActionByName(KafkaActionConstant.WRITE), + factory.getActionByName(KafkaActionConstant.CREATE), + factory.getActionByName(KafkaActionConstant.DELETE), + factory.getActionByName(KafkaActionConstant.ALTER), + factory.getActionByName(KafkaActionConstant.DESCRIBE), + factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION), + factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS), + factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS), + factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE) + }; + + for(KafkaAction action : actions) { + for(KafkaAction action2 : actions) { + if (action != action2) { + assertFalse(action.implies(action2)); + } + } + + assertFalse(action.implies(allAction)); + } } @Test public void testGetActionByName() throws Exception { - KafkaAction readAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.READ); - KafkaAction writeAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.WRITE); - KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE); - KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE); - KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER); - KafkaAction describeAction = - (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE); - KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION); - KafkaAction allAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL); + KafkaAction readAction = factory.getActionByName(KafkaActionConstant.READ); + KafkaAction writeAction = factory.getActionByName(KafkaActionConstant.WRITE); + KafkaAction createAction = factory.getActionByName(KafkaActionConstant.CREATE); + KafkaAction deleteAction = factory.getActionByName(KafkaActionConstant.DELETE); + KafkaAction alterAction = factory.getActionByName(KafkaActionConstant.ALTER); + KafkaAction describeAction = factory.getActionByName(KafkaActionConstant.DESCRIBE); + KafkaAction adminAction = factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION); + KafkaAction alterConfigsAction = factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS); + KafkaAction describeConfigsAction = factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS); + KafkaAction idempotentWriteAction = factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE); + KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL); assertTrue(readAction.equals(new KafkaAction(KafkaActionConstant.READ))); assertTrue(writeAction.equals(new KafkaAction(KafkaActionConstant.WRITE))); @@ -133,20 +116,25 @@ public class TestKafkaAction { assertTrue(alterAction.equals(new KafkaAction(KafkaActionConstant.ALTER))); assertTrue(describeAction.equals(new KafkaAction(KafkaActionConstant.DESCRIBE))); assertTrue(adminAction.equals(new KafkaAction(KafkaActionConstant.CLUSTER_ACTION))); + assertTrue(alterConfigsAction.equals(new KafkaAction(KafkaActionConstant.ALTER_CONFIGS))); + assertTrue(describeConfigsAction.equals(new KafkaAction(KafkaActionConstant.DESCRIBE_CONFIGS))); + assertTrue(idempotentWriteAction.equals(new KafkaAction(KafkaActionConstant.IDEMPOTENT_WRITE))); assertTrue(allAction.equals(new KafkaAction(KafkaActionConstant.ALL))); } @Test public void testGetActionsByCode() throws Exception { - KafkaAction readAction = new KafkaAction(KafkaActionConstant.READ); - KafkaAction writeAction = new KafkaAction(KafkaActionConstant.WRITE); - KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE); - KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE); - KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER); - KafkaAction describeAction = - (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE); - KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION); - KafkaAction allAction = new KafkaAction(KafkaActionConstant.ALL); + KafkaAction readAction = factory.getActionByName(KafkaActionConstant.READ); + KafkaAction writeAction = factory.getActionByName(KafkaActionConstant.WRITE); + KafkaAction createAction = factory.getActionByName(KafkaActionConstant.CREATE); + KafkaAction deleteAction = factory.getActionByName(KafkaActionConstant.DELETE); + KafkaAction alterAction = factory.getActionByName(KafkaActionConstant.ALTER); + KafkaAction describeAction = factory.getActionByName(KafkaActionConstant.DESCRIBE); + KafkaAction adminAction = factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION); + KafkaAction alterConfigsAction = factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS); + KafkaAction describeConfigsAction = factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS); + KafkaAction idempotentWriteAction = factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE); + KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL); assertEquals(Lists.newArrayList(readAction), factory.getActionsByCode(readAction.getActionCode())); @@ -162,8 +150,15 @@ public class TestKafkaAction { factory.getActionsByCode(describeAction.getActionCode())); assertEquals(Lists.newArrayList(adminAction), factory.getActionsByCode(adminAction.getActionCode())); + assertEquals(Lists.newArrayList(alterConfigsAction), + factory.getActionsByCode(alterConfigsAction.getActionCode())); + assertEquals(Lists.newArrayList(describeConfigsAction), + factory.getActionsByCode(describeConfigsAction.getActionCode())); + assertEquals(Lists.newArrayList(idempotentWriteAction), + factory.getActionsByCode(idempotentWriteAction.getActionCode())); assertEquals(Lists.newArrayList(readAction, writeAction, createAction, deleteAction, - alterAction, describeAction, adminAction), factory.getActionsByCode(allAction + alterAction, describeAction, adminAction, + alterConfigsAction, describeConfigsAction, idempotentWriteAction), factory.getActionsByCode(allAction .getActionCode())); } diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java index 04316f2..e00fbbd 100644 --- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java +++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java @@ -41,6 +41,9 @@ public class TestKafkaAuthorizable { ConsumerGroup consumerGroup = new ConsumerGroup(name); Assert.assertEquals(consumerGroup.getName(), name); + + TransactionalId transactionalId = new TransactionalId(name); + Assert.assertEquals(transactionalId.getName(), name); } @Test @@ -56,5 +59,8 @@ public class TestKafkaAuthorizable { ConsumerGroup consumerGroup = new ConsumerGroup("consumerGroup1"); Assert.assertEquals(consumerGroup.getAuthzType(), AuthorizableType.CONSUMERGROUP); + + TransactionalId transactionalId = new TransactionalId("transactionalId1"); + Assert.assertEquals(transactionalId.getAuthzType(), AuthorizableType.TRANSACTIONALID); } }