sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ha...@apache.org
Subject [03/11] incubator-sentry git commit: SENTRY-1011: Add Kafka binding (Ashish K Singh, reviewed by HaoHao and Dapeng Sun, via Anne Yu)
Date Tue, 22 Mar 2016 06:24:00 GMT
SENTRY-1011: Add Kafka binding (Ashish K Singh, reviewed by HaoHao and Dapeng Sun, via Anne Yu)

Change-Id: I4e54d5d519448bac24896b2c76fd875978ec655a


Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/7ce03735
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/7ce03735
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/7ce03735

Branch: refs/heads/master
Commit: 7ce037351b7060d9c46b5578669839caf62cadcd
Parents: 2575add
Author: Anne Yu <anneyu@cloudera.com>
Authored: Tue Feb 9 16:59:58 2016 -0800
Committer: hahao <hao.hao@cloudera.com>
Committed: Mon Mar 21 23:13:09 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |  12 ++
 sentry-binding/pom.xml                          |   1 +
 sentry-binding/sentry-binding-kafka/pom.xml     |  76 ++++++++++
 .../org/apache/sentry/kafka/ConvertUtil.java    |  55 +++++++
 .../kafka/authorizer/SentryKafkaAuthorizer.java | 137 +++++++++++++++++
 .../sentry/kafka/binding/KafkaAuthBinding.java  | 152 +++++++++++++++++++
 .../binding/KafkaAuthBindingSingleton.java      |  87 +++++++++++
 .../apache/sentry/kafka/conf/KafkaAuthConf.java |  78 ++++++++++
 .../kafka/MockGroupMappingServiceProvider.java  |  46 ++++++
 .../kafka/authorizer/ConvertUtilTest.java       |  85 +++++++++++
 .../authorizer/SentryKafkaAuthorizerTest.java   | 126 +++++++++++++++
 .../src/test/resources/core-site.xml            |  26 ++++
 .../src/test/resources/log4j.properties         |  30 ++++
 .../src/test/resources/sentry-site.xml          |  42 +++++
 .../src/test/resources/test-authz-provider.ini  |  38 +++++
 .../provider/common/AuthorizationComponent.java |   1 +
 16 files changed, 992 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ac2d596..eb6d004 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,6 +96,8 @@ limitations under the License.
     <sqoop.version>1.99.6</sqoop.version>
     <test.sentry.hadoop.classpath>${maven.test.classpath}</test.sentry.hadoop.classpath>
     <zookeeper.version>3.4.5</zookeeper.version>
+    <kafka.version>0.9.0.0</kafka.version>
+    <commons-io.version>1.3.2</commons-io.version>
   </properties>
 
   <dependencyManagement>
@@ -415,6 +417,11 @@ limitations under the License.
       </dependency>
       <dependency>
         <groupId>org.apache.sentry</groupId>
