sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ha...@apache.org
Subject incubator-sentry git commit: SENTRY-1057: Add implementations for acls' CRUD (Ashish K Singh, reviewed by: Dapeng Sun and Hao Hao)
Date Sat, 05 Mar 2016 23:43:47 GMT
Repository: incubator-sentry
Updated Branches:
  refs/heads/kafka ab1093bef -> e6a2f13a0


SENTRY-1057: Add implementations for acls' CRUD (Ashish K Singh, reviewed by: Dapeng Sun and Hao Hao)

Change-Id: Iff5f23cee47bef256db387ceb032c1a6ea5c9124


Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/e6a2f13a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/e6a2f13a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/e6a2f13a

Branch: refs/heads/kafka
Commit: e6a2f13a0ac589e25b3a42eecacc4cbf8200f9b7
Parents: ab1093b
Author: hahao <hao.hao@cloudera.com>
Authored: Sat Mar 5 15:42:45 2016 -0800
Committer: hahao <hao.hao@cloudera.com>
Committed: Sat Mar 5 15:42:45 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 pom.xml                                         |   3 +
 .../kafka/authorizer/SentryKafkaAuthorizer.java |  56 ++-
 .../sentry/kafka/binding/KafkaAuthBinding.java  | 369 ++++++++++++++++++-
 .../binding/KafkaAuthBindingSingleton.java      |   4 +-
 .../apache/sentry/kafka/conf/KafkaAuthConf.java |   4 +-
 .../authorizer/SentryKafkaAuthorizerTest.java   |   1 -
 .../core/model/kafka/KafkaActionFactory.java    |   3 +
 .../core/model/kafka/TestKafkaAuthorizable.java |   4 -
 sentry-provider/sentry-provider-db/pom.xml      |   4 +
 .../persistent/PrivilegeOperatePersistence.java |   2 +
 sentry-tests/pom.xml                            |   1 +
 sentry-tests/sentry-tests-kafka/pom.xml         |  64 ++++
 .../tests/e2e/kafka/CustomPrincipalBuilder.java |  47 +++
 .../tests/e2e/kafka/EmbeddedZkServer.java       |  71 ++++
 .../sentry/tests/e2e/kafka/KafkaTestServer.java | 124 +++++++
 .../sentry/tests/e2e/kafka/TestUtils.java       |  29 ++
 .../e2e/kafka/AbstractKafkaSentryTestBase.java  | 227 ++++++++++++
 .../tests/e2e/kafka/StaticUserGroupRole.java    |  57 +++
 .../sentry/tests/e2e/kafka/TestAclsCrud.java    | 328 +++++++++++++++++
 .../src/test/resources/log4j.properties         |  38 ++
 .../src/test/resources/test.crt                 |  15 +
 .../src/test/resources/user1.crt                |  15 +
 .../src/test/resources/user2.crt                |  15 +
 24 files changed, 1449 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index a89bad8..08edd26 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 *.class
+classes/
 target/
 .classpath
 .project

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ca2c92a..7932b99 100644
--- a/pom.xml
+++ b/pom.xml
@@ -842,6 +842,9 @@ limitations under the License.
                   <exclude>**/metastore_db/</exclude>
                   <exclude>**/*.rej</exclude>
                   <exclude>**/thirdparty/</exclude>
+                  <!-- Exclude SSL .crt and .jks files -->
+                  <exclude>**/*.crt</exclude>
+                  <exclude>**/*.jks</exclude>
                 </excludes>
               </configuration>
             </execution>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
index 5bf520b..3bce6cc 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
@@ -27,6 +27,7 @@ import org.apache.sentry.kafka.binding.KafkaAuthBindingSingleton;
 import org.apache.sentry.kafka.conf.KafkaAuthConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.collection.immutable.Map;
 import scala.collection.immutable.Set;
 
@@ -36,15 +37,15 @@ import java.util.List;
 
 public class SentryKafkaAuthorizer implements Authorizer {
 
-  private static Logger LOG =
-      LoggerFactory.getLogger(SentryKafkaAuthorizer.class);
+  private final static Logger LOG = LoggerFactory.getLogger(SentryKafkaAuthorizer.class);
+  private final static String INSTANCE_NAME = KafkaAuthConf.AuthzConfVars.getDefault(KafkaAuthConf.KAFKA_SERVICE_INSTANCE_NAME);
 
-  KafkaAuthBinding binding;
-  KafkaAuthConf kafkaAuthConf;
+  private KafkaAuthBinding binding;
+  private String kafkaServiceInstanceName = INSTANCE_NAME;
+  private String requestorName = KafkaAuthConf.AuthzConfVars.getDefault(KafkaAuthConf.KAFKA_SERVICE_USER_NAME);
 
   String sentry_site = null;
   List<KafkaPrincipal> super_users = null;
-  String kafkaServiceInstanceName = KafkaAuthConf.AuthzConfVars.getDefault(KafkaAuthConf.KAFKA_SERVICE_INSTANCE_NAME);
 
   public SentryKafkaAuthorizer() {
   }
@@ -60,36 +61,36 @@ public class SentryKafkaAuthorizer implements Authorizer {
     }
     LOG.debug("User: " + user + " is not a SuperUser");
     return binding.authorize(session, operation, resource);
-  }
+}
 
   @Override
   public void addAcls(Set<Acl> acls, final Resource resource) {
-    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+    binding.addAcls(acls, resource);
   }
 
   @Override
   public boolean removeAcls(Set<Acl> acls, final Resource resource) {
-    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+    return binding.removeAcls(acls, resource);
   }
 
   @Override
   public boolean removeAcls(final Resource resource) {
-    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+    return binding.removeAcls(resource);
   }
 
   @Override
   public Set<Acl> getAcls(Resource resource) {
-    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+    return binding.getAcls(resource);
   }
 
   @Override
   public Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
-    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+    return binding.getAcls(principal);
   }
 
   @Override
   public Map<Resource, Set<Acl>> getAcls() {
-    throw new UnsupportedOperationException("Please use Sentry CLI to perform this action.");
+    return binding.getAcls();
   }
 
   @Override
@@ -110,11 +111,14 @@ public class SentryKafkaAuthorizer implements Authorizer {
     if (kafkaServiceInstanceName != null) {
       this.kafkaServiceInstanceName = kafkaServiceInstanceName.toString();
     }
+    final Object kafkaServiceUserName = configs.get(KafkaAuthConf.KAFKA_SERVICE_USER_NAME);
+    if (kafkaServiceUserName != null) {
+      this.requestorName = kafkaServiceUserName.toString();
+    }
     LOG.info("Configuring Sentry KafkaAuthorizer: " + sentry_site);
     final KafkaAuthBindingSingleton instance = KafkaAuthBindingSingleton.getInstance();
-    instance.configure(this.kafkaServiceInstanceName, sentry_site);
+    instance.configure(this.kafkaServiceInstanceName, this.requestorName, sentry_site);
     this.binding = instance.getAuthBinding();
-    this.kafkaAuthConf = instance.getKafkaAuthConf();
   }
 
   private void getSuperUsers(String kafkaSuperUsers) {
@@ -139,4 +143,28 @@ public class SentryKafkaAuthorizer implements Authorizer {
     }
     return false;
   }
