Return-Path: X-Original-To: apmail-sentry-commits-archive@minotaur.apache.org Delivered-To: apmail-sentry-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D84A5189EC for ; Mon, 1 Feb 2016 01:30:17 +0000 (UTC) Received: (qmail 86219 invoked by uid 500); 1 Feb 2016 01:30:17 -0000 Delivered-To: apmail-sentry-commits-archive@sentry.apache.org Received: (qmail 86162 invoked by uid 500); 1 Feb 2016 01:30:17 -0000 Mailing-List: contact commits-help@sentry.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sentry.incubator.apache.org Delivered-To: mailing list commits@sentry.incubator.apache.org Received: (qmail 86153 invoked by uid 99); 1 Feb 2016 01:30:17 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Feb 2016 01:30:17 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 6FAF3C07A5 for ; Mon, 1 Feb 2016 01:30:16 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.227 X-Spam-Level: * X-Spam-Status: No, score=1.227 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.553] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id WDF6RSb0mCLm for ; Mon, 1 Feb 2016 01:30:10 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 50B0C2148A for ; Mon, 1 Feb 2016 01:30:08 +0000 (UTC) Received: (qmail 86140 invoked by uid 99); 1 Feb 2016 01:30:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Feb 2016 01:30:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6B787DFF85; Mon, 1 Feb 2016 01:30:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sdp@apache.org To: commits@sentry.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-sentry git commit: SENTRY-1013: Add policy engine for Kafka (Ashish K Singh via Dapeng Sun) Date: Mon, 1 Feb 2016 01:30:07 +0000 (UTC) Repository: incubator-sentry Updated Branches: refs/heads/kafka d114d2d5b -> 4c37d78e7 SENTRY-1013: Add policy engine for Kafka (Ashish K Singh via Dapeng Sun) Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/4c37d78e Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/4c37d78e Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/4c37d78e Branch: refs/heads/kafka Commit: 4c37d78e74884447493d6ab110c24462d41b1c4a Parents: d114d2d Author: Sun Dapeng Authored: Mon Feb 1 09:19:21 2016 +0800 Committer: Sun Dapeng Committed: Mon Feb 1 09:19:21 2016 +0800 ---------------------------------------------------------------------- pom.xml | 5 + sentry-policy/pom.xml | 1 + sentry-policy/sentry-policy-kafka/pom.xml | 80 +++++++ .../policy/kafka/KafkaModelAuthorizables.java | 57 +++++ .../policy/kafka/KafkaPrivilegeValidator.java | 68 ++++++ .../policy/kafka/KafkaWildcardPrivilege.java | 131 ++++++++++++ .../policy/kafka/SimpleKafkaPolicyEngine.java | 87 ++++++++ .../kafka/KafkaPolicyFileProviderBackend.java | 35 +++ .../kafka/MockGroupMappingServiceProvider.java | 39 ++++ .../kafka/TestKafkaModelAuthorizables.java | 54 +++++ .../kafka/TestKafkaPrivilegeValidator.java | 118 +++++++++++ .../kafka/TestKafkaWildcardPrivilege.java | 179 ++++++++++++++++ .../engine/AbstractTestKafkaPolicyEngine.java | 163 ++++++++++++++ .../kafka/engine/TestKafkaPolicyEngineDFS.java | 76 +++++++ .../engine/TestKafkaPolicyEngineLocalFS.java | 47 ++++ ...tKafkaAuthorizationProviderGeneralCases.java | 212 +++++++++++++++++++ ...tKafkaAuthorizationProviderSpecialCases.java | 88 ++++++++ .../kafka/provider/TestKafkaPolicyNegative.java | 105 +++++++++ .../src/test/resources/log4j.properties | 31 +++ .../src/test/resources/test-authz-provider.ini | 38 ++++ 20 files changed, 1614 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index aa4a021..aa99e31 100644 --- a/pom.xml +++ b/pom.xml @@ -487,6 +487,11 @@ limitations under the License. org.apache.sentry + sentry-policy-kafka + ${project.version} + + + org.apache.sentry sentry-dist ${project.version} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-policy/pom.xml b/sentry-policy/pom.xml index ef938a6..45dc675 100644 --- a/sentry-policy/pom.xml +++ b/sentry-policy/pom.xml @@ -35,6 +35,7 @@ limitations under the License. sentry-policy-indexer sentry-policy-search sentry-policy-sqoop + sentry-policy-kafka http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/pom.xml b/sentry-policy/sentry-policy-kafka/pom.xml new file mode 100644 index 0000000..21d34eb --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/pom.xml @@ -0,0 +1,80 @@ + + + + 4.0.0 + + org.apache.sentry + sentry-policy + 1.7.0-incubating-SNAPSHOT + + + sentry-policy-kafka + Sentry Policy for Kafka + + + + junit + junit + test + + + org.apache.hadoop + hadoop-common + provided + + + org.apache.hadoop + hadoop-minicluster + test + + + log4j + log4j + + + org.apache.shiro + shiro-core + + + com.google.guava + guava + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + org.apache.sentry + sentry-core-model-kafka + + + org.apache.sentry + sentry-provider-common + + + org.apache.sentry + sentry-provider-file + + + + http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java new file mode 100644 index 0000000..ba93036 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.policy.kafka; + +import org.apache.sentry.core.model.kafka.Cluster; +import org.apache.sentry.core.model.kafka.ConsumerGroup; +import org.apache.sentry.core.model.kafka.KafkaAuthorizable; +import org.apache.sentry.core.model.kafka.KafkaAuthorizable.AuthorizableType; +import org.apache.sentry.core.model.kafka.Host; +import org.apache.sentry.core.model.kafka.Topic; +import org.apache.sentry.provider.common.KeyValue; + +public class KafkaModelAuthorizables { + public static KafkaAuthorizable from(KeyValue keyValue) { + String prefix = keyValue.getKey().toLowerCase(); + String name = keyValue.getValue().toLowerCase(); + for (AuthorizableType type : AuthorizableType.values()) { + if (prefix.equalsIgnoreCase(type.name())) { + return from(type, name); + } + } + return null; + } + + public static KafkaAuthorizable from(String keyValue) { + return from(new KeyValue(keyValue)); + } + + public static KafkaAuthorizable from(AuthorizableType type, String name) { + switch (type) { + case HOST: + return new Host(name); + case CLUSTER: + return new Cluster(name); + case TOPIC: + return new Topic(name); + case CONSUMERGROUP: + return new ConsumerGroup(name); + default: + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java new file mode 100644 index 0000000..ecad355 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.policy.kafka; + +import static org.apache.sentry.provider.common.ProviderConstants.AUTHORIZABLE_SPLITTER; +import static org.apache.sentry.provider.common.ProviderConstants.PRIVILEGE_PREFIX; + +import java.util.List; + +import org.apache.sentry.core.model.kafka.KafkaAuthorizable; +import org.apache.sentry.core.model.kafka.Host; +import org.apache.sentry.policy.common.PrivilegeValidator; +import org.apache.sentry.policy.common.PrivilegeValidatorContext; +import org.apache.shiro.config.ConfigurationException; + +import com.google.common.collect.Lists; + +public class KafkaPrivilegeValidator implements PrivilegeValidator { + + public KafkaPrivilegeValidator() { + } + + @Override + public void validate(PrivilegeValidatorContext context) + throws ConfigurationException { + Iterable authorizables = parsePrivilege(context.getPrivilege()); + boolean hostnameMatched = false; + for (KafkaAuthorizable authorizable : authorizables) { + if (authorizable instanceof Host) { + hostnameMatched = true; + break; + } + } + if (!hostnameMatched) { + String msg = "host=[name] in " + context.getPrivilege() + " is required."; + throw new ConfigurationException(msg); + } + } + + private Iterable parsePrivilege(String string) { + List result = Lists.newArrayList(); + for(String section : AUTHORIZABLE_SPLITTER.split(string)) { + if(!section.toLowerCase().startsWith(PRIVILEGE_PREFIX)) { + KafkaAuthorizable authorizable = KafkaModelAuthorizables.from(section); + if(authorizable == null) { + String msg = "No authorizable found for " + section; + throw new ConfigurationException(msg); + } + result.add(authorizable); + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java new file mode 100644 index 0000000..e04aeb7 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.policy.kafka; + +import static org.apache.sentry.provider.common.ProviderConstants.AUTHORIZABLE_SPLITTER; + +import java.util.List; + +import org.apache.sentry.core.model.kafka.KafkaActionConstant; +import org.apache.sentry.policy.common.Privilege; +import org.apache.sentry.policy.common.PrivilegeFactory; +import org.apache.sentry.provider.common.KeyValue; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class KafkaWildcardPrivilege implements Privilege { + + public static class Factory implements PrivilegeFactory { + @Override + public Privilege createPrivilege(String permission) { + return new KafkaWildcardPrivilege(permission); + } + } + + private final ImmutableList parts; + + public KafkaWildcardPrivilege(String permission) { + if (Strings.isNullOrEmpty(permission)) { + throw new IllegalArgumentException("Permission string cannot be null or empty."); + } + Listparts = Lists.newArrayList(); + for (String authorizable : AUTHORIZABLE_SPLITTER.trimResults().split(permission.trim())) { + if (authorizable.isEmpty()) { + throw new IllegalArgumentException("Privilege '" + permission + "' has an empty section"); + } + parts.add(new KeyValue(authorizable)); + } + if (parts.isEmpty()) { + throw new AssertionError("Privilege, " + permission + ", did not consist of any valid authorizable."); + } + this.parts = ImmutableList.copyOf(parts); + } + + @Override + public boolean implies(Privilege p) { + if (!(p instanceof KafkaWildcardPrivilege)) { + return false; + } + KafkaWildcardPrivilege wp = (KafkaWildcardPrivilege)p; + List otherParts = wp.parts; + if(equals(wp)) { + return true; + } + int index = 0; + for (KeyValue otherPart : otherParts) { + // If this privilege has less parts than the other privilege, everything + // after the number of parts contained + // in this privilege is automatically implied, so return true + if (parts.size() - 1 < index) { + return true; + } else { + KeyValue part = parts.get(index); + // Support for action inheritance from parent to child + if (part.getKey().equalsIgnoreCase(KafkaActionConstant.actionName) + && !(otherPart.getKey().equalsIgnoreCase(KafkaActionConstant.actionName))) { + continue; + } + // are the keys even equal + if(!part.getKey().equalsIgnoreCase(otherPart.getKey())) { + return false; + } + if (!impliesKeyValue(part, otherPart)) { + return false; + } + index++; + } + } + // If this privilege has more parts than + // the other parts, only imply it if + // all of the other parts are "*" or "ALL" + for (; index < parts.size(); index++) { + KeyValue part = parts.get(index); + if (!part.getValue().equals(KafkaActionConstant.ALL)) { + return false; + } + } + return true; + } + + private boolean impliesKeyValue(KeyValue policyPart, KeyValue requestPart) { + Preconditions.checkState(policyPart.getKey().equalsIgnoreCase(requestPart.getKey()), + "Please report, this method should not be called with two different keys"); + if(policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL) || + policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL_NAME) || + policyPart.equals(requestPart)) { + return true; + } else if (!KafkaActionConstant.actionName.equalsIgnoreCase(policyPart.getKey()) + && KafkaActionConstant.ALL.equalsIgnoreCase(requestPart.getValue())) { + /* privilege request is to match with any object of given type */ + return true; + } + return false; + + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for(KeyValue kv: this.parts) { + sb.append(kv.getKey() + "=" + kv.getValue() + "->"); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java new file mode 100644 index 0000000..7e043e1 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.policy.kafka; + +import java.util.Set; + +import org.apache.sentry.core.common.ActiveRoleSet; +import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.common.SentryConfigurationException; +import org.apache.sentry.policy.common.PolicyEngine; +import org.apache.sentry.policy.common.PrivilegeFactory; +import org.apache.sentry.policy.common.PrivilegeValidator; +import org.apache.sentry.provider.common.ProviderBackend; +import org.apache.sentry.provider.common.ProviderBackendContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +public class SimpleKafkaPolicyEngine implements PolicyEngine { + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaPolicyEngine.class); + private final ProviderBackend providerBackend; + + public SimpleKafkaPolicyEngine(ProviderBackend providerBackend) { + this.providerBackend = providerBackend; + ProviderBackendContext context = new ProviderBackendContext(); + context.setAllowPerDatabase(false); + context.setValidators(ImmutableList.of(new KafkaPrivilegeValidator())); + this.providerBackend.initialize(context); + } + + @Override + public PrivilegeFactory getPrivilegeFactory() { + return new KafkaWildcardPrivilege.Factory(); + } + + @Override + public ImmutableSet getAllPrivileges(Set groups, ActiveRoleSet roleSet) + throws SentryConfigurationException { + return getPrivileges(groups, roleSet); + } + + @Override + public ImmutableSet getPrivileges(Set groups, ActiveRoleSet roleSet, + Authorizable... authorizableHierarchy) + throws SentryConfigurationException { + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("Getting permissions for {}", groups); + } + ImmutableSet result = providerBackend.getPrivileges(groups, roleSet); + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("result = " + result); + } + return result; + } + + @Override + public void close() { + if (providerBackend != null) { + providerBackend.close(); + } + } + + @Override + public void validatePolicy(boolean strictValidation) + throws SentryConfigurationException { + if (providerBackend != null) { + providerBackend.validatePolicy(strictValidation); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java new file mode 100644 index 0000000..47a053d --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine; +import org.apache.sentry.provider.file.SimpleFileProviderBackend; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaPolicyFileProviderBackend extends SimpleKafkaPolicyEngine { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPolicyFileProviderBackend.class); + public KafkaPolicyFileProviderBackend(String resource) throws IOException { + super(new SimpleFileProviderBackend(new Configuration(), resource)); + LOGGER.warn("The DB provider backend is the preferred option over file provider backend as the kafka policy engine"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java new file mode 100644 index 0000000..572c74d --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; + +import java.util.Set; + +import org.apache.sentry.provider.common.GroupMappingService; + +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +public class MockGroupMappingServiceProvider implements GroupMappingService { + private final Multimap userToGroupMap; + + public MockGroupMappingServiceProvider(Multimap userToGroupMap) { + this.userToGroupMap = userToGroupMap; + } + @Override + public Set getGroups(String user) { + return Sets.newHashSet(userToGroupMap.get(user)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java new file mode 100644 index 0000000..46a0078 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNull; + +import org.apache.sentry.core.model.kafka.Host; +import org.junit.Test; + +public class TestKafkaModelAuthorizables { + + @Test + public void testHost() throws Exception { + Host host1 = (Host)KafkaModelAuthorizables.from("HOST=host1"); + assertEquals("host1", host1.getName()); + } + + @Test(expected=IllegalArgumentException.class) + public void testNoKV() throws Exception { + System.out.println(KafkaModelAuthorizables.from("nonsense")); + } + + @Test(expected=IllegalArgumentException.class) + public void testEmptyKey() throws Exception { + System.out.println(KafkaModelAuthorizables.from("=host1")); + } + + @Test(expected=IllegalArgumentException.class) + public void testEmptyValue() throws Exception { + System.out.println(KafkaModelAuthorizables.from("HOST=")); + } + + @Test + public void testNotAuthorizable() throws Exception { + assertNull(KafkaModelAuthorizables.from("k=v")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java new file mode 100644 index 0000000..ba670f7 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; + +import junit.framework.Assert; + +import org.apache.sentry.policy.common.PrivilegeValidatorContext; +import org.apache.shiro.config.ConfigurationException; +import org.junit.Test; + +public class TestKafkaPrivilegeValidator { + @Test + public void testOnlyHostResource() { + KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator(); + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1")); + } catch (ConfigurationException ex) { + Assert.fail("Unexpected ConfigurationException."); + } + } + + @Test + public void testWithoutHostResource() throws Exception { + KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator(); + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("cluster=c1->action=read")); + Assert.fail("Expected ConfigurationException"); + } catch (ConfigurationException ex) { + } + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("topic=t1->action=read")); + Assert.fail("Expected ConfigurationException"); + } catch (ConfigurationException ex) { + } + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("consumergroup=g1->action=read")); + Assert.fail("Expected ConfigurationException"); + } catch (ConfigurationException ex) { + } + } + + @Test + public void testValidPrivileges() throws Exception { + KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator(); + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->cluster=c1->action=read")); + } catch (ConfigurationException ex) { + Assert.fail("Not expected ConfigurationException"); + } + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->action=read")); + } catch (ConfigurationException ex) { + Assert.fail("Not expected ConfigurationException"); + } + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->consumergroup=g1->action=read")); + } catch (ConfigurationException ex) { + Assert.fail("Not expected ConfigurationException"); + } + } + + @Test + public void testInvalidHostResource() throws Exception { + KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator(); + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("hhost=host1->cluster=c1->action=read")); + Assert.fail("Expected ConfigurationException"); + } catch (ConfigurationException ex) { + } + } + + @Test + public void testInvalidClusterResource() throws Exception { + KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator(); + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->clluster=c1->action=read")); + Assert.fail("Expected ConfigurationException"); + } catch (ConfigurationException ex) { + } + } + + @Test + public void testInvalidTopicResource() throws Exception { + KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator(); + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->ttopic=t1->action=read")); + Assert.fail("Expected ConfigurationException"); + } catch (ConfigurationException ex) { + } + } + + @Test + public void testInvalidConsumerGroupResource() throws Exception { + KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator(); + try { + kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->coonsumergroup=g1->action=read")); + Assert.fail("Expected ConfigurationException"); + } catch (ConfigurationException ex) { + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java new file mode 100644 index 0000000..720c98f --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertTrue; +import static org.apache.sentry.provider.common.ProviderConstants.AUTHORIZABLE_JOINER; +import static org.apache.sentry.provider.common.ProviderConstants.KV_JOINER; +import static org.apache.sentry.provider.common.ProviderConstants.KV_SEPARATOR; + +import org.apache.sentry.core.model.kafka.KafkaActionConstant; +import org.apache.sentry.policy.common.Privilege; +import org.apache.sentry.policy.kafka.KafkaWildcardPrivilege; +import org.apache.sentry.provider.common.KeyValue; +import org.junit.Test; + +public class TestKafkaWildcardPrivilege { + private static final Privilege KAFKA_HOST1_ALL = + create(new KeyValue("HOST", "host1"), new KeyValue("action", KafkaActionConstant.ALL)); + private static final Privilege KAFKA_HOST1_READ = + create(new KeyValue("HOST", "host1"), new KeyValue("action", KafkaActionConstant.READ)); + private static final Privilege KAFKA_HOST1_WRITE = + create(new KeyValue("HOST", "host1"), new KeyValue("action", KafkaActionConstant.WRITE)); + + private static final Privilege KAFKA_HOST1_TOPIC1_ALL = + create(new KeyValue("HOST", "host1"), new KeyValue("TOPIC", "topic1"), new KeyValue("action", KafkaActionConstant.ALL)); + private static final Privilege KAFKA_HOST1_TOPIC1_READ = + create(new KeyValue("HOST", "host1"), new KeyValue("TOPIC", "topic1"), new KeyValue("action", KafkaActionConstant.READ)); + private static final Privilege KAFKA_HOST1_TOPIC1_WRITE = + create(new KeyValue("HOST", "host1"), new KeyValue("TOPIC", "topic1"), new KeyValue("action", KafkaActionConstant.WRITE)); + + private static final Privilege KAFKA_HOST1_CLUSTER1_ALL = + create(new KeyValue("HOST", "host1"), new KeyValue("CLUSTER", "cluster1"), new KeyValue("action", KafkaActionConstant.ALL)); + private static final Privilege KAFKA_HOST1_CLUSTER1_READ = + create(new KeyValue("HOST", "host1"), new KeyValue("CLUSTER", "cluster1"), new KeyValue("action", KafkaActionConstant.READ)); + private static final Privilege KAFKA_HOST1_CLUSTER1_WRITE = + create(new KeyValue("HOST", "host1"), new KeyValue("CLUSTER", "cluster1"), new KeyValue("action", KafkaActionConstant.WRITE)); + + private static final Privilege KAFKA_HOST1_GROUP1_ALL = + create(new KeyValue("HOST", "host1"), new KeyValue("GROUP", "cgroup1"), new KeyValue("action", KafkaActionConstant.ALL)); + private static final Privilege KAFKA_HOST1_GROUP1_READ = + create(new KeyValue("HOST", "host1"), new KeyValue("GROUP", "cgroup1"), new KeyValue("action", KafkaActionConstant.READ)); + private static final Privilege KAFKA_HOST1_GROUP1_WRITE = + create(new KeyValue("HOST", "host1"), new KeyValue("GROUP", "cgroup1"), new KeyValue("action", KafkaActionConstant.WRITE)); + + + @Test + public void testSimpleAction() throws Exception { + //host + assertFalse(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_READ)); + assertFalse(KAFKA_HOST1_READ.implies(KAFKA_HOST1_WRITE)); + //consumer group + assertFalse(KAFKA_HOST1_GROUP1_WRITE.implies(KAFKA_HOST1_GROUP1_READ)); + assertFalse(KAFKA_HOST1_GROUP1_READ.implies(KAFKA_HOST1_GROUP1_WRITE)); + //topic + assertFalse(KAFKA_HOST1_TOPIC1_READ.implies(KAFKA_HOST1_TOPIC1_WRITE)); + assertFalse(KAFKA_HOST1_TOPIC1_WRITE.implies(KAFKA_HOST1_TOPIC1_READ)); + //cluster + assertFalse(KAFKA_HOST1_CLUSTER1_READ.implies(KAFKA_HOST1_CLUSTER1_WRITE)); + assertFalse(KAFKA_HOST1_CLUSTER1_WRITE.implies(KAFKA_HOST1_CLUSTER1_READ)); + } + + @Test + public void testShorterThanRequest() throws Exception { + //topic + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_TOPIC1_ALL)); + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_TOPIC1_READ)); + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_TOPIC1_WRITE)); + + assertFalse(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_READ)); + assertTrue(KAFKA_HOST1_READ.implies(KAFKA_HOST1_TOPIC1_READ)); + assertTrue(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_TOPIC1_WRITE)); + + //cluster + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_CLUSTER1_ALL)); + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_CLUSTER1_READ)); + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_CLUSTER1_WRITE)); + + assertTrue(KAFKA_HOST1_READ.implies(KAFKA_HOST1_CLUSTER1_READ)); + assertTrue(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_CLUSTER1_WRITE)); + + //consumer group + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_GROUP1_ALL)); + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_GROUP1_READ)); + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_GROUP1_WRITE)); + + assertTrue(KAFKA_HOST1_READ.implies(KAFKA_HOST1_GROUP1_READ)); + assertTrue(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_GROUP1_WRITE)); + } + + @Test + public void testActionAll() throws Exception { + //host + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_READ)); + assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_WRITE)); + + //topic + assertTrue(KAFKA_HOST1_TOPIC1_ALL.implies(KAFKA_HOST1_TOPIC1_READ)); + assertTrue(KAFKA_HOST1_TOPIC1_ALL.implies(KAFKA_HOST1_TOPIC1_WRITE)); + + //cluster + assertTrue(KAFKA_HOST1_CLUSTER1_ALL.implies(KAFKA_HOST1_CLUSTER1_READ)); + assertTrue(KAFKA_HOST1_CLUSTER1_ALL.implies(KAFKA_HOST1_CLUSTER1_WRITE)); + + //consumer group + assertTrue(KAFKA_HOST1_GROUP1_ALL.implies(KAFKA_HOST1_GROUP1_READ)); + assertTrue(KAFKA_HOST1_GROUP1_ALL.implies(KAFKA_HOST1_GROUP1_WRITE)); + } + + @Test + public void testUnexpected() throws Exception { + Privilege p = new Privilege() { + @Override + public boolean implies(Privilege p) { + return false; + } + }; + Privilege topic1 = create(new KeyValue("HOST", "host"), new KeyValue("TOPIC", "topic1")); + assertFalse(topic1.implies(null)); + assertFalse(topic1.implies(p)); + assertFalse(topic1.equals(null)); + assertFalse(topic1.equals(p)); + } + + @Test(expected=IllegalArgumentException.class) + public void testNullString() throws Exception { + System.out.println(create((String)null)); + } + + @Test(expected=IllegalArgumentException.class) + public void testEmptyString() throws Exception { + System.out.println(create("")); + } + + @Test(expected=IllegalArgumentException.class) + public void testEmptyKey() throws Exception { + System.out.println(create(KV_JOINER.join("", "host1"))); + } + + @Test(expected=IllegalArgumentException.class) + public void testEmptyValue() throws Exception { + System.out.println(create(KV_JOINER.join("HOST", ""))); + } + + @Test(expected=IllegalArgumentException.class) + public void testEmptyPart() throws Exception { + System.out.println(create(AUTHORIZABLE_JOINER. + join(KV_JOINER.join("HOST", "host1"), ""))); + } + + @Test(expected=IllegalArgumentException.class) + public void testOnlySeperators() throws Exception { + System.out.println(create(AUTHORIZABLE_JOINER. + join(KV_SEPARATOR, KV_SEPARATOR, KV_SEPARATOR))); + } + + static KafkaWildcardPrivilege create(KeyValue... keyValues) { + return create(AUTHORIZABLE_JOINER.join(keyValues)); + + } + static KafkaWildcardPrivilege create(String s) { + return new KafkaWildcardPrivilege(s); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java new file mode 100644 index 0000000..4da506b --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.sentry.policy.kafka.engine; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import java.util.TreeSet; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.sentry.core.common.ActiveRoleSet; +import org.apache.sentry.policy.common.PolicyEngine; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Sets; +import com.google.common.io.Files; + +public abstract class AbstractTestKafkaPolicyEngine { + + private static final String ADMIN = "host=*"; + private static final String ADMIN_HOST1 = "host=host1"; + private static final String CONSUMER_T1_ALL = "host=*->topic=t1->action=read"; + private static final String CONSUMER_T1_HOST1 = "host=host1->topic=t1->action=read"; + private static final String CONSUMER_T2_HOST2 = "host=host2->topic=t2->action=read"; + private static final String PRODUCER_T1_ALL = "host=*->topic=t1->action=write"; + private static final String PRODUCER_T1_HOST1 = "host=host1->topic=t1->action=write"; + private static final String PRODUCER_T2_HOST2 = "host=host2->topic=t2->action=write"; + private static final String CONSUMER_PRODUCER_T1 = "host=host1->topic=t1->action=all"; + + private PolicyEngine policy; + private static File baseDir; + + @BeforeClass + public static void setupClazz() throws IOException { + baseDir = Files.createTempDir(); + } + + @AfterClass + public static void teardownClazz() throws IOException { + if (baseDir != null) { + FileUtils.deleteQuietly(baseDir); + } + } + + protected void setPolicy(PolicyEngine policy) { + this.policy = policy; + } + + protected static File getBaseDir() { + return baseDir; + } + + @Before + public void setup() throws IOException { + afterSetup(); + } + + @After + public void teardown() throws IOException { + beforeTeardown(); + } + + protected void afterSetup() throws IOException {} + + protected void beforeTeardown() throws IOException {} + + + @Test + public void testConsumer0() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_ALL)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("consumer_group0"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testConsumer1() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_HOST1)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("consumer_group1"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testConsumer2() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T2_HOST2)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("consumer_group2"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testProducer0() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_ALL)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("producer_group0"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testProducer1() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_HOST1)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("producer_group1"), ActiveRoleSet.ALL)) + .toString()); + } + + + @Test + public void testProducer2() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T2_HOST2)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("producer_group2"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testConsumerProducer0() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_PRODUCER_T1)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("consumer_producer_group0"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testSubAdmin() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(ADMIN_HOST1)); + Assert.assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("subadmin_group"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testAdmin() throws Exception { + Set expected = Sets.newTreeSet(Sets.newHashSet(ADMIN)); + Assert + .assertEquals(expected.toString(), + new TreeSet(policy.getPrivileges(set("admin_group"), ActiveRoleSet.ALL)) + .toString()); + } + + private static Set set(String... values) { + return Sets.newHashSet(values); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java new file mode 100644 index 0000000..f2bd3c8 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka.engine; + +import java.io.File; +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend; +import org.apache.sentry.provider.file.PolicyFiles; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class TestKafkaPolicyEngineDFS extends AbstractTestKafkaPolicyEngine { + private static MiniDFSCluster dfsCluster; + private static FileSystem fileSystem; + private static Path root; + private static Path etc; + + @BeforeClass + public static void setupLocalClazz() throws IOException { + File baseDir = getBaseDir(); + Assert.assertNotNull(baseDir); + File dfsDir = new File(baseDir, "dfs"); + Assert.assertTrue(dfsDir.isDirectory() || dfsDir.mkdirs()); + Configuration conf = new Configuration(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath()); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + fileSystem = dfsCluster.getFileSystem(); + root = new Path(fileSystem.getUri().toString()); + etc = new Path(root, "/etc"); + fileSystem.mkdirs(etc); + } + + @AfterClass + public static void teardownLocalClazz() { + if(dfsCluster != null) { + dfsCluster.shutdown(); + } + } + + @Override + protected void afterSetup() throws IOException { + fileSystem.delete(etc, true); + fileSystem.mkdirs(etc); + PolicyFiles.copyToDir(fileSystem, etc, "test-authz-provider.ini"); + setPolicy(new KafkaPolicyFileProviderBackend(new Path(etc, + "test-authz-provider.ini").toString())); + } + + @Override + protected void beforeTeardown() throws IOException { + fileSystem.delete(etc, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java new file mode 100644 index 0000000..4bc061d --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka.engine; + +import java.io.File; +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend; +import org.apache.sentry.provider.file.PolicyFiles; + +public class TestKafkaPolicyEngineLocalFS extends AbstractTestKafkaPolicyEngine { + + @Override + protected void afterSetup() throws IOException { + File baseDir = getBaseDir(); + Assert.assertNotNull(baseDir); + Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs()); + PolicyFiles.copyToDir(baseDir, "test-authz-provider.ini"); + setPolicy(new KafkaPolicyFileProviderBackend(new File(baseDir, "test-authz-provider.ini").getPath())); + } + + @Override + protected void beforeTeardown() throws IOException { + File baseDir = getBaseDir(); + Assert.assertNotNull(baseDir); + FileUtils.deleteQuietly(baseDir); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java new file mode 100644 index 0000000..bcc1198 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka.provider; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.sentry.core.common.Action; +import org.apache.sentry.core.common.ActiveRoleSet; +import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.common.Subject; +import org.apache.sentry.core.model.kafka.Cluster; +import org.apache.sentry.core.model.kafka.ConsumerGroup; +import org.apache.sentry.core.model.kafka.KafkaActionConstant; +import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction; +import org.apache.sentry.core.model.kafka.Host; +import org.apache.sentry.core.model.kafka.Topic; +import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend; +import org.apache.sentry.policy.kafka.MockGroupMappingServiceProvider; +import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider; +import org.apache.sentry.provider.common.ResourceAuthorizationProvider; +import org.apache.sentry.provider.file.PolicyFiles; +import org.junit.After; +import org.junit.Test; + +import com.google.common.base.Objects; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.common.io.Files; + +public class TestKafkaAuthorizationProviderGeneralCases { + private static final Multimap USER_TO_GROUP_MAP = HashMultimap.create(); + + private static final Host HOST_1 = new Host("host1"); + private static final Host HOST_2 = new Host("host2"); + private static final Cluster cluster1 = new Cluster("kafka-cluster"); + private static final Topic topic1 = new Topic("t1"); + private static final Topic topic2 = new Topic("t2"); + private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1"); + private static final ConsumerGroup cgroup2 = new ConsumerGroup("cg2"); + + private static final KafkaAction ALL = new KafkaAction(KafkaActionConstant.ALL); + private static final KafkaAction READ = new KafkaAction(KafkaActionConstant.READ); + private static final KafkaAction WRITE = new KafkaAction(KafkaActionConstant.WRITE); + private static final KafkaAction CREATE = new KafkaAction(KafkaActionConstant.CREATE); + private static final KafkaAction DELETE = new KafkaAction(KafkaActionConstant.DELETE); + private static final KafkaAction ALTER = new KafkaAction(KafkaActionConstant.ALTER); + private static final KafkaAction DESCRIBE = new KafkaAction(KafkaActionConstant.DESCRIBE); + private static final KafkaAction CLUSTER_ACTION = new KafkaAction( + KafkaActionConstant.CLUSTER_ACTION); + + private static final Set allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION); + + private static final Subject ADMIN = new Subject("admin1"); + private static final Subject SUB_ADMIN = new Subject("subadmin1"); + private static final Subject CONSUMER0 = new Subject("consumer0"); + private static final Subject CONSUMER1 = new Subject("consumer1"); + private static final Subject CONSUMER2 = new Subject("consumer2"); + private static final Subject PRODUCER0 = new Subject("producer0"); + private static final Subject PRODUCER1 = new Subject("producer1"); + private static final Subject PRODUCER2 = new Subject("producer2"); + private static final Subject CONSUMER_PRODUCER0 = new Subject("consumer_producer0"); + + private static final String ADMIN_GROUP = "admin_group"; + private static final String SUBADMIN_GROUP = "subadmin_group"; + private static final String CONSUMER_GROUP0 = "consumer_group0"; + private static final String CONSUMER_GROUP1 = "consumer_group1"; + private static final String CONSUMER_GROUP2 = "consumer_group2"; + private static final String PRODUCER_GROUP0 = "producer_group0"; + private static final String PRODUCER_GROUP1 = "producer_group1"; + private static final String PRODUCER_GROUP2 = "producer_group2"; + private static final String CONSUMER_PRODUCER_GROUP0 = "consumer_producer_group0"; + + static { + USER_TO_GROUP_MAP.putAll(ADMIN.getName(), Arrays.asList(ADMIN_GROUP)); + USER_TO_GROUP_MAP.putAll(SUB_ADMIN.getName(), Arrays.asList(SUBADMIN_GROUP )); + USER_TO_GROUP_MAP.putAll(CONSUMER0.getName(), Arrays.asList(CONSUMER_GROUP0)); + USER_TO_GROUP_MAP.putAll(CONSUMER1.getName(), Arrays.asList(CONSUMER_GROUP1)); + USER_TO_GROUP_MAP.putAll(CONSUMER2.getName(), Arrays.asList(CONSUMER_GROUP2)); + USER_TO_GROUP_MAP.putAll(PRODUCER0.getName(), Arrays.asList(PRODUCER_GROUP0)); + USER_TO_GROUP_MAP.putAll(PRODUCER1.getName(), Arrays.asList(PRODUCER_GROUP1)); + USER_TO_GROUP_MAP.putAll(PRODUCER2.getName(), Arrays.asList(PRODUCER_GROUP2)); + USER_TO_GROUP_MAP.putAll(CONSUMER_PRODUCER0.getName(), Arrays.asList(CONSUMER_PRODUCER_GROUP0)); + } + + private final ResourceAuthorizationProvider authzProvider; + private File baseDir; + + public TestKafkaAuthorizationProviderGeneralCases() throws IOException { + baseDir = Files.createTempDir(); + PolicyFiles.copyToDir(baseDir, "test-authz-provider.ini"); + authzProvider = new HadoopGroupResourceAuthorizationProvider( + new KafkaPolicyFileProviderBackend(new File(baseDir, "test-authz-provider.ini").getPath()), + new MockGroupMappingServiceProvider(USER_TO_GROUP_MAP)); + } + + @After + public void teardown() { + if(baseDir != null) { + FileUtils.deleteQuietly(baseDir); + } + } + + private void doTestResourceAuthorizationProvider(Subject subject, List authorizableHierarchy, + Set actions, boolean expected) throws Exception { + Objects.ToStringHelper helper = Objects.toStringHelper("TestParameters"); + helper.add("Subject", subject).add("authzHierarchy", authorizableHierarchy).add("action", actions); + Assert.assertEquals(helper.toString(), expected, + authzProvider.hasAccess(subject, authorizableHierarchy, actions, ActiveRoleSet.ALL)); + } + + @Test + public void testAdmin() throws Exception { + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic1), allActions, true); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic2), allActions, true); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1), allActions, true); + + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cluster1), allActions, false); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic1), allActions, false); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic2), allActions, false); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, false); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, false); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2), allActions, false); + + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic2), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1), allActions, true); + + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cluster1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic2), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2), allActions, true); + } + + @Test + public void testConsumer() throws Exception { + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) + doTestResourceAuthorizationProvider(CONSUMER0, Arrays.asList(host, topic1), + Sets.newHashSet(action), READ.equals(action)); + } + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) + doTestResourceAuthorizationProvider(CONSUMER1, Arrays.asList(host, topic1), + Sets.newHashSet(action), HOST_1.equals(host) && READ.equals(action)); + } + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) + doTestResourceAuthorizationProvider(CONSUMER2, Arrays.asList(host, topic2), + Sets.newHashSet(action), HOST_2.equals(host) && READ.equals(action)); + } + } + + @Test + public void testProducer() throws Exception { + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) + doTestResourceAuthorizationProvider(PRODUCER0, Arrays.asList(host, topic1), + Sets.newHashSet(action), WRITE.equals(action)); + } + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) + doTestResourceAuthorizationProvider(PRODUCER1, Arrays.asList(host, topic1), + Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action)); + } + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) + doTestResourceAuthorizationProvider(PRODUCER2, Arrays.asList(host, topic2), + Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action)); + } + } + + @Test + public void testConsumerProducer() throws Exception { + for (KafkaAction action : allActions) { + doTestResourceAuthorizationProvider(CONSUMER_PRODUCER0, Arrays.asList(HOST_1, topic1), + Sets.newHashSet(action), true); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java new file mode 100644 index 0000000..0a453ce --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka.provider; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.sentry.core.common.Action; +import org.apache.sentry.core.common.ActiveRoleSet; +import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.common.Subject; +import org.apache.sentry.core.model.kafka.KafkaActionConstant; +import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction; +import org.apache.sentry.core.model.kafka.Host; +import org.apache.sentry.core.model.kafka.Topic; +import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend; +import org.apache.sentry.provider.common.AuthorizationProvider; +import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider; +import org.apache.sentry.provider.file.PolicyFile; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import com.google.common.io.Files; + +public class TestKafkaAuthorizationProviderSpecialCases { + private AuthorizationProvider authzProvider; + private PolicyFile policyFile; + private File baseDir; + private File iniFile; + private String initResource; + @Before + public void setup() throws IOException { + baseDir = Files.createTempDir(); + iniFile = new File(baseDir, "policy.ini"); + initResource = "file://" + iniFile.getPath(); + policyFile = new PolicyFile(); + } + + @After + public void teardown() throws IOException { + if(baseDir != null) { + FileUtils.deleteQuietly(baseDir); + } + } + + @Test + public void testDuplicateEntries() throws Exception { + Subject user1 = new Subject("user1"); + Host host1 = new Host("host1"); + Topic topic1 = new Topic("t1"); + Set actions = Sets.newHashSet(new KafkaAction(KafkaActionConstant.READ)); + policyFile.addGroupsToUser(user1.getName(), true, "group1", "group1") + .addRolesToGroup("group1", true, "role1", "role1") + .addPermissionsToRole("role1", true, "host=host1->topic=t1->action=read", + "host=host1->topic=t1->action=read"); + policyFile.write(iniFile); + KafkaPolicyFileProviderBackend policy = new KafkaPolicyFileProviderBackend(initResource); + authzProvider = new LocalGroupResourceAuthorizationProvider(initResource, policy); + List authorizableHierarchy = ImmutableList.of(host1, topic1); + Assert.assertTrue(authorizableHierarchy.toString(), + authzProvider.hasAccess(user1, authorizableHierarchy, actions, ActiveRoleSet.ALL)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java new file mode 100644 index 0000000..0186cc9 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.policy.kafka.provider; + +import java.io.File; +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.sentry.core.common.ActiveRoleSet; +import org.apache.sentry.policy.common.PolicyEngine; +import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.google.common.io.Files; + +public class TestKafkaPolicyNegative { + private File baseDir; + private File globalPolicyFile; + + @Before + public void setup() { + baseDir = Files.createTempDir(); + globalPolicyFile = new File(baseDir, "global.ini"); + } + + @After + public void teardown() { + if(baseDir != null) { + FileUtils.deleteQuietly(baseDir); + } + } + + private void append(String from, File to) throws IOException { + Files.append(from + "\n", to, Charsets.UTF_8); + } + + @Test + public void testauthorizedKafkaInPolicyFile() throws Exception { + append("[groups]", globalPolicyFile); + append("other_group = other_role", globalPolicyFile); + append("[roles]", globalPolicyFile); + append("other_role = host=host1->topic=t1->action=read, host=host1->consumergroup=l1->action=read", globalPolicyFile); + PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath()); + //malicious_group has no privilege + ImmutableSet permissions = policy.getAllPrivileges(Sets.newHashSet("malicious_group"), ActiveRoleSet.ALL); + Assert.assertTrue(permissions.toString(), permissions.isEmpty()); + //other_group has two privileges + permissions = policy.getAllPrivileges(Sets.newHashSet("other_group"), ActiveRoleSet.ALL); + Assert.assertTrue(permissions.toString(), permissions.size() == 2); + } + + @Test + public void testNoHostNameConfig() throws Exception { + append("[groups]", globalPolicyFile); + append("other_group = malicious_role", globalPolicyFile); + append("[roles]", globalPolicyFile); + append("malicious_role = topic=t1->action=read", globalPolicyFile); + PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath()); + ImmutableSet permissions = policy.getAllPrivileges(Sets.newHashSet("other_group"), ActiveRoleSet.ALL); + Assert.assertTrue(permissions.toString(), permissions.isEmpty()); + } + + @Test + public void testHostAllName() throws Exception { + append("[groups]", globalPolicyFile); + append("group = malicious_role", globalPolicyFile); + append("[roles]", globalPolicyFile); + append("malicious_role = host=*", globalPolicyFile); + PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath()); + ImmutableSet permissions = policy.getAllPrivileges(Sets.newHashSet("group"), ActiveRoleSet.ALL); + Assert.assertTrue(permissions.toString(), permissions.size() == 1); + } + + @Test + public void testAll() throws Exception { + append("[groups]", globalPolicyFile); + append("group = malicious_role", globalPolicyFile); + append("[roles]", globalPolicyFile); + append("malicious_role = *", globalPolicyFile); + PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath()); + ImmutableSet permissions = policy.getAllPrivileges(Sets.newHashSet("group"), ActiveRoleSet.ALL); + Assert.assertTrue(permissions.toString(), permissions.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties b/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000..7703069 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Define some default values that can be overridden by system properties. +# +# For testing, it may also be convenient to specify + +log4j.rootLogger=DEBUG,console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n + +log4j.logger.org.apache.hadoop.conf.Configuration=INFO \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/4c37d78e/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini b/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini new file mode 100644 index 0000000..c533e69 --- /dev/null +++ b/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[groups] +admin_group = admin_all +subadmin_group = admin_host1 +consumer_group0 = consumer_t1_all +consumer_group1 = consumer_t1_host1 +consumer_group2 = consumer_t2_host2 +producer_group0 = producer_t1_all +producer_group1 = producer_t1_host1 +producer_group2 = producer_t2_host2 +consumer_producer_group0 = consumer_producer_t1 + +[roles] +admin_all = host=* +admin_host1 = host=host1 +consumer_t1_all = host=*->topic=t1->action=read +consumer_t1_host1 = host=host1->topic=t1->action=read +consumer_t2_host2 = host=host2->topic=t2->action=read +producer_t1_all = host=*->topic=t1->action=write +producer_t1_host1 = host=host1->topic=t1->action=write +producer_t2_host2 = host=host2->topic=t2->action=write +consumer_producer_t1 = host=host1->topic=t1->action=all