+        <artifactId>sentry-binding-kafka</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.sentry</groupId>
         <artifactId>sentry-provider-common</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -605,6 +612,11 @@ limitations under the License.
         <artifactId>hamcrest-all</artifactId>
         <version>${hamcrest.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka_2.11</artifactId>
+        <version>${kafka.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-binding/pom.xml b/sentry-binding/pom.xml
index 0f2a987..9e4999b 100644
--- a/sentry-binding/pom.xml
+++ b/sentry-binding/pom.xml
@@ -31,6 +31,7 @@ limitations under the License.
 
   <modules>
     <module>sentry-binding-hive</module>
+    <module>sentry-binding-kafka</module>
     <module>sentry-binding-solr</module>
     <module>sentry-binding-sqoop</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/pom.xml b/sentry-binding/sentry-binding-kafka/pom.xml
new file mode 100644
index 0000000..bd24c20
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.sentry</groupId>
+    <artifactId>sentry-binding</artifactId>
+    <version>1.7.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>sentry-binding-kafka</artifactId>
+  <name>Sentry Binding for Kafka</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-core-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-core-model-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-policy-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-provider-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-provider-file</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-provider-db</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-policy-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
new file mode 100644
index 0000000..c878308
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.kafka;
+
+import java.util.List;
+
+import kafka.security.auth.Resource;
+
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.model.kafka.Host;
+
+import com.google.common.collect.Lists;
+import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+
+public class ConvertUtil {
+
+  public static List<Authorizable> convertResourceToAuthorizable(String hostname,
+      final Resource resource) {
+    List<Authorizable> authorizables = Lists.newArrayList();
+    authorizables.add(new Host(hostname));
+    authorizables.add(new Authorizable() {
+      @Override
+      public String getTypeName() {
+        final String resourceTypeName = resource.resourceType().name();
+        // Kafka's GROUP resource is referred as CONSUMERGROUP within Sentry.
+        if (resourceTypeName.equalsIgnoreCase("group")) {
+          return KafkaAuthorizable.AuthorizableType.CONSUMERGROUP.name();
+        } else {
+          return resourceTypeName;
+        }
+      }
+
+      @Override
+      public String getName() {
+        return resource.name();
+      }
+    });
+    return authorizables;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
new file mode 100644
index 0000000..9ffb971
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.kafka.authorizer;
+
+import kafka.network.RequestChannel;
+import kafka.security.auth.Acl;
+import kafka.security.auth.Authorizer;
+import kafka.security.auth.Operation;
+import kafka.security.auth.Resource;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.sentry.kafka.binding.KafkaAuthBinding;
+import org.apache.sentry.kafka.binding.KafkaAuthBindingSingleton;
+import org.apache.sentry.kafka.conf.KafkaAuthConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.immutable.Map;
+import scala.collection.immutable.Set;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class SentryKafkaAuthorizer implements Authorizer {
+
+  private static Logger LOG =
+      LoggerFactory.getLogger(SentryKafkaAuthorizer.class);
+
+  KafkaAuthBinding binding;
+  KafkaAuthConf kafkaAuthConf;
+
+  String sentry_site = null;
+  List<KafkaPrincipal> super_users = null;
+
+  public SentryKafkaAuthorizer() {
+  }
+
+  @Override
+  public boolean authorize(RequestChannel.Session session, Operation operation,
+                           Resource resource) {
+    LOG.debug("Authorizing Session: " + session + " for Operation: " + operation + " on Resource: " + resource);
+    final KafkaPrincipal user = session.principal();
+    if (isSuperUser(user)) {
+      LOG.debug("Allowing SuperUser: " + user + " in " + session + " for Operation: " + operation + " on Resource: " + resource);
+      return true;
+    }
+    LOG.debug("User: " + user + " is not a SuperUser");
+    return binding.authorize(session, operation, resource);
+  }
+
+  @Override
+  public void addAcls(Set<Acl> acls, final Resource resource) {
+    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+  }
+
+  @Override
+  public boolean removeAcls(Set<Acl> acls, final Resource resource) {
+    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+  }
+
+  @Override
+  public boolean removeAcls(final Resource resource) {
+    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+  }
+
+  @Override
+  public Set<Acl> getAcls(Resource resource) {
+    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+  }
+
+  @Override
+  public Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
+    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+  }
+
+  @Override
+  public Map<Resource, Set<Acl>> getAcls() {
+    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void configure(java.util.Map<String, ?> configs) {
+    final Object sentryKafkaSiteUrlConfig = configs.get(KafkaAuthConf.SENTRY_KAFKA_SITE_URL);
+    if (sentryKafkaSiteUrlConfig != null) {
+      this.sentry_site = sentryKafkaSiteUrlConfig.toString();
+    }
+    final Object kafkaSuperUsersConfig = configs.get(KafkaAuthConf.KAFKA_SUPER_USERS);
+    if (kafkaSuperUsersConfig != null) {
+      getSuperUsers(kafkaSuperUsersConfig.toString());
+    }
+    LOG.info("Configuring Sentry KafkaAuthorizer: " + sentry_site);
+    final KafkaAuthBindingSingleton instance = KafkaAuthBindingSingleton.getInstance();
+    instance.configure(sentry_site);
+    this.binding = instance.getAuthBinding();
+    this.kafkaAuthConf = instance.getKafkaAuthConf();
+  }
+
+  private void getSuperUsers(String kafkaSuperUsers) {
+    super_users = new ArrayList<>();
+    String[] superUsers = kafkaSuperUsers.split(";");
+    for (String superUser : superUsers) {
+      if (!superUser.isEmpty()) {
+        final String trimmedUser = superUser.trim();
+        super_users.add(KafkaPrincipal.fromString(trimmedUser));
+        LOG.debug("Adding " + trimmedUser + " to list of Kafka SuperUsers.");
+      }
+    }
+  }
+
+  private boolean isSuperUser(KafkaPrincipal user) {
+    if (super_users != null) {
+      for (KafkaPrincipal superUser : super_users) {
+        if (superUser.equals(user)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
new file mode 100644
index 0000000..ccbe60e
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.kafka.binding;
+
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Sets;
+import kafka.network.RequestChannel;
+import kafka.security.auth.Operation;
+import kafka.security.auth.Resource;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.Subject;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
+import org.apache.sentry.kafka.ConvertUtil;
+import org.apache.sentry.kafka.conf.KafkaAuthConf.AuthzConfVars;
+import org.apache.sentry.policy.common.PolicyEngine;
+import org.apache.sentry.provider.common.AuthorizationComponent;
+import org.apache.sentry.provider.common.AuthorizationProvider;
+import org.apache.sentry.provider.common.ProviderBackend;
+import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaAuthBinding {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class);
+    private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA;
+
+    private final Configuration authConf;
+    private final AuthorizationProvider authProvider;
+    private ProviderBackend providerBackend;
+
+    private final KafkaActionFactory actionFactory = new KafkaActionFactory();
+
+    public KafkaAuthBinding(Configuration authConf) throws Exception {
+        this.authConf = authConf;
+        this.authProvider = createAuthProvider();
+    }
+
+    /**
+     * Instantiate the configured authz provider
+     *
+     * @return {@link AuthorizationProvider}
+     */
+    private AuthorizationProvider createAuthProvider() throws Exception {
+        /**
+         * get the authProvider class, policyEngine class, providerBackend class and resources from the
+         * kafkaAuthConf config
+         */
+        String authProviderName =
+            authConf.get(AuthzConfVars.AUTHZ_PROVIDER.getVar(),
+                AuthzConfVars.AUTHZ_PROVIDER.getDefault());
+        String resourceName =
+            authConf.get(AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(),
+                AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getDefault());
+        String providerBackendName =
+            authConf.get(AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(),
+                AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getDefault());
+        String policyEngineName =
+            authConf.get(AuthzConfVars.AUTHZ_POLICY_ENGINE.getVar(),
+                AuthzConfVars.AUTHZ_POLICY_ENGINE.getDefault());
+        String instanceName = authConf.get(AuthzConfVars.AUTHZ_INSTANCE_NAME.getVar());
+        if (resourceName != null && resourceName.startsWith("classpath:")) {
+            String resourceFileName = resourceName.substring("classpath:".length());
+            resourceName = AuthorizationProvider.class.getClassLoader().getResource(resourceFileName).getPath();
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Using authorization provider " + authProviderName + " with resource "
+                + resourceName + ", policy engine " + policyEngineName + ", provider backend "
+                + providerBackendName);
+        }
+
+        // Instantiate the configured providerBackend
+        Constructor<?> providerBackendConstructor =
+            Class.forName(providerBackendName)
+                .getDeclaredConstructor(Configuration.class, String.class);
+        providerBackendConstructor.setAccessible(true);
+        providerBackend =
+            (ProviderBackend) providerBackendConstructor.newInstance(new Object[]{authConf,
+                resourceName});
+        if (providerBackend instanceof SentryGenericProviderBackend) {
+            ((SentryGenericProviderBackend) providerBackend).setComponentType(COMPONENT_TYPE);
+            ((SentryGenericProviderBackend) providerBackend).setServiceName("kafka" + instanceName);
+        }
+
+        // Instantiate the configured policyEngine
+        Constructor<?> policyConstructor =
+            Class.forName(policyEngineName).getDeclaredConstructor(ProviderBackend.class);
+        policyConstructor.setAccessible(true);
+        PolicyEngine policyEngine =
+            (PolicyEngine) policyConstructor.newInstance(new Object[]{providerBackend});
+
+        // Instantiate the configured authProvider
+        Constructor<?> constructor =
+            Class.forName(authProviderName).getDeclaredConstructor(Configuration.class, String.class,
+                PolicyEngine.class);
+        constructor.setAccessible(true);
+        return (AuthorizationProvider) constructor.newInstance(new Object[]{authConf, resourceName,
+            policyEngine});
+    }
+
+    /**
+     * Authorize access to a Kafka privilege
+     */
+    public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) {
+        List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(session.clientAddress().getHostAddress(), resource);
+        Set<KafkaAction> actions = Sets.newHashSet(actionFactory.getActionByName(operation.name()));
+        return authProvider.hasAccess(new Subject(getName(session)), authorizables, actions, ActiveRoleSet.ALL);
+    }
+
+    /*
+    * For SSL session's Kafka creates user names with "CN=" prepended to the user name.
+    * "=" is used as splitter by Sentry to parse key value pairs and so it is required to strip off "CN=".
+    * */
+    private String getName(RequestChannel.Session session) {
+        final String principalName = session.principal().getName();
+        int start = principalName.indexOf("CN=");
+        if (start >= 0) {
+            String tmpName, name = "";
+                tmpName = principalName.substring(start + 3);
+                int end = tmpName.indexOf(",");
+                if (end > 0) {
+                    name = tmpName.substring(0, end);
+                } else {
+                    name = tmpName;
+                }
+            return name;
+        } else {
+            return principalName;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
new file mode 100644
index 0000000..92e50e6
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.kafka.binding;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.sentry.kafka.conf.KafkaAuthConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+public class KafkaAuthBindingSingleton {
+  private static Logger log = LoggerFactory.getLogger(KafkaAuthBindingSingleton.class);
+
+  // Lazy init holder class idiom to avoid DCL
+  private static class KafkaAuthBindingSingletonHolder {
+    static final KafkaAuthBindingSingleton instance = new KafkaAuthBindingSingleton();
+  }
+
+  private static KafkaAuthConf kafkaAuthConf = null;
+
+  private KafkaAuthBinding binding;
+
+  private KafkaAuthBindingSingleton() {
+  }
+
+  private KafkaAuthConf loadAuthzConf(String sentry_site) {
+    if (Strings.isNullOrEmpty(sentry_site)) {
+      throw new IllegalArgumentException("Configuration key " + KafkaAuthConf.SENTRY_KAFKA_SITE_URL
+          + " value '" + sentry_site + "' is invalid.");
+    }
+
+    KafkaAuthConf kafkaAuthConf = null;
+    try {
+      kafkaAuthConf = new KafkaAuthConf(new URL(sentry_site));
+    } catch (MalformedURLException e) {
+      throw new IllegalArgumentException("Configuration key " + KafkaAuthConf.SENTRY_KAFKA_SITE_URL
+          + " specifies a malformed URL '" + sentry_site + "'", e);
+    }
+    return kafkaAuthConf;
+  }
+
+  public void configure(String sentry_site) {
+    try {
+      kafkaAuthConf = loadAuthzConf(sentry_site);
+      binding = new KafkaAuthBinding(kafkaAuthConf);
+      log.info("KafkaAuthBinding created successfully");
+    } catch (Exception ex) {
+      log.error("Unable to create KafkaAuthBinding", ex);
+      throw new RuntimeException("Unable to create KafkaAuthBinding: " + ex.getMessage(), ex);
+    }
+  }
+
+  public static KafkaAuthBindingSingleton getInstance() {
+    return KafkaAuthBindingSingletonHolder.instance;
+  }
+
+  public KafkaAuthBinding getAuthBinding() {
+    if (binding == null) {
+      throw new RuntimeException("KafkaAuthBindingSingleton not configured yet.");
+    }
+    return binding;
+  }
+
+  public KafkaAuthConf getKafkaAuthConf() {
+    if (binding == null) {
+      throw new RuntimeException("KafkaAuthBindingSingleton not configured yet.");
+    }
+    return kafkaAuthConf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
new file mode 100644
index 0000000..e75ec7e
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sentry.kafka.conf;
+
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine;
+import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider;
+import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
+
+public class KafkaAuthConf extends Configuration {
+  /**
+   * Configuration key used in kafka.properties to point at sentry-site.xml
+   */
+  public static final String SENTRY_KAFKA_SITE_URL = "sentry.kafka.site.url";
+  public static final String AUTHZ_SITE_FILE = "sentry-site.xml";
+  public static final String KAFKA_SUPER_USERS = "kafka.superusers";
+
+  /**
+   * Config setting definitions
+   */
+  public static enum AuthzConfVars {
+    AUTHZ_PROVIDER("sentry.kafka.provider",
+        HadoopGroupResourceAuthorizationProvider.class.getName()),
+    AUTHZ_PROVIDER_RESOURCE("sentry.kafka.provider.resource", ""),
+    AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()),
+    AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()),
+    AUTHZ_INSTANCE_NAME("sentry.kafka.name", "");
+
+    private final String varName;
+    private final String defaultVal;
+
+    AuthzConfVars(String varName, String defaultVal) {
+      this.varName = varName;
+      this.defaultVal = defaultVal;
+    }
+
+    public String getVar() {
+      return varName;
+    }
+
+    public String getDefault() {
+      return defaultVal;
+    }
+
+    public static String getDefault(String varName) {
+      for (AuthzConfVars oneVar : AuthzConfVars.values()) {
+        if (oneVar.getVar().equalsIgnoreCase(varName)) {
+          return oneVar.getDefault();
+        }
+      }
+      return null;
+    }
+  }
+
+  public KafkaAuthConf(URL kafkaAuthzSiteURL) {
+    super(true);
+    addResource(kafkaAuthzSiteURL);
+  }
+
+  @Override
+  public String get(String varName) {
+    return get(varName, AuthzConfVars.getDefault(varName));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/MockGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/MockGroupMappingServiceProvider.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/MockGroupMappingServiceProvider.java
new file mode 100644
index 0000000..48f0d3d
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/MockGroupMappingServiceProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.kafka;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+
+import com.google.common.collect.Lists;
+
+public class MockGroupMappingServiceProvider implements GroupMappingServiceProvider {
+
+  public MockGroupMappingServiceProvider() {
+  }
+
+  @Override
+  public List<String> getGroups(String user) throws IOException {
+    return Lists.newArrayList(user);
+  }
+
+  @Override
+  public void cacheGroupsRefresh() throws IOException {
+  }
+
+  @Override
+  public void cacheGroupsAdd(List<String> groups) throws IOException {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
new file mode 100644
index 0000000..e08d442
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.kafka.authorizer;
+
+import junit.framework.Assert;
+import kafka.security.auth.Resource;
+import kafka.security.auth.Resource$;
+import kafka.security.auth.ResourceType$;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+import org.apache.sentry.kafka.ConvertUtil;
+import org.junit.Test;
+
+import java.util.List;
+
+public class ConvertUtilTest {
+
+  @Test
+  public void testCluster() {
+    String hostname = "localhost";
+    String clusterName = Resource$.MODULE$.ClusterResourceName();
+    Resource clusterResource = new Resource(ResourceType$.MODULE$.fromString("cluster"), clusterName);
+    List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(hostname, clusterResource);
+    for (Authorizable auth : authorizables) {
+      if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.CLUSTER.name())) {
+        Assert.assertEquals(auth.getName(), clusterName);
+      } else if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.name())) {
+        Assert.assertEquals(auth.getName(), hostname);
+      } else {
+        Assert.fail("Unexpected type found: " + auth.getTypeName());
+      }
+    }
+    Assert.assertEquals(authorizables.size(), 2);
+  }
+
+  @Test
+  public void testTopic() {
+    String hostname = "localhost";
+    String topicName = "t1";
+    Resource topicResource = new Resource(ResourceType$.MODULE$.fromString("topic"), topicName);
+    List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(hostname, topicResource);
+    for (Authorizable auth : authorizables) {
+      if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.TOPIC.name())) {
+        Assert.assertEquals(auth.getName(), topicName);
+      } else if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.name())) {
+        Assert.assertEquals(auth.getName(), hostname);
+      } else {
+        Assert.fail("Unexpected type found: " + auth.getTypeName());
+      }
+    }
+    Assert.assertEquals(authorizables.size(), 2);
+  }
+
+  @Test
+  public void testConsumerGroup() {
+    String hostname = "localhost";
+    String consumerGroup = "g1";
+    Resource consumerGroupResource = new Resource(ResourceType$.MODULE$.fromString("group"), consumerGroup);
+    List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(hostname, consumerGroupResource);
+    for (Authorizable auth : authorizables) {
+      if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.CONSUMERGROUP.name())) {
+        Assert.assertEquals(auth.getName(),consumerGroup);
+      } else if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.name())) {
+        Assert.assertEquals(auth.getName(),hostname);
+      } else {
+        Assert.fail("Unexpected type found: " + auth.getTypeName());
+      }
+    }
+    Assert.assertEquals(authorizables.size(), 2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
new file mode 100644
index 0000000..eafe0f0
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.kafka.authorizer;
+
+import kafka.network.RequestChannel;
+import kafka.security.auth.Operation;
+import kafka.security.auth.Operation$;
+import kafka.security.auth.Resource;
+import kafka.security.auth.Resource$;
+import kafka.security.auth.ResourceType$;
+import kafka.server.KafkaConfig;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.sentry.kafka.conf.KafkaAuthConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Properties;
+
+public class SentryKafkaAuthorizerTest {
+
+  private SentryKafkaAuthorizer authorizer;
+  private InetAddress testHostName1;
+  private InetAddress testHostName2;
+  private String resourceName;
+  private Resource clusterResource;
+  private Resource topic1Resource;
+  private KafkaConfig config;
+
+  public SentryKafkaAuthorizerTest() throws UnknownHostException {
+    authorizer = new SentryKafkaAuthorizer();
+    testHostName1 = InetAddress.getByAddress("host1", new byte[] {1, 2, 3, 4});
+    testHostName2 = InetAddress.getByAddress("host2", new byte[] {2, 3, 4, 5});
+    resourceName = Resource$.MODULE$.ClusterResourceName();
+    clusterResource = new Resource(ResourceType$.MODULE$.fromString("cluster"), resourceName);
+    topic1Resource = new Resource(ResourceType$.MODULE$.fromString("topic"), "t1");
+  }
+
+  @Before
+  public void  setUp() {
+    Properties props = new Properties();
+    String sentry_site_path = SentryKafkaAuthorizerTest.class.getClassLoader().getResource(KafkaAuthConf.AUTHZ_SITE_FILE).getPath();
+    // Kafka check this prop when creating a config instance
+    props.put("zookeeper.connect", "test");
+    props.put("sentry.kafka.site.url", "file://" + sentry_site_path);
+
+    config = KafkaConfig.fromProps(props);
+    authorizer.configure(config.originals());
+  }
+
+  @Test
+  public void testAdmin() {
+
+    KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin");
+    RequestChannel.Session host1Session = new RequestChannel.Session(admin, testHostName1);
+    RequestChannel.Session host2Session = new RequestChannel.Session(admin, testHostName2);
+
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Create"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Read"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Write"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Create"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Delete"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"),topic1Resource));
+
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Read"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Write"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Delete"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource));
+  }
+
+  @Test
+  public void testSubAdmin() {
+    KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "subadmin");
+    RequestChannel.Session host1Session = new RequestChannel.Session(admin, testHostName1);
+    RequestChannel.Session host2Session = new RequestChannel.Session(admin, testHostName2);
+
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Create"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Read"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Write"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Create"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Delete"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"),topic1Resource));
+
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), clusterResource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), clusterResource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), clusterResource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Read"), topic1Resource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Write"), topic1Resource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), topic1Resource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Delete"), topic1Resource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/core-site.xml b/sentry-binding/sentry-binding-kafka/src/test/resources/core-site.xml
new file mode 100644
index 0000000..61a0463
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/resources/core-site.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+  <property>
+    <name>hadoop.security.group.mapping</name>
+    <value>org.apache.sentry.kafka.MockGroupMappingServiceProvider</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/log4j.properties b/sentry-binding/sentry-binding-kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d42c02c
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/resources/log4j.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+sentry.root.logger=DEBUG,console
+log4j.rootLogger=${sentry.root.logger}
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
+
+log4g.logger.kafka.utils.Logging=WARN
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.org.apache.sentry=DEBUG
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.org.I0Itec.zkclient=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.category.DataNucleus=OFF

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml b/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml
new file mode 100644
index 0000000..69ce5a7
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+  <property>
+    <name>sentry.kafka.provider</name>
+    <value>org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider</value>
+  </property>
+  <property>
+    <name>hadoop.security.group.mapping</name>
+    <value>test</value>
+  </property>
+  <property>
+    <name>sentry.kafka.provider.resource</name>
+    <value>classpath:test-authz-provider.ini</value>
+  </property>
+  <property>
+    <name>sentry.kafka.policy.engine</name>
+    <value>org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine</value>
+  </property>
+  <property>
+    <name>sentry.kafka.provider.backend</name>
+    <value>org.apache.sentry.provider.file.SimpleFileProviderBackend</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini b/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
new file mode 100644
index 0000000..5f85382
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[groups]
+admin = admin_all
+subadmin = admin_host1
+consumer0 = consumer_t1_all
+consumer1 = consumer_t1_host1
+consumer2 = consumer_t2_host2
+producer0 = producer_t1_all
+producer1 = producer_t1_host1
+producer2 = producer_t2_host2
+consumer_producer0 = consumer_producer_t1
+
+[roles]
+admin_all = host=*
+admin_host1 = host=1.2.3.4
+consumer_t1_all = host=*->topic=t1->action=read
+consumer_t1_host1 = host=host1->topic=t1->action=read
+consumer_t2_host2 = host=host2->topic=t2->action=read
+producer_t1_all = host=*->topic=t1->action=write
+producer_t1_host1 = host=host1->topic=t1->action=write
+producer_t2_host2 = host=host2->topic=t2->action=write
+consumer_producer_t1 = host=host1->topic=t1->action=all

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/AuthorizationComponent.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/AuthorizationComponent.java b/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/AuthorizationComponent.java
index 6409015..c74641a 100644
--- a/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/AuthorizationComponent.java
+++ b/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/AuthorizationComponent.java
@@ -22,4 +22,5 @@ package org.apache.sentry.provider.common;
 public class AuthorizationComponent{
   public static final String Search = "solr";
   public static final String SQOOP = "sqoop";
+  public static final String KAFKA = "kafka";
 }


Mime
View raw message