+
+  /**
+   * This is not used by Kafka; however, since a role is a Sentry-centric entity, some means to perform role CRUD is required.
+   * This method will be used by a Sentry-Kafka CLI that will allow users to perform CRUD on roles and to add roles to groups.
+   */
+  public void addRole(String role) {
+    binding.addRole(role);
+  }
+
+  /**
+   * This is not used by Kafka; however, since a role is a Sentry-centric entity, some means to add a role to groups is required.
+   * This method will be used by a Sentry-Kafka CLI that will allow users to perform CRUD on roles and to add roles to groups.
+   */
+  public void addRoleToGroups(String role, java.util.Set<String> groups) {
+    binding.addRoleToGroups(role, groups);
+  }
+
+  /**
+   * This is not used by Kafka; however, since a role is a Sentry-centric entity, some means to perform role CRUD is required.
+   * This method will be used by a Sentry-Kafka CLI that will allow users to perform CRUD on roles and to add roles to groups.
+   */
+  public void dropAllRoles() {
+    binding.dropAllRoles();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index a54eb8f..8f4a8c4 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -17,20 +17,32 @@
 package org.apache.sentry.kafka.binding;
 
 import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import kafka.security.auth.Acl;
+import kafka.security.auth.Allow;
+import kafka.security.auth.Allow$;
+import kafka.security.auth.Operation$;
+import kafka.security.auth.ResourceType$;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.Sets;
 import kafka.network.RequestChannel;
 import kafka.security.auth.Operation;
 import kafka.security.auth.Resource;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.sentry.SentryUserException;
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
 import org.apache.sentry.core.common.Subject;
 import org.apache.sentry.core.model.kafka.KafkaActionFactory;
 import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
+import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
 import org.apache.sentry.kafka.ConvertUtil;
 import org.apache.sentry.kafka.conf.KafkaAuthConf.AuthzConfVars;
 import org.apache.sentry.policy.common.PolicyEngine;
@@ -38,23 +50,40 @@ import org.apache.sentry.provider.common.AuthorizationComponent;
 import org.apache.sentry.provider.common.AuthorizationProvider;
 import org.apache.sentry.provider.common.ProviderBackend;
 import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
+import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
+import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
+import org.apache.sentry.provider.db.generic.service.thrift.TSentryRole;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Predef;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.JavaConversions;
+import scala.collection.immutable.Map;
 
 public class KafkaAuthBinding {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class);
     private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA;
+    private static final String COMPONENT_NAME = COMPONENT_TYPE;
 
     private final Configuration authConf;
     private final AuthorizationProvider authProvider;
+    private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance();
+
     private ProviderBackend providerBackend;
+    private String instanceName;
+    private String requestorName;
 
-    private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance();
 
-    public KafkaAuthBinding(String instanceName, Configuration authConf) throws Exception {
+    public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf) throws Exception {
+        this.instanceName = instanceName;
+        this.requestorName = requestorName;
         this.authConf = authConf;
-        this.authProvider = createAuthProvider(instanceName);
+        this.authProvider = createAuthProvider();
     }
 
     /**
@@ -62,7 +91,7 @@ public class KafkaAuthBinding {
      *
      * @return {@link AuthorizationProvider}
      */
-    private AuthorizationProvider createAuthProvider(String instanceName) throws Exception {
+    private AuthorizationProvider createAuthProvider() throws Exception {
         /**
          * get the authProvider class, policyEngine class, providerBackend class and resources from the
          * kafkaAuthConf config
@@ -127,6 +156,324 @@ public class KafkaAuthBinding {
         return authProvider.hasAccess(new Subject(getName(session)), authorizables, actions, ActiveRoleSet.ALL);
     }
 
+    public void addAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
+        verifyAcls(acls);
+        LOG.info("Adding Acl: acl->" + acls + " resource->" + resource);
+
+        final Iterator<Acl> iterator = acls.iterator();
+        while (iterator.hasNext()) {
+            final Acl acl = iterator.next();
+            final String role = getRole(acl);
+            if (!roleExists(role)) {
+                throw new KafkaException("Can not add Acl for non-existent Role: " + role);
+            }
+            execute(new Command<Void>() {
+                @Override
+                public Void run(SentryGenericServiceClient client) throws Exception {
+                    client.grantPrivilege(
+                        requestorName, role, COMPONENT_NAME, toTSentryPrivilege(acl, resource));
+                    return null;
+                }
+            });
+        }
+    }
+
+    public boolean removeAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
+        verifyAcls(acls);
+        LOG.info("Removing Acl: acl->" + acls + " resource->" + resource);
+        final Iterator<Acl> iterator = acls.iterator();
+        while (iterator.hasNext()) {
+            final Acl acl = iterator.next();
+            final String role = getRole(acl);
+            try {
+                execute(new Command<Void>() {
+                    @Override
+                    public Void run(SentryGenericServiceClient client) throws Exception {
+                        client.dropPrivilege(
+                                requestorName, role, toTSentryPrivilege(acl, resource));
+                        return null;
+                    }
+                });
+            } catch (KafkaException kex) {
+                LOG.error("Failed to remove acls.", kex);
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public void addRole(final String role) {
+        if (roleExists(role)) {
+            throw new KafkaException("Can not create an existing role, " + role + ", again.");
+        }
+
+        execute(new Command<Void>() {
+            @Override
+            public Void run(SentryGenericServiceClient client) throws Exception {
+                client.createRole(
+                    requestorName, role, COMPONENT_NAME);
+                return null;
+            }
+        });
+    }
+
+    public void addRoleToGroups(final String role, final java.util.Set<String> groups) {
+        execute(new Command<Void>() {
+            @Override
+            public Void run(SentryGenericServiceClient client) throws Exception {
+                client.addRoleToGroups(
+                    requestorName, role, COMPONENT_NAME, groups);
+                return null;
+            }
+        });
+    }
+
+    public void dropAllRoles() {
+        final List<String> roles = getAllRoles();
+        execute(new Command<Void>() {
+            @Override
+            public Void run(SentryGenericServiceClient client) throws Exception {
+                for (String role : roles) {
+                    client.dropRole(requestorName, role, COMPONENT_NAME);
+                }
+                return null;
+            }
+        });
+    }
+
+    private List<String> getRolesforGroup(final String groupName) {
+        final List<String> roles = new ArrayList<>();
+        execute(new Command<Void>() {
+            @Override
+            public Void run(SentryGenericServiceClient client) throws Exception {
+                for (TSentryRole tSentryRole : client.listRolesByGroupName(requestorName, groupName, COMPONENT_NAME)) {
+                    roles.add(tSentryRole.getRoleName());
+                }
+                return null;
+            }
+        });
+
+        return roles;
+    }
+
+    private SentryGenericServiceClient getClient() throws Exception {
+        return SentryGenericServiceClientFactory.create(this.authConf);
+    }
+
+    public boolean removeAcls(final Resource resource) {
+        LOG.info("Removing Acls for Resource: resource->" + resource);
+        List<String> roles = getAllRoles();
+        final List<TSentryPrivilege> tSentryPrivileges = getAllPrivileges(roles);
+        try {
+            execute(new Command<Void>() {
+                @Override
+                public Void run(SentryGenericServiceClient client) throws Exception {
+                    for (TSentryPrivilege tSentryPrivilege : tSentryPrivileges) {
+                        if (isPrivilegeForResource(tSentryPrivilege, resource)) {
+                            client.dropPrivilege(
+                                    requestorName, COMPONENT_NAME, tSentryPrivilege);
+                        }
+                    }
+                    return null;
+                }
+            });
+        } catch (KafkaException kex) {
+            LOG.error("Failed to remove acls.", kex);
+            return false;
+        }
+
+        return true;
+    }
+
+    public scala.collection.immutable.Set<Acl> getAcls(final Resource resource) {
+        final Option<scala.collection.immutable.Set<Acl>> acls = getAcls().get(resource);
+        if (acls.nonEmpty())
+            return acls.get();
+        return new scala.collection.immutable.HashSet<Acl>();
+    }
+
+    public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(KafkaPrincipal principal) {
+        if (principal.getPrincipalType().toLowerCase().equals("group")) {
+            List<String> roles = getRolesforGroup(principal.getName());
+            return getAclsForRoles(roles);
+        } else {
+            LOG.info("Did not recognize Principal type: " + principal.getPrincipalType() + ". Returning Acls for all principals.");
+            return getAcls();
+        }
+    }
+
+    public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() {
+        final List<String> roles = getAllRoles();
+        return getAclsForRoles(roles);
+    }
+
+    /**
+     * A Command is a closure used to pass a block of code from individual
+     * functions to execute, which centralizes connection error
+     * handling. Command is parameterized on the return type of the function.
+     */
+    private interface Command<T> {
+        T run(SentryGenericServiceClient client) throws Exception;
+    }
+
+    private <T> T execute(Command<T> cmd) throws KafkaException {
+        SentryGenericServiceClient client = null;
+        try {
+            client = getClient();
+            return cmd.run(client);
+        } catch (SentryUserException ex) {
+            String msg = "Unable to excute command on sentry server: " + ex.getMessage();
+            LOG.error(msg, ex);
+            throw new KafkaException(msg, ex);
+        } catch (Exception ex) {
+            String msg = "Unable to obtain client:" + ex.getMessage();
+            LOG.error(msg, ex);
+            throw new KafkaException(msg, ex);
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+    private TSentryPrivilege toTSentryPrivilege(Acl acl, Resource resource) {
+        final List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(acl.host(), resource);
+        final List<TAuthorizable> tAuthorizables = new ArrayList<>();
+        for (Authorizable authorizable : authorizables) {
+            tAuthorizables.add(new TAuthorizable(authorizable.getTypeName(), authorizable.getName()));
+        }
+        TSentryPrivilege tSentryPrivilege = new TSentryPrivilege(COMPONENT_NAME, instanceName, tAuthorizables, acl.operation().name());
+        return tSentryPrivilege;
+    }
+
+    private String getRole(Acl acl) {
+        return acl.principal().getName();
+    }
+
+    private boolean isPrivilegeForResource(TSentryPrivilege tSentryPrivilege, Resource resource) {
+        final java.util.Iterator<TAuthorizable> authorizablesIterator = tSentryPrivilege.getAuthorizablesIterator();
+        while (authorizablesIterator.hasNext()) {
+            TAuthorizable tAuthorizable = authorizablesIterator.next();
+            if (tAuthorizable.getType().equals(resource.resourceType().name())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private List<TSentryPrivilege> getAllPrivileges(final List<String> roles) {
+        final List<TSentryPrivilege> tSentryPrivileges = new ArrayList<>();
+        execute(new Command<Void>() {
+            @Override
+            public Void run(SentryGenericServiceClient client) throws Exception {
+                for (String role : roles) {
+                    tSentryPrivileges.addAll(client.listPrivilegesByRoleName(
+                        requestorName, role, COMPONENT_NAME, instanceName));
+                }
+                return null;
+            }
+        });
+
+        return tSentryPrivileges;
+    }
+
+    private List<String> getAllRoles() {
+        final List<String> roles = new ArrayList<>();
+        execute(new Command<Void>() {
+            @Override
+            public Void run(SentryGenericServiceClient client) throws Exception {
+                for (TSentryRole tSentryRole : client.listAllRoles(requestorName, COMPONENT_NAME)) {
+                    roles.add(tSentryRole.getRoleName());
+                }
+                return null;
+            }
+        });
+
+        return roles;
+    }
+
+    private Map<Resource, scala.collection.immutable.Set<Acl>> getAclsForRoles(final List<String> roles) {
+        return scala.collection.JavaConverters.mapAsScalaMapConverter(
+                rolePrivilegesToResourceAcls(getRoleToPrivileges(roles)))
+                .asScala().toMap(Predef.<Tuple2<Resource, scala.collection.immutable.Set<Acl>>>conforms());
+    }
+
+    private java.util.Map<Resource, scala.collection.immutable.Set<Acl>> rolePrivilegesToResourceAcls(java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap) {
+        final java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap = new HashMap<>();
+        for (String role : rolePrivilegesMap.keySet()) {
+            scala.collection.immutable.Set<TSentryPrivilege> privileges = rolePrivilegesMap.get(role);
+            final Iterator<TSentryPrivilege> iterator = privileges.iterator();
+            while (iterator.hasNext()) {
+                TSentryPrivilege privilege = iterator.next();
+                final List<TAuthorizable> authorizables = privilege.getAuthorizables();
+                String host = null;
+                String operation = privilege.getAction();
+                for (TAuthorizable tAuthorizable : authorizables) {
+                    if (tAuthorizable.getType().equals(KafkaAuthorizable.AuthorizableType.HOST.name())) {
+                        host = tAuthorizable.getName();
+                    } else {
+                        Resource resource = new Resource(ResourceType$.MODULE$.fromString(tAuthorizable.getType()), tAuthorizable.getName());
+                        if (operation.equals("*")) {
+                            operation = "All";
+                        }
+                        Acl acl = new Acl(new KafkaPrincipal("role", role), Allow$.MODULE$, host, Operation$.MODULE$.fromString(operation));
+                        Set<Acl> newAclsJava = new HashSet<Acl>();
+                        newAclsJava.add(acl);
+                        addExistingAclsForResource(resourceAclsMap, resource, newAclsJava);
+                        final scala.collection.mutable.Set<Acl> aclScala = JavaConversions.asScalaSet(newAclsJava);
+                        resourceAclsMap.put(resource, aclScala.<Acl>toSet());
+                    }
+                }
+            }
+        }
+
+        return resourceAclsMap;
+    }
+
+    private java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> getRoleToPrivileges(final List<String> roles) {
+        final java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap = new HashMap<>();
+        execute(new Command<Void>() {
+            @Override
+            public Void run(SentryGenericServiceClient client) throws Exception {
+                for (String role : roles) {
+                    final Set<TSentryPrivilege> rolePrivileges = client.listPrivilegesByRoleName(
+                        requestorName, role, COMPONENT_NAME, instanceName);
+                    final scala.collection.immutable.Set<TSentryPrivilege> rolePrivilegesScala =
+                        scala.collection.JavaConverters.asScalaSetConverter(rolePrivileges).asScala().toSet();
+                    rolePrivilegesMap.put(role, rolePrivilegesScala);
+                }
+                return null;
+            }
+        });
+
+        return rolePrivilegesMap;
+    }
+
+    private void addExistingAclsForResource(java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap, Resource resource, java.util.Set<Acl> newAclsJava) {
+        final scala.collection.immutable.Set<Acl> existingAcls = resourceAclsMap.get(resource);
+        if (existingAcls != null) {
+            final Iterator<Acl> aclsIter = existingAcls.iterator();
+            while (aclsIter.hasNext()) {
+                Acl curAcl = aclsIter.next();
+                newAclsJava.add(curAcl);
+            }
+        }
+    }
+
+    private boolean roleExists(String role) {
+        return getAllRoles().contains(role);
+    }
+
+    private void verifyAcls(scala.collection.immutable.Set<Acl> acls) {
+        final Iterator<Acl> iterator = acls.iterator();
+        while (iterator.hasNext()) {
+            final Acl acl = iterator.next();
+            assert acl.principal().getPrincipalType().toLowerCase().equals("role") : "Only Acls with KafkaPrincipal of type \"role;\" is supported.";
+            assert acl.permissionType().name().equals(Allow.name()) : "Only Acls with Permission of type \"Allow\" is supported.";
+        }
+    }
+
     /*
     * For SSL session's Kafka creates user names with "CN=" prepended to the user name.
     * "=" is used as splitter by Sentry to parse key value pairs and so it is required to strip off "CN=".
@@ -136,13 +483,13 @@ public class KafkaAuthBinding {
         int start = principalName.indexOf("CN=");
         if (start >= 0) {
             String tmpName, name = "";
-                tmpName = principalName.substring(start + 3);
-                int end = tmpName.indexOf(",");
-                if (end > 0) {
-                    name = tmpName.substring(0, end);
-                } else {
-                    name = tmpName;
-                }
+            tmpName = principalName.substring(start + 3);
+            int end = tmpName.indexOf(",");
+            if (end > 0) {
+                name = tmpName.substring(0, end);
+            } else {
+                name = tmpName;
+            }
             return name;
         } else {
             return principalName;

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
index d7a5d1c..a0007a3 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
@@ -56,10 +56,10 @@ public class KafkaAuthBindingSingleton {
     return kafkaAuthConf;
   }
 
-  public void configure(String instanceName, String sentry_site) {
+  public void configure(String instanceName, String requestorName, String sentry_site) {
     try {
       kafkaAuthConf = loadAuthzConf(sentry_site);
-      binding = new KafkaAuthBinding(instanceName, kafkaAuthConf);
+      binding = new KafkaAuthBinding(instanceName, requestorName, kafkaAuthConf);
       log.info("KafkaAuthBinding created successfully");
     } catch (Exception ex) {
       log.error("Unable to create KafkaAuthBinding", ex);

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
index cff9418..e0d767e 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
@@ -29,6 +29,7 @@ public class KafkaAuthConf extends Configuration {
   public static final String AUTHZ_SITE_FILE = "sentry-site.xml";
   public static final String KAFKA_SUPER_USERS = "kafka.superusers";
   public static final String KAFKA_SERVICE_INSTANCE_NAME = "sentry.kafka.service.instance";
+  public static final String KAFKA_SERVICE_USER_NAME = "sentry.kafka.service.user.name";
 
   /**
    * Config setting definitions
@@ -38,7 +39,8 @@ public class KafkaAuthConf extends Configuration {
     AUTHZ_PROVIDER_RESOURCE("sentry.kafka.provider.resource", ""),
     AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()),
     AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()),
-    AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka");
+    AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka"),
+    AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka");
 
     private final String varName;
     private final String defaultVal;

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
index eafe0f0..f40d8c2 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
@@ -17,7 +17,6 @@
 package org.apache.sentry.kafka.authorizer;
 
 import kafka.network.RequestChannel;
-import kafka.security.auth.Operation;
 import kafka.security.auth.Operation$;
 import kafka.security.auth.Resource;
 import kafka.security.auth.Resource$;

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
index 7b8b518..fc3bf7a 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
@@ -185,6 +185,9 @@ public class KafkaActionFactory extends BitFieldActionFactory {
    */
   @Override
   public KafkaAction getActionByName(String name) {
+    if (name.equalsIgnoreCase("*")) {
+      return new KafkaAction("ALL");
+    }
     return KafkaActionType.hasActionType(name) ? new KafkaAction(name) : null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
index 81446a7..04316f2 100644
--- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
+++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
@@ -19,11 +19,7 @@ package org.apache.sentry.core.model.kafka;
 
 import junit.framework.Assert;
 
-import org.apache.sentry.core.model.kafka.Cluster;
-import org.apache.sentry.core.model.kafka.ConsumerGroup;
 import org.apache.sentry.core.model.kafka.KafkaAuthorizable.AuthorizableType;
-import org.apache.sentry.core.model.kafka.Host;
-import org.apache.sentry.core.model.kafka.Topic;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-provider/sentry-provider-db/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/pom.xml b/sentry-provider/sentry-provider-db/pom.xml
index 7514a7c..9a10829 100644
--- a/sentry-provider/sentry-provider-db/pom.xml
+++ b/sentry-provider/sentry-provider-db/pom.xml
@@ -92,6 +92,10 @@ limitations under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-core-model-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
       <artifactId>sentry-provider-common</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/persistent/PrivilegeOperatePersistence.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/persistent/PrivilegeOperatePersistence.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/persistent/PrivilegeOperatePersistence.java
index c3b0be8..3ccb3aa 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/persistent/PrivilegeOperatePersistence.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/persistent/PrivilegeOperatePersistence.java
@@ -31,6 +31,7 @@ import org.apache.sentry.core.common.Action;
 import org.apache.sentry.core.common.Authorizable;
 import org.apache.sentry.core.common.BitFieldAction;
 import org.apache.sentry.core.common.BitFieldActionFactory;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory;
 import org.apache.sentry.core.model.search.SearchActionFactory;
 import org.apache.sentry.core.model.sqoop.SqoopActionFactory;
 import org.apache.sentry.provider.db.generic.service.persistent.PrivilegeObject.Builder;
@@ -51,6 +52,7 @@ public class PrivilegeOperatePersistence {
   static{
     actionFactories.put("solr", new SearchActionFactory());
     actionFactories.put("sqoop", new SqoopActionFactory());
+    actionFactories.put("kafka", KafkaActionFactory.getInstance());
   }
 
   public boolean checkPrivilegeOption(Set<MSentryRole> roles, PrivilegeObject privilege, PersistenceManager pm) {

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-tests/pom.xml b/sentry-tests/pom.xml
index 3294335..88a28bb 100644
--- a/sentry-tests/pom.xml
+++ b/sentry-tests/pom.xml
@@ -31,6 +31,7 @@ limitations under the License.
     <module>sentry-tests-hive</module>
     <module>sentry-tests-solr</module>
     <module>sentry-tests-sqoop</module>
+    <module>sentry-tests-kafka</module>
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/pom.xml b/sentry-tests/sentry-tests-kafka/pom.xml
new file mode 100644
index 0000000..54c7205
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>sentry-tests</artifactId>
+    <groupId>org.apache.sentry</groupId>
+    <version>1.7.0-incubating-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>sentry-tests-kafka</artifactId>
+  <name>Sentry Kafka Tests</name>
+  <description>end to end tests for sentry-kafka integration</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-binding-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-provider-db</artifactId>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/CustomPrincipalBuilder.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/CustomPrincipalBuilder.java b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/CustomPrincipalBuilder.java
new file mode 100644
index 0000000..5531fcb
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/CustomPrincipalBuilder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.tests.e2e.kafka;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+
+import java.security.Principal;
+import java.util.Map;
+
+public class CustomPrincipalBuilder implements PrincipalBuilder {
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+
+    @Override
+    public Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException {
+        try {
+            return transportLayer.peerPrincipal();
+        } catch (Exception e) {
+            throw new KafkaException("Failed to build principal due to: ", e);
+        }
+    }
+
+    @Override
+    public void close() throws KafkaException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/EmbeddedZkServer.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/EmbeddedZkServer.java b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/EmbeddedZkServer.java
new file mode 100644
index 0000000..442ddff
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/EmbeddedZkServer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.tests.e2e.kafka;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class EmbeddedZkServer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedZkServer.class);
+
+    private Path snapshotDir = null;
+    private Path logDir = null;
+    private ZooKeeperServer zookeeper = null;
+    private NIOServerCnxnFactory factory = null;
+
+    public EmbeddedZkServer(int port) throws Exception {
+        snapshotDir = Files.createTempDirectory("zookeeper-snapshot-");
+        logDir = Files.createTempDirectory("zookeeper-log-");
+        int tickTime = 500;
+        zookeeper = new ZooKeeperServer(snapshotDir.toFile(), logDir.toFile(), tickTime);
+        factory = new NIOServerCnxnFactory();
+        InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), port);
+        LOGGER.info("Starting Zookeeper at " + addr);
+        factory.configure(addr, 0);
+        factory.startup(zookeeper);
+    }
+
+    public void shutdown() throws IOException {
+        try {
+            zookeeper.shutdown();
+        } catch (Exception e) {
+            LOGGER.error("Failed to shutdown ZK server", e);
+        }
+
+        try {
+            factory.shutdown();
+        } catch (Exception e) {
+            LOGGER.error("Failed to shutdown Zk connection factory.", e);
+        }
+
+        FileUtils.deleteDirectory(logDir.toFile());
+        FileUtils.deleteDirectory(snapshotDir.toFile());
+    }
+
+    public ZooKeeperServer getZk() {
+        return zookeeper;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
new file mode 100644
index 0000000..129191a
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.tests.e2e.kafka;
+
+import kafka.server.KafkaServerStartable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Properties;
+
+public class KafkaTestServer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestServer.class);
+
+    private int zkPort = -1;
+    private int kafkaPort = -1;
+    private EmbeddedZkServer zkServer = null;
+    private KafkaServerStartable kafkaServer = null;
+    private File sentrySitePath = null;
+
+    public KafkaTestServer(File sentrySitePath) throws Exception {
+        this.zkPort = TestUtils.getFreePort();
+        this.kafkaPort = TestUtils.getFreePort();
+        this.sentrySitePath = sentrySitePath;
+        createZkServer();
+        createKafkaServer();
+    }
+
+    public void start() throws Exception {
+        kafkaServer.startup();
+        LOGGER.info("Started Kafka broker.");
+    }
+
+    public void shutdown() {
+        if (kafkaServer != null) {
+            kafkaServer.shutdown();
+            kafkaServer.awaitShutdown();
+            LOGGER.info("Stopped Kafka server.");
+        }
+
+        if (zkServer != null) {
+            try {
+                zkServer.shutdown();
+                LOGGER.info("Stopped ZK server.");
+            } catch (IOException e) {
+                LOGGER.error("Failed to shutdown ZK server.", e);
+            }
+        }
+    }
+
+    private Path getTempDirectory() {
+        Path tempDirectory = null;
+        try {
+            tempDirectory = Files.createTempDirectory("kafka-sentry-");
+        } catch (IOException e) {
+            LOGGER.error("Failed to create temp dir for Kafka's log dir.");
+            throw new RuntimeException(e);
+        }
+        return tempDirectory;
+    }
+
+    private void setupKafkaProps(Properties props) throws UnknownHostException {
+        props.put("listeners", "SSL://" + InetAddress.getLocalHost().getHostAddress() + ":" + kafkaPort);
+        props.put("log.dir", getTempDirectory().toAbsolutePath().toString());
+        props.put("zookeeper.connect", InetAddress.getLocalHost().getHostAddress() + ":" + zkPort);
+        props.put("replica.socket.timeout.ms", "1500");
+        props.put("controller.socket.timeout.ms", "1500");
+        props.put("controlled.shutdown.enable", true);
+        props.put("delete.topic.enable", false);
+        props.put("controlled.shutdown.retry.backoff.ms", "100");
+        props.put("port", kafkaPort);
+        props.put("authorizer.class.name", "org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer");
+        props.put("sentry.kafka.site.url", "file://" + sentrySitePath.getAbsolutePath());
+        props.put("allow.everyone.if.no.acl.found", "true");
+        props.put("ssl.keystore.location", KafkaTestServer.class.getResource("/test.keystore.jks").getPath());
+        props.put("ssl.keystore.password", "test-ks-passwd");
+        props.put("ssl.key.password", "test-key-passwd");
+        props.put("ssl.truststore.location", KafkaTestServer.class.getResource("/test.truststore.jks").getPath());
+        props.put("ssl.truststore.password", "test-ts-passwd");
+        props.put("security.inter.broker.protocol", "SSL");
+        props.put("ssl.client.auth", "required");
+        props.put("kafka.superusers", "User:CN=superuser;User:CN=superuser1; User:CN=Superuser2 ");
+    }
+
+    private void createKafkaServer() throws UnknownHostException {
+        Properties props = new Properties();
+        setupKafkaProps(props);
+        kafkaServer = KafkaServerStartable.fromProps(props);
+    }
+
+    private void createZkServer() throws Exception {
+        try {
+            zkServer = new EmbeddedZkServer(zkPort);
+            zkPort = zkServer.getZk().getClientPort();
+        } catch (Exception e) {
+            LOGGER.error("Failed to create testing zookeeper server.");
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getBootstrapServers() {
+        return "localhost:" + kafkaPort;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/TestUtils.java b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/TestUtils.java
new file mode 100644
index 0000000..dda4047
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/TestUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.tests.e2e.kafka;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+public class TestUtils {
+    public static int getFreePort() throws IOException {
+        synchronized (TestUtils.class) {
+            ServerSocket serverSocket = new ServerSocket(0);
+            return serverSocket.getLocalPort();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
new file mode 100644
index 0000000..a2cfa28
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.tests.e2e.kafka;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.core.model.kafka.Cluster;
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.kafka.conf.KafkaAuthConf;
+import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
+import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
+import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
+import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider;
+import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.service.thrift.SentryService;
+import org.apache.sentry.service.thrift.SentryServiceFactory;
+import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * This class is used to test the Kafka integration with Sentry.
+ */
+public class AbstractKafkaSentryTestBase {
+
+  protected static final String COMPONENT = "kafka";
+  protected static final String ADMIN_USER = "kafka";
+  protected static final String ADMIN_GROUP = "group_kafka";
+  protected static final String ADMIN_ROLE  = "role_kafka";
+
+  protected static SentryService sentryServer;
+  protected static File sentrySitePath;
+
+  protected static File baseDir;
+  protected static File dbDir;
+  protected static File policyFilePath;
+
+  protected static PolicyFile policyFile;
+
+  protected static String bootstrapServers = null;
+  protected static KafkaTestServer kafkaServer = null;
+
+  @BeforeClass
+  public static void beforeTestEndToEnd() throws Exception {
+    setupConf();
+    startSentryServer();
+    setUserGroups();
+    setAdminPrivilege();
+    startKafkaServer();
+  }
+
+  @AfterClass
+  public static void afterTestEndToEnd() throws Exception {
+    stopSentryServer();
+    stopKafkaServer();
+  }
+
+  private static void stopKafkaServer() {
+    if (kafkaServer != null) {
+      kafkaServer.shutdown();
+      kafkaServer = null;
+    }
+  }
+
+  private static void stopSentryServer() throws Exception {
+    if (sentryServer != null) {
+      sentryServer.stop();
+      sentryServer = null;
+    }
+
+    FileUtils.deleteDirectory(baseDir);
+  }
+
+  public static void setupConf() throws Exception {
+    baseDir = createTempDir();
+    sentrySitePath = new File(baseDir, "sentry-site.xml");
+    dbDir = new File(baseDir, "sentry_policy_db");
+    policyFilePath = new File(baseDir, "local_policy_file.ini");
+    policyFile = new PolicyFile();
+
+    /** Set the configuration for the Sentry service. */
+    Configuration conf = new Configuration();
+
+    conf.set(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE);
+    conf.set(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false");
+    conf.set(ServerConfig.ADMIN_GROUPS, Joiner.on(",").join(ADMIN_GROUP,
+        UserGroupInformation.getLoginUser().getPrimaryGroupName()));
+    conf.set(ServerConfig.RPC_PORT, String.valueOf(TestUtils.getFreePort()));
+    conf.set(ServerConfig.RPC_ADDRESS, NetUtils.createSocketAddr(
+            InetAddress.getLocalHost().getHostAddress() + ":" + conf.get(ServerConfig.RPC_PORT))
+            .getAddress().getCanonicalHostName());
+    conf.set(ServerConfig.SENTRY_STORE_JDBC_URL,
+        "jdbc:derby:;databaseName=" + dbDir.getPath() + ";create=true");
+    conf.set(ServerConfig.SENTRY_STORE_JDBC_PASS, "dummy");
+    conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING,
+        ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
+    conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE,
+        policyFilePath.getPath());
+    sentryServer = new SentryServiceFactory().create(conf);
+  }
+
+  public static File createTempDir() {
+    File baseDir = new File(System.getProperty("java.io.tmpdir"));
+    String baseName = "kafka-e2e-";
+    File tempDir = new File(baseDir, baseName + UUID.randomUUID().toString());
+    if (tempDir.mkdir()) {
+        return tempDir;
+    }
+    throw new IllegalStateException("Failed to create temp directory");
+  }
+
+  public static void startSentryServer() throws Exception {
+    sentryServer.start();
+    final long start = System.currentTimeMillis();
+    while(!sentryServer.isRunning()) {
+      Thread.sleep(1000);
+      if(System.currentTimeMillis() - start > 60000L) {
+        throw new TimeoutException("Server did not start after 60 seconds");
+      }
+    }
+  }
+
+  public static void setUserGroups() throws Exception {
+    for (String user : StaticUserGroupRole.getUsers()) {
+      Set<String> groups = StaticUserGroupRole.getGroups(user);
+      policyFile.addGroupsToUser(user,
+          groups.toArray(new String[groups.size()]));
+    }
+    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    policyFile.addGroupsToUser(loginUser.getShortUserName(), loginUser.getGroupNames());
+
+    policyFile.write(policyFilePath);
+  }
+
+  public static void setAdminPrivilege() throws Exception {
+    SentryGenericServiceClient sentryClient = null;
+    try {
+      /** Grant all privileges to the admin user. */
+      sentryClient = getSentryClient();
+      sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT);
+      sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP));
+      final ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
+      Host host = new Host(InetAddress.getLocalHost().getHostName());
+      authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+      Cluster cluster = new Cluster();
+      authorizables.add(new TAuthorizable(cluster.getTypeName(), cluster.getName()));
+      sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT,
+          new TSentryPrivilege(COMPONENT, "kafka", authorizables,
+              KafkaActionConstant.ALL));
+    } finally {
+      if (sentryClient != null) {
+        sentryClient.close();
+      }
+    }
+  }
+
+  protected static SentryGenericServiceClient getSentryClient() throws Exception {
+    return SentryGenericServiceClientFactory.create(getClientConfig());
+  }
+
+  public static void assertCausedMessage(Exception e, String message) {
+    if (e.getCause() != null) {
+      assertTrue("Expected message: " + message + ", but got: " + e.getCause().getMessage(), e.getCause().getMessage().contains(message));
+    } else {
+      assertTrue("Expected message: " + message + ", but got: " + e.getMessage(), e.getMessage().contains(message));
+    }
+  }
+
+  private static Configuration getClientConfig() {
+    Configuration conf = new Configuration();
+    /** set the Sentry client configuration for Kafka Service integration */
+    conf.set(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE);
+    conf.set(ClientConfig.SERVER_RPC_ADDRESS, sentryServer.getAddress().getHostName());
+    conf.set(ClientConfig.SERVER_RPC_PORT, String.valueOf(sentryServer.getAddress().getPort()));
+
+    conf.set(KafkaAuthConf.AuthzConfVars.AUTHZ_PROVIDER.getVar(),
+        LocalGroupResourceAuthorizationProvider.class.getName());
+    conf.set(KafkaAuthConf.AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(),
+        SentryGenericProviderBackend.class.getName());
+    conf.set(KafkaAuthConf.AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(), policyFilePath.getPath());
+    return conf;
+  }
+
+  private static void startKafkaServer() throws Exception {
+    // Workaround for SentryKafkaAuthorizer to be added to classpath
+    Class.forName("org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer");
+    getClientConfig().writeXml(new FileOutputStream(sentrySitePath));
+
+    kafkaServer = new KafkaTestServer(sentrySitePath);
+    kafkaServer.start();
+    bootstrapServers = kafkaServer.getBootstrapServers();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/StaticUserGroupRole.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/StaticUserGroupRole.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/StaticUserGroupRole.java
new file mode 100644
index 0000000..96b7cf4
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/StaticUserGroupRole.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.tests.e2e.kafka;
+
+import com.google.common.collect.Sets;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Fixed user, group and role names used by the Kafka end-to-end tests, plus a static
+ * mapping from each user to the groups it belongs to.
+ */
+public class StaticUserGroupRole {
+  public static final String SUPERUSER = "superuser";
+  public static final String USER_1 = "user1";
+  public static final String USER_2 = "user2";
+  public static final String USER_KAFKA = "kafka";
+
+  public static final String GROUP_0 = "group0";
+  public static final String GROUP_1 = "group1";
+  public static final String GROUP_2 = "group2";
+  public static final String GROUP_KAFKA = "group_kafka";
+
+  public static final String ROLE_0 = "role0";
+  public static final String ROLE_1 = "role1";
+  public static final String ROLE_2 = "role2";
+
+  // Filled once by the static initializer below; the reference is never reassigned.
+  private static final Map<String, Set<String>> userToGroupsMapping =
+      new HashMap<String, Set<String>>();
+
+  static {
+    userToGroupsMapping.put(SUPERUSER, Sets.newHashSet(GROUP_0));
+    userToGroupsMapping.put(USER_1, Sets.newHashSet(GROUP_1));
+    userToGroupsMapping.put(USER_2, Sets.newHashSet(GROUP_2));
+    userToGroupsMapping.put(USER_KAFKA, Sets.newHashSet(GROUP_KAFKA));
+  }
+
+  /**
+   * Returns the names of all users known to the mapping.
+   *
+   * @return a read-only view of the user names, so callers cannot remove entries from
+   *         the shared fixture
+   */
+  public static Set<String> getUsers() {
+    return java.util.Collections.unmodifiableSet(userToGroupsMapping.keySet());
+  }
+
+  /**
+   * Returns the groups the given user belongs to.
+   *
+   * @param user the user name to look up
+   * @return a read-only view of the user's groups, or {@code null} if the user is not
+   *         present in the mapping
+   */
+  public static Set<String> getGroups(String user) {
+    final Set<String> groups = userToGroupsMapping.get(user);
+    // Keep the historical null result for unknown users; only wrap real entries.
+    return groups == null ? null : java.util.Collections.unmodifiableSet(groups);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
new file mode 100644
index 0000000..135d362
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.tests.e2e.kafka;
+
+import junit.framework.Assert;
+import kafka.security.auth.Acl;
+import kafka.security.auth.Allow$;
+import kafka.security.auth.Operation$;
+import kafka.security.auth.Resource;
+import kafka.security.auth.ResourceType$;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer;
+import org.apache.sentry.kafka.conf.KafkaAuthConf;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.immutable.Map;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tests for creating roles and for adding, reading and removing acls through
+ * {@link SentryKafkaAuthorizer}.
+ *
+ * <p>Each test configures a new authorizer from the Sentry site file; {@link #cleanUp()}
+ * drops all roles and closes the authorizer afterwards.
+ */
+public class TestAclsCrud extends AbstractKafkaSentryTestBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestAclsCrud.class);
+  private static final String TEST_ROLE = "role1";
+  private SentryKafkaAuthorizer sentryKafkaAuthorizer;
+
+  /**
+   * Drops all roles and closes the authorizer created by the test, if any.
+   *
+   * @throws Exception if dropping the roles or closing the authorizer fails
+   */
+  @After
+  public void cleanUp() throws Exception {
+    // A test may fail before the authorizer was created; nothing to clean up then.
+    if (sentryKafkaAuthorizer == null) {
+      return;
+    }
+    try {
+      sentryKafkaAuthorizer.dropAllRoles();
+    } finally {
+      // Always release the authorizer, even when dropping the roles failed.
+      sentryKafkaAuthorizer.close();
+      sentryKafkaAuthorizer = null;
+    }
+  }
+
+  @Test
+  public void testAddAclsForNonExistentRole() {
+    configureAuthorizer();
+
+    final Acl acl = allowAcl(rolePrincipal(TEST_ROLE), "READ");
+    final Resource resource = newResource("TOPIC", "test-topic");
+    try {
+      sentryKafkaAuthorizer.addAcls(asScalaSet(acl), resource);
+    } catch (Exception ex) {
+      assertCausedMessage(ex, "Can not add Acl for non-existent Role: role1");
+      return;
+    }
+    // Without this the test would silently pass when no exception is raised.
+    Assert.fail("Expected adding an acl for a non-existent role to fail.");
+  }
+
+  @Test
+  public void testAddRole() {
+    configureAuthorizer();
+    addRoleOrFail(TEST_ROLE);
+  }
+
+  @Test
+  public void testAddExistingRole() {
+    configureAuthorizer();
+
+    // Add role the first time
+    addRoleOrFail(TEST_ROLE);
+
+    // Try adding same role again
+    try {
+      sentryKafkaAuthorizer.addRole(TEST_ROLE);
+    } catch (Exception ex) {
+      assertCausedMessage(ex, "Can not create an existing role, role1, again.");
+      return;
+    }
+    // Without this the test would silently pass when no exception is raised.
+    Assert.fail("Expected creating an already existing role to fail.");
+  }
+
+  @Test
+  public void testAddAcls() {
+    configureAuthorizer();
+
+    final Acl acl = allowAcl(rolePrincipal(TEST_ROLE), "READ");
+    final Resource resource = newResource("TOPIC", "test-topic");
+
+    addRoleOrFail(TEST_ROLE);
+    addAclsOrFail(asScalaSet(acl), resource, "Failed to add acl.");
+
+    final scala.collection.immutable.Set<Acl> obtainedAcls = sentryKafkaAuthorizer.getAcls(resource);
+    Assert.assertTrue("Obtained acls did not match expected Acls", obtainedAcls.contains(acl));
+  }
+
+  @Test
+  public void testAddRoleToGroups() {
+    configureAuthorizer();
+
+    final Acl acl = allowAcl(rolePrincipal(TEST_ROLE), "READ");
+    final Resource resource = newResource("TOPIC", "test-topic");
+
+    addRoleOrFail(TEST_ROLE);
+    addAclsOrFail(asScalaSet(acl), resource, "Failed to add acl.");
+
+    // Add role to group; any exception propagates and fails the test.
+    final String group1 = "group1";
+    final Set<String> groups = new HashSet<>();
+    groups.add(group1);
+    sentryKafkaAuthorizer.addRoleToGroups(TEST_ROLE, groups);
+
+    final scala.collection.immutable.Set<Acl> obtainedAcls =
+        sentryKafkaAuthorizer.getAcls(new KafkaPrincipal("group", group1)).get(resource).get();
+    Assert.assertTrue("Obtained acls did not match expected Acls", obtainedAcls.contains(acl));
+  }
+
+  @Test
+  public void testRemoveAclsByResource() {
+    configureAuthorizer();
+
+    final KafkaPrincipal principal1 = rolePrincipal(TEST_ROLE);
+    final Acl acl = allowAcl(principal1, "READ");
+    final Resource resource = newResource("TOPIC", "test-topic");
+
+    addRoleOrFail(TEST_ROLE);
+    addAclsOrFail(asScalaSet(acl), resource, "Failed to add acl.");
+
+    // Add acl for different resource
+    final Acl acl2 = allowAcl(principal1, "WRITE");
+    final Resource resource2 = newResource("CLUSTER", "test-cluster");
+    addAclsOrFail(asScalaSet(acl2), resource2, "Failed to add second acl.");
+
+    try {
+      sentryKafkaAuthorizer.removeAcls(resource);
+    } catch (Exception ex) {
+      LOGGER.error("Failed to remove acls for resource.", ex);
+      Assert.fail("Failed to remove acls for resource.");
+    }
+
+    final Map<Resource, scala.collection.immutable.Set<Acl>> obtainedAcls = sentryKafkaAuthorizer.getAcls(principal1);
+    Assert.assertFalse("Obtained acls must not contain acl for removed resource's acls.", obtainedAcls.keySet().contains(resource));
+    Assert.assertTrue("Obtained acls must contain acl for resource2.", obtainedAcls.keySet().contains(resource2));
+    Assert.assertTrue("Obtained acl does not match expected acl.", obtainedAcls.get(resource2).get().contains(acl2));
+  }
+
+  @Test
+  public void testRemoveAclsByAclsAndResource() {
+    configureAuthorizer();
+
+    final KafkaPrincipal principal1 = rolePrincipal(TEST_ROLE);
+    final Acl acl = allowAcl(principal1, "READ");
+    final scala.collection.immutable.Set<Acl> aclsScala = asScalaSet(acl);
+    final Resource resource = newResource("TOPIC", "test-topic");
+
+    addRoleOrFail(TEST_ROLE);
+    addAclsOrFail(aclsScala, resource, "Failed to add acl.");
+
+    // Add another acl to same resource
+    final Acl acl01 = allowAcl(principal1, "DESCRIBE");
+    addAclsOrFail(asScalaSet(acl01), resource, "Failed to add acl.");
+
+    // Add acl for different resource
+    final Acl acl2 = allowAcl(principal1, "WRITE");
+    final Resource resource2 = newResource("CLUSTER", "test-cluster");
+    addAclsOrFail(asScalaSet(acl2), resource2, "Failed to add second acl.");
+
+    // Remove acls
+    try {
+      sentryKafkaAuthorizer.removeAcls(aclsScala, resource);
+    } catch (Exception ex) {
+      LOGGER.error("Failed to remove acls for resource.", ex);
+      Assert.fail("Failed to remove acls for resource.");
+    }
+
+    final Map<Resource, scala.collection.immutable.Set<Acl>> obtainedAcls = sentryKafkaAuthorizer.getAcls(principal1);
+    Assert.assertTrue("Obtained acls must contain acl for resource.", obtainedAcls.keySet().contains(resource));
+    Assert.assertTrue("Obtained acls must contain acl for resource2.", obtainedAcls.keySet().contains(resource2));
+    Assert.assertFalse("Obtained acl must not contain removed acl for resource.", obtainedAcls.get(resource).get().contains(acl));
+    Assert.assertTrue("Obtained acl does not match expected acl for resource.", obtainedAcls.get(resource).get().contains(acl01));
+    Assert.assertTrue("Obtained acl does not match expected acl for resource2.", obtainedAcls.get(resource2).get().contains(acl2));
+  }
+
+  /** Creates the authorizer under test and configures it with the Sentry site file URL. */
+  private void configureAuthorizer() {
+    sentryKafkaAuthorizer = new SentryKafkaAuthorizer();
+    java.util.Map<String, String> configs = new HashMap<>();
+    configs.put(KafkaAuthConf.SENTRY_KAFKA_SITE_URL, "file://" + sentrySitePath.getAbsolutePath());
+    sentryKafkaAuthorizer.configure(configs);
+  }
+
+  /** Creates the given role, failing the test (and logging the cause) on any exception. */
+  private void addRoleOrFail(String role) {
+    try {
+      sentryKafkaAuthorizer.addRole(role);
+    } catch (Exception ex) {
+      LOGGER.error("Failed to create role.", ex);
+      Assert.fail("Failed to create role.");
+    }
+  }
+
+  /** Adds the acls to the resource, failing the test with {@code failureMessage} on any exception. */
+  private void addAclsOrFail(scala.collection.immutable.Set<Acl> acls, Resource resource, String failureMessage) {
+    try {
+      sentryKafkaAuthorizer.addAcls(acls, resource);
+    } catch (Exception ex) {
+      LOGGER.error(failureMessage, ex);
+      Assert.fail(failureMessage);
+    }
+  }
+
+  /** Builds a principal of type "role" with the given name. */
+  private static KafkaPrincipal rolePrincipal(String role) {
+    return new KafkaPrincipal("role", role);
+  }
+
+  /** Builds an allow acl for the principal, host 127.0.0.1 and the named operation. */
+  private static Acl allowAcl(KafkaPrincipal principal, String operation) {
+    return new Acl(principal,
+        Allow$.MODULE$,
+        "127.0.0.1",
+        Operation$.MODULE$.fromString(operation));
+  }
+
+  /** Builds a resource from a resource type name and a resource name. */
+  private static Resource newResource(String type, String name) {
+    return new Resource(ResourceType$.MODULE$.fromString(type), name);
+  }
+
+  /** Wraps a single acl into an immutable Scala set. */
+  private static scala.collection.immutable.Set<Acl> asScalaSet(Acl acl) {
+    Set<Acl> acls = new HashSet<>();
+    acls.add(acl);
+    scala.collection.immutable.Set<Acl> aclsScala = scala.collection.JavaConversions.asScalaSet(acls).toSet();
+    return aclsScala;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/test/resources/log4j.properties b/sentry-tests/sentry-tests-kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..5f52884
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/test/resources/log4j.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties.
+#
+# For testing, it may also be convenient to specify
+# -Dsentry.root.logger=DEBUG,console on the command line.
+
+sentry.root.logger=DEBUG,console
+log4j.rootLogger=${sentry.root.logger}
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
+
+log4j.logger.kafka.utils.Logging=WARN
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.org.apache.sentry=DEBUG
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.org.I0Itec.zkclient=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.category.DataNucleus=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/resources/test.crt
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/test/resources/test.crt b/sentry-tests/sentry-tests-kafka/src/test/resources/test.crt
new file mode 100644
index 0000000..fd6c902
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/test/resources/test.crt
@@ -0,0 +1,15 @@
+-----BEGIN CERTIFICATE-----
+MIICxzCCAa+gAwIBAgIEK13qfTANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDEwlzdXBlcnVzZXIw
+HhcNMTUxMjE1MjMzNTAzWhcNMTYwMzE0MjMzNTAzWjAUMRIwEAYDVQQDEwlzdXBlcnVzZXIwggEi
+MA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQChGUnirhdFKW6OXbPBqQ1tWEFrxvCHr51uVU9H
+V2aqO+Q02a+Vzyb24dzyqnbM5uOeGqAyTFXpCPOK0oxTCvf/0idmHIcgt40797I7rxWDJw9/wYos
+UGkqizAb878LaFScIo6Phu6zjdj/J16vd5KiWN5pzOLnwO8DebzO5s+N34VuNZ8s45zemq2bES9Z
+z8mMolTkZS4d8wGExC93n5oiNrPGUneKRZJYukv3SiDMajaOTqnI4Xo/LIs3dynq8dTBQPTtUwnA
+UZz8kpew6PfxDYYHjg2eHli/6Dopmur/R27xuxn5VnJHnxgL5mbxrRgAidGN6CwJFA7ZxSBn67pr
+AgMBAAGjITAfMB0GA1UdDgQWBBTxczVGKoS4NuNIPlS4yJfm8fSj3zANBgkqhkiG9w0BAQsFAAOC
+AQEAC4PSVAzUVGqhESIGDpJ6kbHzw/wBUmrjceTDQv9cVPNrHlMWoG67nM45tECWud3osB57nunV
+vcwSNXxhf4M+IPK1BoT2awUjEfWN+F7guxFXpU2lQpmHPj+015g9pGvvneRLZj8VfdFo8PuyDeRy
+V0HuG7xJ2xZMM8XpgL9BHrgD/4CITzRkaHnyuYb+Yz5GUFYOpLn0ANNm3gfW+eMiE/38zc+o23wJ
+V49hAKGqalJUATWVzq7iCqTqxeIQ2RQyJ9O5p82Y5CIG1Tp07zdCPVqkKz7NAbt2K0ZW5/5qc5V/
+y88rnXWj9nZPYwyVj5rxqB8h2WDLDmxr1JuwuMOlYw==
+-----END CERTIFICATE-----

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/resources/user1.crt
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/test/resources/user1.crt b/sentry-tests/sentry-tests-kafka/src/test/resources/user1.crt
new file mode 100644
index 0000000..5cb6caa
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/test/resources/user1.crt
@@ -0,0 +1,15 @@
+-----BEGIN CERTIFICATE-----
+MIICvzCCAaegAwIBAgIEWaKEszANBgkqhkiG9w0BAQsFADAQMQ4wDAYDVQQDEwV1c2VyMTAeFw0x
+NTEyMTUyMzQyNTlaFw0xNjAzMTQyMzQyNTlaMBAxDjAMBgNVBAMTBXVzZXIxMIIBIjANBgkqhkiG
+9w0BAQEFAAOCAQ8AMIIBCgKCAQEAgDzGn4VvJnROVCC+CR77DfqmF1wkNUrOiaLL9qufoRi9DuZU
+epmqebg0YyCQVyuIUe1p7qhnOGNnFN0nJC75C4MbCDX/s2+gxUBb6iaP7pwmdKzprvP3YGQrQXo/
+pv+zV9EH1P5JP+27B6NVGTGJPUP4UqZF2uyhNOHIcB9sMvZTnyfDLs+8o9dCv3bFPpwEGZnk3I1I
+xD1cYSz+qb3E3M68L6cFVSo1qnK0QN8eBXXB/ljCHaQ47jLfZrJjjiRKA1YOnY+sRCbQDv4wU+dc
+oOenLzLikrMdVyONokbkneS/LnwjmNev2i9I9NA0D3bZvJuN/DkuQ245iXgdnqOvJwIDAQABoyEw
+HzAdBgNVHQ4EFgQUfzocV1Og4CsGte7Ux4luCVA3TTYwDQYJKoZIhvcNAQELBQADggEBAEeemqwJ
+eY/GahjPesuyJKiIfH4MgMGvZ19441WnKG1CuHrtwMES8Znxc+iS4hutPX6I/Mvb9HMg8M3u9B7W
+1dj4QOvi5iZuWqrV2bhBrFoUV7fXPjjMfu15i/CX5Lfu56cBeyKshq674rQ4AWn1k5saxa6Jhaao
+6ceFfnTldgVSSS0rBFyz1fBj7dLXnS8MmxN0cmDO1jVXu2Tfjw0ofRmLxD1SCMEwrNEcERRUWudm
+nIy1Q14xCYmTnGEf9uG8TmHO/y5Elc/jcMN2mGwb8N0FIV7nh1HLyAmR6O7JPrQ3QWR4Vr5tMH/K
+3b9N51c0enX9UZedGYVc+qlLJ/P6B5w=
+-----END CERTIFICATE-----

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/resources/user2.crt
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/test/resources/user2.crt b/sentry-tests/sentry-tests-kafka/src/test/resources/user2.crt
new file mode 100644
index 0000000..d0b0820
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/test/resources/user2.crt
@@ -0,0 +1,15 @@
+-----BEGIN CERTIFICATE-----
+MIICvzCCAaegAwIBAgIEC6qUijANBgkqhkiG9w0BAQsFADAQMQ4wDAYDVQQDEwV1c2VyMjAeFw0x
+NTEyMTUyMzQ0MjVaFw0xNjAzMTQyMzQ0MjVaMBAxDjAMBgNVBAMTBXVzZXIyMIIBIjANBgkqhkiG
+9w0BAQEFAAOCAQ8AMIIBCgKCAQEAhm2vitVj2xApz7ZtaWNcqegodc9nFY+HCcIx2WqoUzQTXZ8q
+Fm6H6blKrL+xJXY7ZlEB8nMdfWFfOdS2zX6hutkstkwId5MSceWUb5GUzdClUQAS8DGMtQdU3LlY
+EcIgz9fim6/Ad0ZIKwyAc47HJLd/nQOozAaDDnWdLbhRymv/PNEt5IndkeTfbFd1uWgpV9vhfLWN
+3FmXOksVoIKR+l9YBOmAUIjstK2Tq8b/q4Dbcp82X1nPW12fG2FlowgolWEOlaCbSGwN60LjoP69
+1azAFU5IPaxmQ46oZpb7jMCRrHgdx+zhjRxjY9PpTCYWdtBHqnLyuckl/mpOxS64vwIDAQABoyEw
+HzAdBgNVHQ4EFgQUHaTI3Xl/CjJLhVCZto5ZJBCTaLUwDQYJKoZIhvcNAQELBQADggEBAEg/SxvT
++NLmh7tWF0QZR2S6wl+UgJIqiS6NlEk3Te6TdPda2t2K8cmFndBcAmZqvLkz7dIkeDwa507SbrTg
+NJXcOycpH1s15VjiVRF8dXqflLCEcBUNw8h4AENsdVcNKliR+YXLk1i/x5jVfncQps6Zxj68NFoN
+h6tf7KyBHT4DvekYocjdXDQ/tPdvPqokYIM/q0K7NRZvDg6yUYukkFjta9D9623PwydtA/t75AEb
+zOJra5A6qp/qo/U1UyLzEkwSlWaLaOa7MrNaFy/OQbkVncP+6jFCIXlWpQ+TqyUmTfwmL+A2oJWW
+l3Ziy62zAfuaJ1EwY4zwFlZHJR4lF7E=
+-----END CERTIFICATE-----


Mime
View raw message