ranger-commits mailing list archives

From rm...@apache.org
Subject [4/6] incubator-ranger git commit: RANGER-586: Ranger plugins should not add dependent libraries to component's CLASSPATH
Date Thu, 22 Oct 2015 00:02:57 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizerImpl.java
----------------------------------------------------------------------
diff --git a/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizerImpl.java b/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizerImpl.java
new file mode 100644
index 0000000..9389875
--- /dev/null
+++ b/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizerImpl.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.authorization.hadoop;
+
+import static org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants.EXECUTE_ACCCESS_TYPE;
+import static org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants.READ_ACCCESS_TYPE;
+import static org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants.WRITE_ACCCESS_TYPE;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributes;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
+import org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants;
+import org.apache.ranger.authorization.hadoop.exceptions.RangerAccessControlException;
+import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResource;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.service.RangerBasePlugin;
+
+import com.google.common.collect.Sets;
+
+public class RangerHdfsAuthorizerImpl extends INodeAttributeProvider {
+	private static final Log LOG = LogFactory.getLog(RangerHdfsAuthorizerImpl.class);
+
+	private RangerHdfsPlugin           rangerPlugin            = null;
+	private Map<FsAction, Set<String>> access2ActionListMapper = new HashMap<FsAction, Set<String>>();
+
+	public RangerHdfsAuthorizerImpl() {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerHdfsAuthorizer.RangerHdfsAuthorizer()");
+		}
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerHdfsAuthorizer.RangerHdfsAuthorizer()");
+		}
+	}
+
+	public void start() {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerHdfsAuthorizer.start()");
+		}
+
+		RangerHdfsPlugin plugin = new RangerHdfsPlugin();
+		plugin.init();
+
+		access2ActionListMapper.put(FsAction.NONE,          new HashSet<String>());
+		access2ActionListMapper.put(FsAction.ALL,           Sets.newHashSet(READ_ACCCESS_TYPE, WRITE_ACCCESS_TYPE, EXECUTE_ACCCESS_TYPE));
+		access2ActionListMapper.put(FsAction.READ,          Sets.newHashSet(READ_ACCCESS_TYPE));
+		access2ActionListMapper.put(FsAction.READ_WRITE,    Sets.newHashSet(READ_ACCCESS_TYPE, WRITE_ACCCESS_TYPE));
+		access2ActionListMapper.put(FsAction.READ_EXECUTE,  Sets.newHashSet(READ_ACCCESS_TYPE, EXECUTE_ACCCESS_TYPE));
+		access2ActionListMapper.put(FsAction.WRITE,         Sets.newHashSet(WRITE_ACCCESS_TYPE));
+		access2ActionListMapper.put(FsAction.WRITE_EXECUTE, Sets.newHashSet(WRITE_ACCCESS_TYPE, EXECUTE_ACCCESS_TYPE));
+		access2ActionListMapper.put(FsAction.EXECUTE,       Sets.newHashSet(EXECUTE_ACCCESS_TYPE));
+
+		rangerPlugin = plugin;
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerHdfsAuthorizer.start()");
+		}
+	}
+
+	public void stop() {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerHdfsAuthorizer.stop()");
+		}
+
+		RangerHdfsPlugin plugin = rangerPlugin;
+		rangerPlugin = null;
+
+		if(plugin != null) {
+			plugin.cleanup();
+		}
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerHdfsAuthorizer.stop()");
+		}
+	}
+
+	@Override
+	public INodeAttributes getAttributes(String fullPath, INodeAttributes inode) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerHdfsAuthorizer.getAttributes(" + fullPath + ")");
+		}
+
+		INodeAttributes ret = inode; // return default attributes
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerHdfsAuthorizer.getAttributes(" + fullPath + "): " + ret);
+		}
+
+		return ret;
+	}
+
+	@Override
+	public INodeAttributes getAttributes(String[] pathElements, INodeAttributes inode) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerHdfsAuthorizer.getAttributes(pathElementsCount=" + (pathElements == null ? 0 : pathElements.length) + ")");
+		}
+
+		INodeAttributes ret = inode; // return default attributes
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerHdfsAuthorizer.getAttributes(pathElementsCount=" + (pathElements == null ? 0 : pathElements.length) + "): " + ret);
+		}
+
+		return ret;
+	}
+
+	@Override
+	public AccessControlEnforcer getExternalAccessControlEnforcer(AccessControlEnforcer defaultEnforcer) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerHdfsAuthorizer.getExternalAccessControlEnforcer()");
+		}
+
+		RangerAccessControlEnforcer rangerAce = new RangerAccessControlEnforcer(defaultEnforcer);
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerHdfsAuthorizer.getExternalAccessControlEnforcer()");
+		}
+
+		return rangerAce;
+	}
+
+
+	class RangerAccessControlEnforcer implements AccessControlEnforcer {
+		private INodeAttributeProvider.AccessControlEnforcer defaultEnforcer = null;
+
+		public RangerAccessControlEnforcer(AccessControlEnforcer defaultEnforcer) {
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("==> RangerAccessControlEnforcer.RangerAccessControlEnforcer()");
+			}
+
+			this.defaultEnforcer = defaultEnforcer;
+
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("<== RangerAccessControlEnforcer.RangerAccessControlEnforcer()");
+			}
+		}
+
+		@Override
+		public void checkPermission(String fsOwner, String superGroup, UserGroupInformation ugi,
+									INodeAttributes[] inodeAttrs, INode[] inodes, byte[][] pathByNameArr,
+									int snapshotId, String path, int ancestorIndex, boolean doCheckOwner,
+									FsAction ancestorAccess, FsAction parentAccess, FsAction access,
+									FsAction subAccess, boolean ignoreEmptyDir) throws AccessControlException {
+			boolean                accessGranted = false;
+			RangerHdfsPlugin       plugin        = rangerPlugin;
+			RangerHdfsAuditHandler auditHandler  = null;
+			String                 user          = ugi != null ? ugi.getShortUserName() : null;
+			Set<String>            groups        = ugi != null ? Sets.newHashSet(ugi.getGroupNames()) : null;
+
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("==> RangerAccessControlEnforcer.checkPermission("
+						+ "fsOwner=" + fsOwner + "; superGroup=" + superGroup + ", inodesCount=" + (inodes != null ? inodes.length : 0)
+						+ ", snapshotId=" + snapshotId + ", user=" + user + ", path=" + path + ", ancestorIndex=" + ancestorIndex
+						+ ", doCheckOwner="+ doCheckOwner + ", ancestorAccess=" + ancestorAccess + ", parentAccess=" + parentAccess
+						+ ", access=" + access + ", subAccess=" + subAccess + ", ignoreEmptyDir=" + ignoreEmptyDir + ")");
+			}
+
+			try {
+				if(plugin != null && !ArrayUtils.isEmpty(inodes)) {
+					auditHandler = new RangerHdfsAuditHandler(path);
+
+					if(ancestorIndex >= inodes.length) {
+						ancestorIndex = inodes.length - 1;
+					}
+
+					for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null; ancestorIndex--);
+
+					accessGranted = true;
+
+					INode ancestor = inodes.length > ancestorIndex && ancestorIndex >= 0 ? inodes[ancestorIndex] : null;
+					INode parent   = inodes.length > 1 ? inodes[inodes.length - 2] : null;
+					INode inode    = inodes[inodes.length - 1];
+
+					boolean noAccessToCheck = access == null && parentAccess == null && ancestorAccess == null && subAccess == null;
+
+					if(noAccessToCheck) { // check for traverse (EXECUTE) access on the path (if path is a directory) or its parent (if path is a file)
+						INode           node        = null;
+						INodeAttributes nodeAttribs = null;
+
+						if(inode != null && inode.isDirectory()) {
+							node        = inode;
+							nodeAttribs = inodeAttrs.length > 0 ? inodeAttrs[inodeAttrs.length - 1] : null;
+						} else if(parent != null) {
+							node        = parent;
+							nodeAttribs = inodeAttrs.length > 1 ? inodeAttrs[inodeAttrs.length - 2] : null;
+						}
+
+						if(node != null) {
+							accessGranted = isAccessAllowed(node, nodeAttribs, FsAction.EXECUTE, user, groups, fsOwner, superGroup, plugin, null);
+						}
+					}
+
+					// checkStickyBit
+					if (accessGranted && parentAccess != null && parentAccess.implies(FsAction.WRITE) && parent != null && inode != null) {
+						if (parent.getFsPermission() != null && parent.getFsPermission().getStickyBit()) {
+						    // user should be owner of the parent or the inode
+						    accessGranted = StringUtils.equals(parent.getUserName(), user) || StringUtils.equals(inode.getUserName(), user);
+						}
+					}
+
+					// checkAncestorAccess
+					if(accessGranted && ancestorAccess != null && ancestor != null) {
+						INodeAttributes ancestorAttribs = inodeAttrs.length > ancestorIndex ? inodeAttrs[ancestorIndex] : null;
+
+						accessGranted = isAccessAllowed(ancestor, ancestorAttribs, ancestorAccess, user, groups, fsOwner, superGroup, plugin, auditHandler);
+					}
+
+					// checkParentAccess
+					if(accessGranted && parentAccess != null && parent != null) {
+						INodeAttributes parentAttribs = inodeAttrs.length > 1 ? inodeAttrs[inodeAttrs.length - 2] : null;
+
+						accessGranted = isAccessAllowed(parent, parentAttribs, parentAccess, user, groups, fsOwner, superGroup, plugin, auditHandler);
+					}
+
+					// checkINodeAccess
+					if(accessGranted && access != null && inode != null) {
+						INodeAttributes inodeAttribs = inodeAttrs.length > 0 ? inodeAttrs[inodeAttrs.length - 1] : null;
+
+						accessGranted = isAccessAllowed(inode, inodeAttribs, access, user, groups, fsOwner, superGroup, plugin, auditHandler);
+					}
+
+					// checkSubAccess
+					if(accessGranted && subAccess != null && inode != null && inode.isDirectory()) {
+						Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
+
+						for(directories.push(inode.asDirectory()); !directories.isEmpty(); ) {
+							INodeDirectory      dir   = directories.pop();
+							ReadOnlyList<INode> cList = dir.getChildrenList(snapshotId);
+
+							if (!(cList.isEmpty() && ignoreEmptyDir)) {
+								INodeAttributes dirAttribs = dir.getSnapshotINode(snapshotId);
+
+								accessGranted = isAccessAllowed(dir, dirAttribs, subAccess, user, groups, fsOwner, superGroup, plugin, auditHandler);
+
+								if(! accessGranted) {
+									break;
+								}
+							}
+
+							for(INode child : cList) {
+								if (child.isDirectory()) {
+									directories.push(child.asDirectory());
+								}
+							}
+						}
+					}
+
+					// checkOwnerAccess
+					if(accessGranted && doCheckOwner) {
+						INodeAttributes inodeAttribs = inodeAttrs.length > 0 ? inodeAttrs[inodeAttrs.length - 1] : null;
+						String          owner        = inodeAttribs != null ? inodeAttribs.getUserName() : null;
+
+						accessGranted = StringUtils.equals(user, owner);
+					}
+				}
+
+				if(! accessGranted && RangerHdfsPlugin.isHadoopAuthEnabled() && defaultEnforcer != null) {
+					try {
+						defaultEnforcer.checkPermission(fsOwner, superGroup, ugi, inodeAttrs, inodes,
+														pathByNameArr, snapshotId, path, ancestorIndex, doCheckOwner,
+														ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
+
+						accessGranted = true;
+					} finally {
+						if(auditHandler != null) {
+							FsAction action = access;
+
+							if(action == null) {
+								if(parentAccess != null) {
+									action = parentAccess;
+								} else if(ancestorAccess != null) {
+									action = ancestorAccess;
+								} else if(subAccess != null) {
+									action = subAccess;
+								} else {
+									action = FsAction.NONE;
+								}
+							}
+
+							auditHandler.logHadoopEvent(path, action, accessGranted);
+						}
+					}
+				}
+
+				if(! accessGranted) {
+					throw new RangerAccessControlException("Permission denied: principal{user=" + user + ",groups: " + groups + "}, access=" + access + ", " + path) ;
+				}
+			} finally {
+				if(auditHandler != null) {
+					auditHandler.flushAudit();
+				}
+
+				if(LOG.isDebugEnabled()) {
+					LOG.debug("<== RangerAccessControlEnforcer.checkPermission(" + path + ", " + access + ", user=" + user + ") : " + accessGranted);
+				}
+			}
+		}
+
+		private boolean isAccessAllowed(INode inode, INodeAttributes inodeAttribs, FsAction access, String user, Set<String> groups, String fsOwner, String superGroup, RangerHdfsPlugin plugin, RangerHdfsAuditHandler auditHandler) {
+			boolean ret       = false;
+			String  path      = inode != null ? inode.getFullPathName() : null;
+			String  pathOwner = inodeAttribs != null ? inodeAttribs.getUserName() : null;
+
+			if(pathOwner == null && inode != null) {
+				pathOwner = inode.getUserName();
+			}
+
+			if (RangerHadoopConstants.HDFS_ROOT_FOLDER_PATH_ALT.equals(path)) {
+				path = RangerHadoopConstants.HDFS_ROOT_FOLDER_PATH;
+			}
+
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("==> RangerAccessControlEnforcer.isAccessAllowed(" + path + ", " + access + ", " + user + ")");
+			}
+
+			Set<String> accessTypes = access2ActionListMapper.get(access);
+
+			if(accessTypes == null) {
+				LOG.warn("RangerAccessControlEnforcer.isAccessAllowed(" + path + ", " + access + ", " + user + "): no Ranger accessType found for " + access);
+
+				accessTypes = access2ActionListMapper.get(FsAction.NONE);
+			}
+
+			for(String accessType : accessTypes) {
+				RangerHdfsAccessRequest request = new RangerHdfsAccessRequest(path, pathOwner, access, accessType, user, groups);
+
+				RangerAccessResult result = plugin.isAccessAllowed(request, auditHandler);
+
+				if (result == null) {
+					LOG.error("RangerAccessControlEnforcer: Internal error: null RangerAccessResult object received back from isAccessAllowed()!");
+				} else {
+					ret = result.getIsAllowed();
+
+					if (!ret) {
+						break;
+					}
+				}
+			}
+
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("<== RangerAccessControlEnforcer.isAccessAllowed(" + path + ", " + access + ", " + user + "): " + ret);
+			}
+
+			return ret;
+		}
+	}
+}
+
+
+class RangerHdfsPlugin extends RangerBasePlugin {
+	private static boolean hadoopAuthEnabled = RangerHadoopConstants.RANGER_ADD_HDFS_PERMISSION_DEFAULT;
+
+	public RangerHdfsPlugin() {
+		super("hdfs", "hdfs");
+	}
+	
+	public void init() {
+		super.init();
+		
+		RangerHdfsPlugin.hadoopAuthEnabled = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.RANGER_ADD_HDFS_PERMISSION_PROP, RangerHadoopConstants.RANGER_ADD_HDFS_PERMISSION_DEFAULT);
+	}
+
+	public static boolean isHadoopAuthEnabled() {
+		return RangerHdfsPlugin.hadoopAuthEnabled;
+	}
+}
+
+class RangerHdfsResource extends RangerAccessResourceImpl {
+	private static final String KEY_PATH = "path";
+
+
+	public RangerHdfsResource(String path, String owner) {
+		super.setValue(KEY_PATH, path);
+		super.setOwnerUser(owner);
+	}
+}
+
+class RangerHdfsAccessRequest extends RangerAccessRequestImpl {
+	public RangerHdfsAccessRequest(String path, String pathOwner, FsAction access, String accessType, String user, Set<String> groups) {
+		super.setResource(new RangerHdfsResource(path, pathOwner));
+		super.setAccessType(accessType);
+		super.setUser(user);
+		super.setUserGroups(groups);
+		super.setAccessTime(StringUtil.getUTCDate());
+		super.setClientIPAddress(getRemoteIp());
+		super.setAction(access.toString());
+	}
+	
+	private static String getRemoteIp() {
+		String ret = null ;
+		InetAddress ip = Server.getRemoteIp() ;
+		if (ip != null) {
+			ret = ip.getHostAddress();
+		}
+		return ret ;
+	}
+}
+
+class RangerHdfsAuditHandler extends RangerDefaultAuditHandler {
+	private static final Log LOG = LogFactory.getLog(RangerHdfsAuditHandler.class);
+
+	private boolean         isAuditEnabled = false;
+	private AuthzAuditEvent auditEvent     = null;
+
+	private static final String    RangerModuleName = RangerConfiguration.getInstance().get(RangerHadoopConstants.AUDITLOG_RANGER_MODULE_ACL_NAME_PROP , RangerHadoopConstants.DEFAULT_RANGER_MODULE_ACL_NAME) ;
+	private static final String    HadoopModuleName = RangerConfiguration.getInstance().get(RangerHadoopConstants.AUDITLOG_HADOOP_MODULE_ACL_NAME_PROP , RangerHadoopConstants.DEFAULT_HADOOP_MODULE_ACL_NAME) ;
+	private static final String    excludeUserList  = RangerConfiguration.getInstance().get(RangerHadoopConstants.AUDITLOG_HDFS_EXCLUDE_LIST_PROP, RangerHadoopConstants.AUDITLOG_EMPTY_STRING) ;
+	private static HashSet<String> excludeUsers     = null ;
+
+	static {
+		if (excludeUserList != null && excludeUserList.trim().length() > 0) {
+			excludeUsers = new HashSet<String>() ;
+			for(String excludeUser : excludeUserList.trim().split(",")) {
+				excludeUser = excludeUser.trim() ;
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Adding exclude user [" + excludeUser + "]");
+				}
+				excludeUsers.add(excludeUser) ;
+			}
+		}
+	}
+
+	public RangerHdfsAuditHandler(String pathToBeValidated) {
+		auditEvent = new AuthzAuditEvent();
+		auditEvent.setResourcePath(pathToBeValidated);
+	}
+
+	@Override
+	public void processResult(RangerAccessResult result) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerHdfsAuditHandler.processResult(" + result + ")");
+		}
+
+		if(! isAuditEnabled && result.getIsAudited()) {
+			isAuditEnabled = true;
+		}
+
+		RangerAccessRequest  request      = result.getAccessRequest();
+//		RangerServiceDef     serviceDef   = result.getServiceDef();
+		RangerAccessResource resource     = request.getResource();
+		String               resourceType = resource != null ? resource.getLeafName() : null;
+		String               resourcePath = resource != null ? resource.getAsString() : null;
+
+		auditEvent.setUser(request.getUser());
+		auditEvent.setResourceType(resourceType) ;
+		auditEvent.setAccessType(request.getAction());
+		auditEvent.setAccessResult((short)(result.getIsAllowed() ? 1 : 0));
+		auditEvent.setClientIP(request.getClientIPAddress());
+		auditEvent.setEventTime(request.getAccessTime());
+		auditEvent.setAclEnforcer(RangerModuleName);
+		auditEvent.setPolicyId(result.getPolicyId());
+		auditEvent.setRepositoryType(result.getServiceType());
+		auditEvent.setRepositoryName(result.getServiceName());
+		auditEvent.setResultReason(resourcePath);
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerHdfsAuditHandler.processResult(" + result + "): " + auditEvent);
+		}
+	}
+
+	public void logHadoopEvent(String path, FsAction action, boolean accessGranted) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerHdfsAuditHandler.logHadoopEvent(" + path + ", " + action + ", " + accessGranted + ")");
+		}
+
+		auditEvent.setResultReason(path);
+		auditEvent.setAccessResult((short) (accessGranted ? 1 : 0));
+		auditEvent.setAccessType(action == null ? null : action.toString());
+		auditEvent.setAclEnforcer(HadoopModuleName);
+		auditEvent.setPolicyId(-1);
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerHdfsAuditHandler.logHadoopEvent(" + path + ", " + action + ", " + accessGranted + "): " + auditEvent);
+		}
+	}
+
+	public void flushAudit() {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerHdfsAuditHandler.flushAudit(" + isAuditEnabled + ", " + auditEvent + ")");
+		}
+
+		if(isAuditEnabled && !StringUtils.isEmpty(auditEvent.getAccessType())) {
+			String username = auditEvent.getUser();
+
+			boolean skipLog = (username != null && excludeUsers != null && excludeUsers.contains(username)) ;
+
+			if (! skipLog) {
+				super.logAuthzAudit(auditEvent);
+			}
+		}
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerHdfsAuditHandler.flushAudit(" + isAuditEnabled + ", " + auditEvent + ")");
+		}
+	}
+}
+
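
Note: this authorizer plugs into the NameNode through HDFS's pluggable INodeAttributeProvider extension point. A minimal hdfs-site.xml sketch for wiring it in (shown here with the Impl class added above; deployments of this patch series would normally name a thin shim class instead, since RANGER-586 is about keeping plugin dependencies off the component's CLASSPATH):

  <property>
    <name>dfs.namenode.inode.attributes.provider.class</name>
    <value>org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizerImpl</value>
  </property>

Once registered, the NameNode obtains a RangerAccessControlEnforcer via getExternalAccessControlEnforcer() and routes every permission check through checkPermission() above; when Ranger does not grant access and the RANGER_ADD_HDFS_PERMISSION_PROP option is enabled, the default HDFS enforcer still gets the final say.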

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/hive-agent/conf/ranger-hive-security.xml
----------------------------------------------------------------------
diff --git a/hive-agent/conf/ranger-hive-security.xml b/hive-agent/conf/ranger-hive-security.xml
index 3a5fc54..515b1fd 100644
--- a/hive-agent/conf/ranger-hive-security.xml
+++ b/hive-agent/conf/ranger-hive-security.xml
@@ -65,6 +65,22 @@
 		</description>
 	</property>
 
+	<property>
+		<name>ranger.policy.rest.client.connection.timeoutMs</name>
+		<value>120000</value>
+		<description>
+			RangerRestClient connection timeout in milliseconds
+		</description>
+	</property>
+	
+	<property>
+		<name>ranger.policy.rest.client.read.timeoutMs</name>
+		<value>30000</value>
+		<description>
+			RangerRestClient read timeout in milliseconds
+		</description>
+	</property>
+	
 	<property>
 		<name>xasecure.hive.update.xapolicies.on.grant.revoke</name>
 		<value>true</value>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/hive-agent/src/main/java/com/xasecure/authorization/hive/authorizer/XaSecureHiveAuthorizerFactory.java
----------------------------------------------------------------------
diff --git a/hive-agent/src/main/java/com/xasecure/authorization/hive/authorizer/XaSecureHiveAuthorizerFactory.java b/hive-agent/src/main/java/com/xasecure/authorization/hive/authorizer/XaSecureHiveAuthorizerFactory.java
index e941704..079616c 100644
--- a/hive-agent/src/main/java/com/xasecure/authorization/hive/authorizer/XaSecureHiveAuthorizerFactory.java
+++ b/hive-agent/src/main/java/com/xasecure/authorization/hive/authorizer/XaSecureHiveAuthorizerFactory.java
@@ -18,7 +18,7 @@
  */
 package com.xasecure.authorization.hive.authorizer;
 
-import org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory;
+import org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactoryImpl;
 
 /**
 * This class exists only to provide for seamless upgrade/downgrade capabilities.  The authorizer factory class name is in hive config files in /etc/.../conf which
@@ -28,6 +28,6 @@ import org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFacto
 * This class is final because if one needs to customize the authorizer it is expected that RangerHiveAuthorizerFactoryImpl would be modified/extended as that is
 * the "real" factory!  This class, hence, should NEVER be more than an EMPTY shell!
  */
-public final class XaSecureHiveAuthorizerFactory extends RangerHiveAuthorizerFactory {
+public final class XaSecureHiveAuthorizerFactory extends RangerHiveAuthorizerFactoryImpl {
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactory.java
----------------------------------------------------------------------
diff --git a/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactory.java b/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactory.java
deleted file mode 100644
index bd410b7..0000000
--- a/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
- package org.apache.ranger.authorization.hive.authorizer;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
-import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
-import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerFactory;
-import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
-import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
-import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
-
-public class RangerHiveAuthorizerFactory implements HiveAuthorizerFactory {
-	@Override
-	public HiveAuthorizer createHiveAuthorizer(HiveMetastoreClientFactory metastoreClientFactory,
-											   HiveConf                   conf,
-											   HiveAuthenticationProvider hiveAuthenticator,
-											   HiveAuthzSessionContext    sessionContext)
-													   throws HiveAuthzPluginException {
-		return new RangerHiveAuthorizer(metastoreClientFactory, conf, hiveAuthenticator, sessionContext);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactoryImpl.java b/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactoryImpl.java
new file mode 100644
index 0000000..937aaed
--- /dev/null
+++ b/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactoryImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ package org.apache.ranger.authorization.hive.authorizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
+
+public class RangerHiveAuthorizerFactoryImpl implements HiveAuthorizerFactory {
+	
+	@Override
+	public HiveAuthorizer createHiveAuthorizer(HiveMetastoreClientFactory metastoreClientFactory,
+											   HiveConf                   conf,
+											   HiveAuthenticationProvider hiveAuthenticator,
+											   HiveAuthzSessionContext    sessionContext)
+													   throws HiveAuthzPluginException {
+		return new RangerHiveAuthorizer(metastoreClientFactory, conf, hiveAuthenticator, sessionContext);
+	}
+}
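
Note: Hive discovers this factory through its standard pluggable-authorization setting. A minimal hive-site.xml sketch (the value names the Impl class for illustration; in practice a shim factory, which the rest of this patch series presumably provides, would be configured so that Ranger's dependencies stay off Hive's own CLASSPATH):

  <property>
    <name>hive.security.authorization.manager</name>
    <value>org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactoryImpl</value>
  </property>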

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/knox-agent/src/main/java/com/xasecure/pdp/knox/filter/XASecurePDPKnoxFilter.java
----------------------------------------------------------------------
diff --git a/knox-agent/src/main/java/com/xasecure/pdp/knox/filter/XASecurePDPKnoxFilter.java b/knox-agent/src/main/java/com/xasecure/pdp/knox/filter/XASecurePDPKnoxFilter.java
deleted file mode 100644
index 6b9d6fd..0000000
--- a/knox-agent/src/main/java/com/xasecure/pdp/knox/filter/XASecurePDPKnoxFilter.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.xasecure.pdp.knox.filter;
-
-import org.apache.ranger.authorization.knox.RangerPDPKnoxFilter;
-
-public class XASecurePDPKnoxFilter extends RangerPDPKnoxFilter {
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-kafka/conf/ranger-kafka-security.xml
----------------------------------------------------------------------
diff --git a/plugin-kafka/conf/ranger-kafka-security.xml b/plugin-kafka/conf/ranger-kafka-security.xml
index 2c06f5c..f9c8d5f 100644
--- a/plugin-kafka/conf/ranger-kafka-security.xml
+++ b/plugin-kafka/conf/ranger-kafka-security.xml
@@ -80,4 +80,5 @@
 			RangerRestClient read timeout in milliseconds
 		</description>
 	</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml
index afee47d..e14e48c 100644
--- a/plugin-kafka/pom.xml
+++ b/plugin-kafka/pom.xml
@@ -47,5 +47,10 @@
 			<artifactId>kafka_2.10</artifactId>
 			<version>${kafka.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<version>${hadoop.version}</version>
+		</dependency>
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
deleted file mode 100644
index dbb2723..0000000
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.authorization.kafka.authorizer;
-
-import java.io.IOException;
-import java.security.Principal;
-import java.util.Date;
-
-import javax.security.auth.Subject;
-
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import kafka.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
-import kafka.security.auth.ResourceType;
-import kafka.server.KafkaConfig;
-import kafka.common.security.LoginManager;
-import kafka.network.RequestChannel.Session;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ranger.audit.provider.MiscUtil;
-import org.apache.ranger.authorization.utils.StringUtil;
-import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
-import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
-import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
-import org.apache.ranger.plugin.policyengine.RangerAccessResult;
-import org.apache.ranger.plugin.service.RangerBasePlugin;
-
-import scala.collection.immutable.HashSet;
-import scala.collection.immutable.Set;
-
-public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Log logger = LogFactory
-			.getLog(RangerKafkaAuthorizer.class);
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumer_group";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	long lastLogTime = 0;
-	int errorLogFreq = 30000; // Log after every 30 seconds
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig)
-	 */
-	@Override
-	public void initialize(KafkaConfig kafkaConfig) {
-
-		if (rangerPlugin == null) {
-			try {
-				Subject subject = LoginManager.subject();
-				UserGroupInformation ugi = MiscUtil
-						.createUGIFromSubject(subject);
-				if (ugi != null) {
-					MiscUtil.setUGILoginUser(ugi, subject);
-				}
-				logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-			} catch (Throwable t) {
-				logger.error("Error getting principal.", t);
-			}
-
-			rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-			logger.info("Calling plugin.init()");
-			rangerPlugin.init();
-
-			RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
-			rangerPlugin.setResultProcessor(auditHandler);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		// TODO: If resource type if consumer group, then allow it by default
-		if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
-			return true;
-		}
-
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-			userName = StringUtils.substringBefore(userName, "/");
-			userName = StringUtils.substringBefore(userName, "@");
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.host();
-
-		Date eventTime = StringUtil.getUTCDate();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.fatal("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(ResourceType.TOPIC)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(ResourceType.CLUSTER)) {
-			// CLUSTER should go as null
-			// rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else {
-			logger.fatal("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = true;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-			returnValue = false;
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-					returnValue = false;
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			}
-		}
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls() is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls() is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls() is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public Set<Acl> getAcls(KafkaPrincipal principal) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Operation.READ)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Operation.WRITE)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Operation.ALTER)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Operation.DESCRIBE)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(Operation.CLUSTER_ACTION)) {
-			return ACCESS_TYPE_KAFKA_ADMIN;
-		}
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizerImpl.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizerImpl.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizerImpl.java
new file mode 100644
index 0000000..608371a
--- /dev/null
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizerImpl.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.authorization.kafka.authorizer;
+
+import java.util.Date;
+import javax.security.auth.Subject;
+
+import kafka.security.auth.Acl;
+import kafka.security.auth.Authorizer;
+import kafka.security.auth.KafkaPrincipal;
+import kafka.security.auth.Operation;
+import kafka.security.auth.Resource;
+import kafka.security.auth.ResourceType;
+import kafka.server.KafkaConfig;
+import kafka.common.security.LoginManager;
+import kafka.network.RequestChannel.Session;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.service.RangerBasePlugin;
+
+import scala.collection.immutable.HashSet;
+import scala.collection.immutable.Set;
+
+public class RangerKafkaAuthorizerImpl implements Authorizer {
+	private static final Log logger = LogFactory
+			.getLog(RangerKafkaAuthorizerImpl.class);
+
+	public static final String KEY_TOPIC = "topic";
+	public static final String KEY_CLUSTER = "cluster";
+	public static final String KEY_CONSUMER_GROUP = "consumer_group";
+
+	public static final String ACCESS_TYPE_READ = "consume";
+	public static final String ACCESS_TYPE_WRITE = "publish";
+	public static final String ACCESS_TYPE_CREATE = "create";
+	public static final String ACCESS_TYPE_DELETE = "delete";
+	public static final String ACCESS_TYPE_CONFIGURE = "configure";
+	public static final String ACCESS_TYPE_DESCRIBE = "describe";
+	public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin";
+
+	private static volatile RangerBasePlugin rangerPlugin = null;
+	long lastLogTime = 0;
+	int errorLogFreq = 30000; // Log after every 30 seconds
+
+	public RangerKafkaAuthorizerImpl() {
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig)
+	 */
+	@Override
+	public void initialize(KafkaConfig kafkaConfig) {
+
+		if (rangerPlugin == null) {
+			try {
+				Subject subject = LoginManager.subject();
+				UserGroupInformation ugi = MiscUtil
+						.createUGIFromSubject(subject);
+				if (ugi != null) {
+					MiscUtil.setUGILoginUser(ugi, subject);
+				}
+				logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
+			} catch (Throwable t) {
+				logger.error("Error getting principal.", t);
+			}
+
+			rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+			logger.info("Calling plugin.init()");
+			rangerPlugin.init();
+
+			RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
+			rangerPlugin.setResultProcessor(auditHandler);
+		}
+	}
+
+	@Override
+	public boolean authorize(Session session, Operation operation, Resource resource) {
+
+		if (rangerPlugin == null) {
+			MiscUtil.logErrorMessageByInterval(logger,
+					"Authorizer is still not initialized");
+			return false;
+		}
+
+		// TODO: If resource type is consumer group, then allow it by default
+		if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
+			return true;
+		}
+
+		String userName = null;
+		if (session.principal() != null) {
+			userName = session.principal().getName();
+			userName = StringUtils.substringBefore(userName, "/");
+			userName = StringUtils.substringBefore(userName, "@");
+		}
+		java.util.Set<String> userGroups = MiscUtil
+				.getGroupsForRequestUser(userName);
+		String ip = session.host();
+
+		Date eventTime = StringUtil.getUTCDate();
+		String accessType = mapToRangerAccessType(operation);
+		boolean validationFailed = false;
+		String validationStr = "";
+
+		if (accessType == null) {
+			if (MiscUtil.logErrorMessageByInterval(logger,
+					"Unsupported access type. operation=" + operation)) {
+				logger.fatal("Unsupported access type. session=" + session
+						+ ", operation=" + operation + ", resource=" + resource);
+			}
+			validationFailed = true;
+			validationStr += "Unsupported access type. operation=" + operation;
+		}
+		String action = accessType;
+
+		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+		rangerRequest.setUser(userName);
+		rangerRequest.setUserGroups(userGroups);
+		rangerRequest.setClientIPAddress(ip);
+		rangerRequest.setAccessTime(eventTime);
+
+		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+		rangerRequest.setResource(rangerResource);
+		rangerRequest.setAccessType(accessType);
+		rangerRequest.setAction(action);
+		rangerRequest.setRequestData(resource.name());
+
+		if (resource.resourceType().equals(ResourceType.TOPIC)) {
+			rangerResource.setValue(KEY_TOPIC, resource.name());
+		} else if (resource.resourceType().equals(ResourceType.CLUSTER)) {
+			// CLUSTER should go as null
+			// rangerResource.setValue(KEY_CLUSTER, resource.name());
+		} else if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
+			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
+		} else {
+			logger.fatal("Unsupported resourceType=" + resource.resourceType());
+			validationFailed = true;
+		}
+
+		boolean returnValue = true;
+		if (validationFailed) {
+			MiscUtil.logErrorMessageByInterval(logger, validationStr
+					+ ", request=" + rangerRequest);
+			returnValue = false;
+		} else {
+
+			try {
+				RangerAccessResult result = rangerPlugin
+						.isAccessAllowed(rangerRequest);
+				if (result == null) {
+					logger.error("Ranger Plugin returned null. Returning false");
+					returnValue = false;
+				} else {
+					returnValue = result.getIsAllowed();
+				}
+			} catch (Throwable t) {
+				logger.error("Error while calling isAccessAllowed(). request="
+						+ rangerRequest, t);
+				returnValue = false; // fail closed on plugin errors rather than leaving the initial 'true'
+			}
+		}
+		if (logger.isDebugEnabled()) {
+			logger.debug("rangerRequest=" + rangerRequest + ", return="
+					+ returnValue);
+		}
+		return returnValue;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
+	 * kafka.security.auth.Resource)
+	 */
+	@Override
+	public void addAcls(Set<Acl> acls, Resource resource) {
+		logger.error("addAcls() is not supported by Ranger for Kafka");
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
+	 * kafka.security.auth.Resource)
+	 */
+	@Override
+	public boolean removeAcls(Set<Acl> acls, Resource resource) {
+		logger.error("removeAcls() is not supported by Ranger for Kafka");
+		return false;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
+	 */
+	@Override
+	public boolean removeAcls(Resource resource) {
+		logger.error("removeAcls() is not supported by Ranger for Kafka");
+		return false;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
+	 */
+	@Override
+	public Set<Acl> getAcls(Resource resource) {
+		Set<Acl> aclList = new HashSet<Acl>();
+		logger.error("getAcls() is not supported by Ranger for Kafka");
+
+		return aclList;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
+	 * )
+	 */
+	@Override
+	public Set<Acl> getAcls(KafkaPrincipal principal) {
+		Set<Acl> aclList = new HashSet<Acl>();
+		logger.error("getAcls() is not supported by Ranger for Kafka");
+		return aclList;
+	}
+
+	/**
+	 * @param operation
+	 * @return
+	 */
+	private String mapToRangerAccessType(Operation operation) {
+		if (operation.equals(Operation.READ)) {
+			return ACCESS_TYPE_READ;
+		} else if (operation.equals(Operation.WRITE)) {
+			return ACCESS_TYPE_WRITE;
+		} else if (operation.equals(Operation.ALTER)) {
+			return ACCESS_TYPE_CONFIGURE;
+		} else if (operation.equals(Operation.DESCRIBE)) {
+			return ACCESS_TYPE_DESCRIBE;
+		} else if (operation.equals(Operation.CLUSTER_ACTION)) {
+			return ACCESS_TYPE_KAFKA_ADMIN;
+		}
+		return null;
+	}
+}
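
Note: Kafka brokers load this class through the pluggable Authorizer configuration. A minimal server.properties sketch (as with the other plugins in this series, deployments would typically point at a shim class rather than at the Impl class directly):

  authorizer.class.name=org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizerImpl

With this in place, consumer-group resources are currently allowed unconditionally (see the TODO above), while unsupported operations and resource types are denied.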

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-kms/conf/ranger-kms-security.xml
----------------------------------------------------------------------
diff --git a/plugin-kms/conf/ranger-kms-security.xml b/plugin-kms/conf/ranger-kms-security.xml
index a22e6cb..58f7076 100755
--- a/plugin-kms/conf/ranger-kms-security.xml
+++ b/plugin-kms/conf/ranger-kms-security.xml
@@ -72,12 +72,13 @@
 			RangerRestClient connection timeout in milliseconds
 		</description>
 	</property>
-	
+
 	<property>
 		<name>ranger.plugin.kms.policy.rest.client.read.timeoutMs</name>
 		<value>30000</value>
 		<description>
 			RangerRestClient read timeout in milliseconds
 		</description>
-	</property>	
+	</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java b/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
deleted file mode 100644
index ab9b7a9..0000000
--- a/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
+++ /dev/null
@@ -1,354 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.authorization.yarn.authorizer;
-
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.security.*;
-import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
-import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
-import org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants;
-import org.apache.ranger.authorization.utils.StringUtil;
-import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
-import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
-import org.apache.ranger.plugin.policyengine.RangerAccessResult;
-import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
-import org.apache.ranger.plugin.service.RangerBasePlugin;
-
-import com.google.common.collect.Sets;
-
-public class RangerYarnAuthorizer extends YarnAuthorizationProvider {
-	public static final String ACCESS_TYPE_ADMIN_QUEUE = "admin-queue";
-	public static final String ACCESS_TYPE_SUBMIT_APP  = "submit-app";
-	public static final String ACCESS_TYPE_ADMIN       = "admin";
-
-	private static boolean yarnAuthEnabled = RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_DEFAULT;
-
-	private static final Log LOG = LogFactory.getLog(RangerYarnAuthorizer.class);
-
-	private static volatile RangerYarnPlugin yarnPlugin = null;
-
-	private AccessControlList admins = null;
-	private Map<PrivilegedEntity, Map<AccessType, AccessControlList>> yarnAcl = new HashMap<PrivilegedEntity, Map<AccessType, AccessControlList>>();
-
-	@Override
-	public void init(Configuration conf) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerYarnAuthorizer.init()");
-		}
-
-		RangerYarnPlugin plugin = yarnPlugin;
-
-		if(plugin == null) {
-			synchronized(RangerYarnAuthorizer.class) {
-				plugin = yarnPlugin;
-
-				if(plugin == null) {
-					plugin = new RangerYarnPlugin();
-					plugin.init();
-					
-					yarnPlugin = plugin;
-				}
-			}
-		}
-
-		RangerYarnAuthorizer.yarnAuthEnabled = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_PROP, RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_DEFAULT);
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerYarnAuthorizer.init()");
-		}
-	}
-
-	@Override
-	public boolean checkPermission(AccessType accessType, PrivilegedEntity entity, UserGroupInformation ugi) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerYarnAuthorizer.checkPermission(" + accessType + ", " + toString(entity) + ", " + ugi + ")");
-		}
-
-		boolean                ret          = false;
-		RangerYarnPlugin       plugin       = yarnPlugin;
-		RangerYarnAuditHandler auditHandler = null;
-		RangerAccessResult     result       = null;
-
-		if(plugin != null) {
-			RangerYarnAccessRequest request = new RangerYarnAccessRequest(entity, getRangerAccessType(accessType), accessType.name(), ugi);
-
-			auditHandler = new RangerYarnAuditHandler();
-
-			result = plugin.isAccessAllowed(request, auditHandler);
-		}
-
-		if(RangerYarnAuthorizer.yarnAuthEnabled && (result == null || !result.getIsAccessDetermined())) {
-			ret = isAllowedByYarnAcl(accessType, entity, ugi, auditHandler);
-		} else {
-			ret = result == null ? false : result.getIsAllowed();
-		}
-
-		if(auditHandler != null) {
-			auditHandler.flushAudit();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerYarnAuthorizer.checkPermission(" + accessType + ", " + toString(entity) + ", " + ugi + "): " + ret);
-		}
-
-		return ret;
-	}
-
-	@Override
-	public boolean isAdmin(UserGroupInformation ugi) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerYarnAuthorizer.isAdmin(" + ugi + ")");
-		}
-
-		boolean ret = false;
-		
-		AccessControlList admins = this.admins;
-
-		if(admins != null) {
-			ret = admins.isUserAllowed(ugi);
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerYarnAuthorizer.isAdmin(" + ugi + "): " + ret);
-		}
-
-		return ret;
-	}
-
-	@Override
-	public void setAdmins(AccessControlList acl, UserGroupInformation ugi) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerYarnAuthorizer.setAdmins(" + acl + ", " + ugi + ")");
-		}
-
-		admins = acl;
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerYarnAuthorizer.setAdmins(" + acl + ", " + ugi + ")");
-		}
-	}
-
-	@Override
-	public void setPermission(PrivilegedEntity entity, Map<AccessType, AccessControlList> permission, UserGroupInformation ugi) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerYarnAuthorizer.setPermission(" + toString(entity) + ", " + permission + ", " + ugi + ")");
-		}
-
-		yarnAcl.put(entity, permission);
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerYarnAuthorizer.setPermission(" + toString(entity) + ", " + permission + ", " + ugi + ")");
-		}
-	}
-
-	public boolean isAllowedByYarnAcl(AccessType accessType, PrivilegedEntity entity, UserGroupInformation ugi, RangerYarnAuditHandler auditHandler) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerYarnAuthorizer.isAllowedByYarnAcl(" + accessType + ", " + toString(entity) + ", " + ugi + ")");
-		}
-
-		boolean ret = false;
-
-		for(Map.Entry<PrivilegedEntity, Map<AccessType, AccessControlList>> e : yarnAcl.entrySet()) {
-			PrivilegedEntity                   aclEntity         = e.getKey();
-			Map<AccessType, AccessControlList> entityPermissions = e.getValue();
-
-			AccessControlList acl = entityPermissions == null ? null : entityPermissions.get(accessType);
-
-			if(acl == null || !acl.isUserAllowed(ugi)) {
-				continue;
-			}
-
-			if(! isSelfOrChildOf(entity, aclEntity)) {
-				continue;
-			}
-
-			ret = true;
-
-			break;
-		}
-
-		if(auditHandler != null) {
-			auditHandler.logYarnAclEvent(ret);
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerYarnAuthorizer.isAllowedByYarnAcl(" + accessType + ", " + toString(entity) + ", " + ugi + "): " + ret);
-		}
-
-		return ret;
-	}
-
-	private static String getRangerAccessType(AccessType accessType) {
-		String ret = null;
-
-		switch(accessType) {
-			case ADMINISTER_QUEUE:
-				ret = RangerYarnAuthorizer.ACCESS_TYPE_ADMIN_QUEUE;
-			break;
-
-			case SUBMIT_APP:
-				ret = RangerYarnAuthorizer.ACCESS_TYPE_SUBMIT_APP;
-			break;
-		}
-
-		return ret;
-	}
-
-	private boolean isSelfOrChildOf(PrivilegedEntity queue, PrivilegedEntity parentQueue) {
-		boolean ret = queue.equals(parentQueue);
-
-		if(!ret && queue.getType() == EntityType.QUEUE) {
-			String queueName       = queue.getName();
-			String parentQueueName = parentQueue.getName();
-
-			if(queueName.contains(".") && !StringUtil.isEmpty(parentQueueName)) {
-				if(parentQueueName.charAt(parentQueueName.length() - 1) != '.') {
-					parentQueueName += ".";
-				}
-
-				ret = queueName.startsWith(parentQueueName);
-			}
-		}
-
-		return ret;
-	}
-
-	private String toString(PrivilegedEntity entity) {
-		if(entity != null) {
-			return "{name=" + entity.getName() + "; type=" + entity.getType() + "}";
-		}
-
-		return "null";
-	}
-}
-
-class RangerYarnPlugin extends RangerBasePlugin {
-	public RangerYarnPlugin() {
-		super("yarn", "yarn");
-	}
-
-	@Override
-	public void init() {
-		super.init();
-
-		RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
-
-		super.setResultProcessor(auditHandler);
-	}
-}
-
-class RangerYarnResource extends RangerAccessResourceImpl {
-	private static final String KEY_QUEUE = "queue";
-
-	public RangerYarnResource(PrivilegedEntity entity) {
-		setValue(KEY_QUEUE, entity != null ? entity.getName() : null);
-	}
-}
-
-class RangerYarnAccessRequest extends RangerAccessRequestImpl {
-	public RangerYarnAccessRequest(PrivilegedEntity entity, String accessType, String action, UserGroupInformation ugi) {
-		super.setResource(new RangerYarnResource(entity));
-		super.setAccessType(accessType);
-		super.setUser(ugi.getShortUserName());
-		super.setUserGroups(Sets.newHashSet(ugi.getGroupNames()));
-		super.setAccessTime(StringUtil.getUTCDate());
-		super.setClientIPAddress(getRemoteIp());
-		super.setAction(accessType);
-	}
-	
-	private static String getRemoteIp() {
-		String ret = null ;
-		InetAddress ip = Server.getRemoteIp() ;
-		if (ip != null) {
-			ret = ip.getHostAddress();
-		}
-		return ret ;
-	}
-}
-
-class RangerYarnAuditHandler extends RangerDefaultAuditHandler {
-	private static final Log LOG = LogFactory.getLog(RangerYarnAuditHandler.class);
-
-	private static final String YarnModuleName = RangerConfiguration.getInstance().get(RangerHadoopConstants.AUDITLOG_YARN_MODULE_ACL_NAME_PROP , RangerHadoopConstants.DEFAULT_YARN_MODULE_ACL_NAME) ;
-
-	private boolean         isAuditEnabled = false;
-	private AuthzAuditEvent auditEvent     = null;
-
-	public RangerYarnAuditHandler() {
-	}
-
-	@Override
-	public void processResult(RangerAccessResult result) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerYarnAuditHandler.logAudit(" + result + ")");
-		}
-
-		if(! isAuditEnabled && result.getIsAudited()) {
-			isAuditEnabled = true;
-		}
-
-		auditEvent = super.getAuthzEvents(result);
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerYarnAuditHandler.logAudit(" + result + "): " + auditEvent);
-		}
-	}
-
-	public void logYarnAclEvent(boolean accessGranted) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerYarnAuditHandler.logYarnAclEvent(" + accessGranted + ")");
-		}
-
-		if(auditEvent != null) {
-			auditEvent.setAccessResult((short) (accessGranted ? 1 : 0));
-			auditEvent.setAclEnforcer(YarnModuleName);
-			auditEvent.setPolicyId(-1);
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerYarnAuditHandler.logYarnAclEvent(" + accessGranted + "): " + auditEvent);
-		}
-	}
-
-	public void flushAudit() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerYarnAuditHandler.flushAudit(" + isAuditEnabled + ", " + auditEvent + ")");
-		}
-
-		if(isAuditEnabled) {
-			super.logAuthzAudit(auditEvent);
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerYarnAuditHandler.flushAudit(" + isAuditEnabled + ", " + auditEvent + ")");
-		}
-	}
-}
\ No newline at end of file

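The plugin bootstrap in init() above follows the classic double-checked-locking
idiom: a volatile static field, a local snapshot, and a re-check under the lock.
A minimal standalone sketch of the same pattern (class names are illustrative,
not Ranger APIs):

    // Lazy one-time initialization with double-checked locking.
    public class LazyPlugin {
        private static volatile LazyPlugin instance = null;

        public static LazyPlugin get() {
            LazyPlugin p = instance;          // one volatile read on the fast path
            if (p == null) {
                synchronized (LazyPlugin.class) {
                    p = instance;             // re-check under the lock
                    if (p == null) {
                        p = new LazyPlugin();
                        instance = p;         // publish the fully-constructed object
                    }
                }
            }
            return p;
        }
    }

The volatile field is what makes the idiom safe under the Java memory model;
without it, another thread could observe a partially constructed plugin.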
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizerImpl.java
----------------------------------------------------------------------
diff --git a/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizerImpl.java b/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizerImpl.java
new file mode 100644
index 0000000..d154539
--- /dev/null
+++ b/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizerImpl.java
@@ -0,0 +1,354 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.authorization.yarn.authorizer;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.security.*;
+import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
+import org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants;
+import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.service.RangerBasePlugin;
+
+import com.google.common.collect.Sets;
+
+public class RangerYarnAuthorizerImpl extends YarnAuthorizationProvider {
+	public static final String ACCESS_TYPE_ADMIN_QUEUE = "admin-queue";
+	public static final String ACCESS_TYPE_SUBMIT_APP  = "submit-app";
+	public static final String ACCESS_TYPE_ADMIN       = "admin";
+
+	private static boolean yarnAuthEnabled = RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_DEFAULT;
+
+	private static final Log LOG = LogFactory.getLog(RangerYarnAuthorizerImpl.class);
+
+	private static volatile RangerYarnPlugin yarnPlugin = null;
+
+	private AccessControlList admins = null;
+	private Map<PrivilegedEntity, Map<AccessType, AccessControlList>> yarnAcl = new HashMap<PrivilegedEntity, Map<AccessType, AccessControlList>>();
+
+	@Override
+	public void init(Configuration conf) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerYarnAuthorizerImpl.init()");
+		}
+
+		RangerYarnPlugin plugin = yarnPlugin;
+
+		if(plugin == null) {
+			synchronized(RangerYarnAuthorizerImpl.class) {
+				plugin = yarnPlugin;
+
+				if(plugin == null) {
+					plugin = new RangerYarnPlugin();
+					plugin.init();
+					
+					yarnPlugin = plugin;
+				}
+			}
+		}
+
+		RangerYarnAuthorizerImpl.yarnAuthEnabled = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_PROP, RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_DEFAULT);
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerYarnAuthorizerImpl.init()");
+		}
+	}
+
+	@Override
+	public boolean checkPermission(AccessType accessType, PrivilegedEntity entity, UserGroupInformation ugi) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerYarnAuthorizerImpl.checkPermission(" + accessType + ", " + toString(entity) + ", " + ugi + ")");
+		}
+
+		boolean                ret          = false;
+		RangerYarnPlugin       plugin       = yarnPlugin;
+		RangerYarnAuditHandler auditHandler = null;
+		RangerAccessResult     result       = null;
+
+		if(plugin != null) {
+			RangerYarnAccessRequest request = new RangerYarnAccessRequest(entity, getRangerAccessType(accessType), accessType.name(), ugi);
+
+			auditHandler = new RangerYarnAuditHandler();
+
+			result = plugin.isAccessAllowed(request, auditHandler);
+		}
+
+		if(RangerYarnAuthorizerImpl.yarnAuthEnabled && (result == null || !result.getIsAccessDetermined())) {
+			ret = isAllowedByYarnAcl(accessType, entity, ugi, auditHandler);
+		} else {
+			ret = result == null ? false : result.getIsAllowed();
+		}
+
+		if(auditHandler != null) {
+			auditHandler.flushAudit();
+		}
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerYarnAuthorizerImpl.checkPermission(" + accessType + ", " + toString(entity) + ", " + ugi + "): " + ret);
+		}
+
+		return ret;
+	}
+
+	@Override
+	public boolean isAdmin(UserGroupInformation ugi) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerYarnAuthorizerImpl.isAdmin(" + ugi + ")");
+		}
+
+		boolean ret = false;
+		
+		AccessControlList admins = this.admins;
+
+		if(admins != null) {
+			ret = admins.isUserAllowed(ugi);
+		}
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerYarnAuthorizerImpl.isAdmin(" + ugi + "): " + ret);
+		}
+
+		return ret;
+	}
+
+	@Override
+	public void setAdmins(AccessControlList acl, UserGroupInformation ugi) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerYarnAuthorizerImpl.setAdmins(" + acl + ", " + ugi + ")");
+		}
+
+		admins = acl;
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerYarnAuthorizerImpl.setAdmins(" + acl + ", " + ugi + ")");
+		}
+	}
+
+	@Override
+	public void setPermission(PrivilegedEntity entity, Map<AccessType, AccessControlList> permission, UserGroupInformation ugi) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerYarnAuthorizerImpl.setPermission(" + toString(entity) + ", " + permission + ", " + ugi + ")");
+		}
+
+		yarnAcl.put(entity, permission);
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerYarnAuthorizerImpl.setPermission(" + toString(entity) + ", " + permission + ", " + ugi + ")");
+		}
+	}
+
+	public boolean isAllowedByYarnAcl(AccessType accessType, PrivilegedEntity entity, UserGroupInformation ugi, RangerYarnAuditHandler auditHandler) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerYarnAuthorizerImpl.isAllowedByYarnAcl(" + accessType + ", " + toString(entity) + ", " + ugi + ")");
+		}
+
+		boolean ret = false;
+
+		for(Map.Entry<PrivilegedEntity, Map<AccessType, AccessControlList>> e : yarnAcl.entrySet()) {
+			PrivilegedEntity                   aclEntity         = e.getKey();
+			Map<AccessType, AccessControlList> entityPermissions = e.getValue();
+
+			AccessControlList acl = entityPermissions == null ? null : entityPermissions.get(accessType);
+
+			if(acl == null || !acl.isUserAllowed(ugi)) {
+				continue;
+			}
+
+			if(! isSelfOrChildOf(entity, aclEntity)) {
+				continue;
+			}
+
+			ret = true;
+
+			break;
+		}
+
+		if(auditHandler != null) {
+			auditHandler.logYarnAclEvent(ret);
+		}
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerYarnAuthorizerImpl.isAllowedByYarnAcl(" + accessType + ", " + toString(entity) + ", " + ugi + "): " + ret);
+		}
+
+		return ret;
+	}
+
+	private static String getRangerAccessType(AccessType accessType) {
+		String ret = null;
+
+		switch(accessType) {
+			case ADMINISTER_QUEUE:
+				ret = RangerYarnAuthorizerImpl.ACCESS_TYPE_ADMIN_QUEUE;
+			break;
+
+			case SUBMIT_APP:
+				ret = RangerYarnAuthorizerImpl.ACCESS_TYPE_SUBMIT_APP;
+			break;
+		}
+
+		return ret;
+	}
+
+	private boolean isSelfOrChildOf(PrivilegedEntity queue, PrivilegedEntity parentQueue) {
+		boolean ret = queue.equals(parentQueue);
+
+		if(!ret && queue.getType() == EntityType.QUEUE) {
+			String queueName       = queue.getName();
+			String parentQueueName = parentQueue.getName();
+
+			if(queueName.contains(".") && !StringUtil.isEmpty(parentQueueName)) {
+				if(parentQueueName.charAt(parentQueueName.length() - 1) != '.') {
+					parentQueueName += ".";
+				}
+
+				ret = queueName.startsWith(parentQueueName);
+			}
+		}
+
+		return ret;
+	}
+
+	private String toString(PrivilegedEntity entity) {
+		if(entity != null) {
+			return "{name=" + entity.getName() + "; type=" + entity.getType() + "}";
+		}
+
+		return "null";
+	}
+}
+
+class RangerYarnPlugin extends RangerBasePlugin {
+	public RangerYarnPlugin() {
+		super("yarn", "yarn");
+	}
+
+	@Override
+	public void init() {
+		super.init();
+
+		RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
+
+		super.setResultProcessor(auditHandler);
+	}
+}
+
+class RangerYarnResource extends RangerAccessResourceImpl {
+	private static final String KEY_QUEUE = "queue";
+
+	public RangerYarnResource(PrivilegedEntity entity) {
+		setValue(KEY_QUEUE, entity != null ? entity.getName() : null);
+	}
+}
+
+class RangerYarnAccessRequest extends RangerAccessRequestImpl {
+	public RangerYarnAccessRequest(PrivilegedEntity entity, String accessType, String action, UserGroupInformation ugi) {
+		super.setResource(new RangerYarnResource(entity));
+		super.setAccessType(accessType);
+		super.setUser(ugi.getShortUserName());
+		super.setUserGroups(Sets.newHashSet(ugi.getGroupNames()));
+		super.setAccessTime(StringUtil.getUTCDate());
+		super.setClientIPAddress(getRemoteIp());
+		super.setAction(accessType);
+	}
+	
+	private static String getRemoteIp() {
+		String ret = null;
+		InetAddress ip = Server.getRemoteIp();
+		if (ip != null) {
+			ret = ip.getHostAddress();
+		}
+		return ret;
+	}
+}
+
+class RangerYarnAuditHandler extends RangerDefaultAuditHandler {
+	private static final Log LOG = LogFactory.getLog(RangerYarnAuditHandler.class);
+
+	private static final String YarnModuleName = RangerConfiguration.getInstance().get(RangerHadoopConstants.AUDITLOG_YARN_MODULE_ACL_NAME_PROP, RangerHadoopConstants.DEFAULT_YARN_MODULE_ACL_NAME);
+
+	private boolean         isAuditEnabled = false;
+	private AuthzAuditEvent auditEvent     = null;
+
+	public RangerYarnAuditHandler() {
+	}
+
+	@Override
+	public void processResult(RangerAccessResult result) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerYarnAuditHandler.processResult(" + result + ")");
+		}
+
+		if(! isAuditEnabled && result.getIsAudited()) {
+			isAuditEnabled = true;
+		}
+
+		auditEvent = super.getAuthzEvents(result);
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerYarnAuditHandler.processResult(" + result + "): " + auditEvent);
+		}
+	}
+
+	public void logYarnAclEvent(boolean accessGranted) {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerYarnAuditHandler.logYarnAclEvent(" + accessGranted + ")");
+		}
+
+		if(auditEvent != null) {
+			auditEvent.setAccessResult((short) (accessGranted ? 1 : 0));
+			auditEvent.setAclEnforcer(YarnModuleName);
+			auditEvent.setPolicyId(-1);
+		}
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerYarnAuditHandler.logYarnAclEvent(" + accessGranted + "): " + auditEvent);
+		}
+	}
+
+	public void flushAudit() {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerYarnAuditHandler.flushAudit(" + isAuditEnabled + ", " + auditEvent + ")");
+		}
+
+		if(isAuditEnabled) {
+			super.logAuthzAudit(auditEvent);
+		}
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerYarnAuditHandler.flushAudit(" + isAuditEnabled + ", " + auditEvent + ")");
+		}
+	}
+}
\ No newline at end of file

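For context on how this provider is activated: YARN picks up its
YarnAuthorizationProvider implementation from the yarn.authorization-provider
property in yarn-site.xml. A minimal sketch follows; the configured value is
assumed to be the stable entry-point class name (kept in the new
ranger-yarn-plugin-shim module) rather than the Impl class above:

    <!-- yarn-site.xml sketch; the class name below is an assumption based on
         the shim layout introduced by this commit -->
    <property>
      <name>yarn.authorization-provider</name>
      <value>org.apache.ranger.authorization.yarn.authorizer.RangerYarnAuthorizer</value>
    </property>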
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0ccf12e..2452785 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,8 +14,7 @@
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
         <groupId>org.apache</groupId>
@@ -97,6 +96,15 @@
   <module>unixauthservice</module>
   <module>ranger-util</module>
   <module>plugin-kms</module>
+  <module>plugin-kafka</module>
+  <module>ranger-hdfs-plugin-shim</module>
+  <module>ranger-plugin-classloader</module>
+  <module>ranger-hive-plugin-shim</module>
+  <module>ranger-hbase-plugin-shim</module>
+  <module>ranger-knox-plugin-shim</module>
+  <module>ranger-yarn-plugin-shim</module>
+  <module>ranger-storm-plugin-shim</module>
+  <module>ranger-kafka-plugin-shim</module>
   </modules>
   <properties>
         <javac.source.version>1.7</javac.source.version>
@@ -150,8 +158,9 @@
 		<jersey-bundle.version>1.17.1</jersey-bundle.version>
 		<jersey-client.version>2.6</jersey-client.version>
 		<junit.version>4.11</junit.version>
-		<kafka.version>0.8.2.0</kafka.version>
-		<!-- <kafka.version>0.8.2.2.3.0.0-2208</kafka.version> -->
+		<!--  <kafka.version>0.8.2.0</kafka.version> -->
+		<!--  <kafka.version>0.8.2.2.3.0.0-2320</kafka.version> -->
+		<kafka.version>0.8.2.2.3.2.0-2950</kafka.version>
 		<mockito.version>1.8.4</mockito.version>
 		<hamcrest-version>1.3</hamcrest-version>
 		<knox.gateway.version>0.6.0</knox.gateway.version>
@@ -505,7 +514,7 @@
              <phase>process-resources</phase>
              <configuration>
                <target>
-                  <echo message="${project.version}" file="${project.build.directory}/version" />
+                  <echo message="${project.version}" file="${project.build.directory}/version"/>
                </target>
              </configuration>
              <goals>
@@ -524,4 +533,4 @@
       </plugin>
     </plugins>
   </build>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/ranger-hbase-plugin-shim/pom.xml
----------------------------------------------------------------------
diff --git a/ranger-hbase-plugin-shim/pom.xml b/ranger-hbase-plugin-shim/pom.xml
new file mode 100644
index 0000000..c95075b
--- /dev/null
+++ b/ranger-hbase-plugin-shim/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>security_plugins.ranger-hbase-plugin-shim</groupId>
+  <artifactId>ranger-hbase-plugin-shim</artifactId>
+  <name>HBase Security Plugin Shim</name>
+  <description>HBase Security Plugin Shim</description>
+  <packaging>jar</packaging>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+  <parent>
+     <groupId>org.apache.ranger</groupId>
+     <artifactId>ranger</artifactId>
+     <version>0.5.0</version>
+     <relativePath>..</relativePath>
+  </parent>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>security_plugins.ranger-plugins-common</groupId>
+      <artifactId>ranger-plugins-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>security_plugins.ranger-plugins-audit</groupId>
+      <artifactId>ranger-plugins-audit</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>security_plugins.ranger-plugin-classloader</groupId>
+      <artifactId>ranger-plugin-classloader</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>security_plugins.ranger-hbase-plugin</groupId>
+      <artifactId>ranger-hbase-plugin</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-integration</artifactId>
+    </dependency>
+  </dependencies>
+</project>

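The dependency on ranger-plugin-classloader above is what lets the shim keep the
plugin's libraries off the component's CLASSPATH, per the commit subject: only
the thin shim jar is visible to HBase, while the real plugin and its dependencies
are loaded from a separate location through a dedicated classloader. A rough
sketch of that wiring, with illustrative names only (a production implementation
would use child-first delegation rather than a plain URLClassLoader):

    import java.net.URL;
    import java.net.URLClassLoader;

    public class PluginLoaderSketch {
        // Load the real plugin implementation from jars that are NOT on the
        // component's classpath; implClassName is illustrative.
        public static Object loadImpl(URL[] pluginJars, String implClassName) throws Exception {
            ClassLoader isolated = new URLClassLoader(pluginJars, PluginLoaderSketch.class.getClassLoader());
            Class<?> implClass = Class.forName(implClassName, true, isolated);
            return implClass.getDeclaredConstructor().newInstance();
        }
    }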
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/ranger-hbase-plugin-shim/src/main/java/com/xasecure/authorization/hbase/XaSecureAuthorizationCoprocessor.java
----------------------------------------------------------------------
diff --git a/ranger-hbase-plugin-shim/src/main/java/com/xasecure/authorization/hbase/XaSecureAuthorizationCoprocessor.java b/ranger-hbase-plugin-shim/src/main/java/com/xasecure/authorization/hbase/XaSecureAuthorizationCoprocessor.java
new file mode 100644
index 0000000..bc01e51
--- /dev/null
+++ b/ranger-hbase-plugin-shim/src/main/java/com/xasecure/authorization/hbase/XaSecureAuthorizationCoprocessor.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.xasecure.authorization.hbase;
+
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+import org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor;
+/**
+ * This class exists only to provide seamless upgrade/downgrade capability.  The coprocessor name lives in HBase config files under /etc/.../conf, which
+ * is not only out of bounds for any upgrade script but must also be of a form that allows for downgrade!  Thus, when class names were changed XaSecure* -> Ranger*,
+ * this shell class was kept so that both seamless upgrade and downgrade remain possible.
+ *
+ * This class is final because anyone needing to customize the coprocessor is expected to modify/extend RangerAuthorizationCoprocessor, as that is
+ * the "real" coprocessor!  This class, hence, should NEVER be more than an EMPTY shell!
+ */
+public final class XaSecureAuthorizationCoprocessor extends RangerAuthorizationCoprocessor implements AccessControlService.Interface, CoprocessorService {
+}

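The pattern described in the comment above is reusable whenever a class that is
referenced from out-of-band configuration gets renamed: keep the old
fully-qualified name alive as a final, empty subclass of the new class. A
minimal sketch with hypothetical names (Widget / LegacyWidget):

    // Widget.java -- the "real" implementation after the rename
    public class Widget {
        public String name() { return "widget"; }
    }

    // LegacyWidget.java -- empty shell preserving the old configured name
    public final class LegacyWidget extends Widget {
    }

Config files that still reference LegacyWidget keep working across both upgrade
and downgrade, because the old name never disappears from the classpath.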
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/ranger-hbase-plugin-shim/src/main/java/org/apache/hadoop/hbase/security/access/RangerAccessControlLists.java
----------------------------------------------------------------------
diff --git a/ranger-hbase-plugin-shim/src/main/java/org/apache/hadoop/hbase/security/access/RangerAccessControlLists.java b/ranger-hbase-plugin-shim/src/main/java/org/apache/hadoop/hbase/security/access/RangerAccessControlLists.java
new file mode 100644
index 0000000..7f33b15
--- /dev/null
+++ b/ranger-hbase-plugin-shim/src/main/java/org/apache/hadoop/hbase/security/access/RangerAccessControlLists.java
@@ -0,0 +1,104 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.log4j.Logger;
+
+
+public class RangerAccessControlLists {
+	
+	private static final Logger LOG = Logger.getLogger(RangerAccessControlLists.class);
+	
+	public static void init(MasterServices master) throws IOException {
+
+		Class<AccessControlLists> accessControlListsClass = AccessControlLists.class;
+		String cName = accessControlListsClass.getName();
+
+		Class<?>[] params = new Class<?>[] { MasterServices.class };
+
+		// Depending on the HBase version, the ACL-table bootstrap method is named
+		// either init() or createACLTable(); look for both via reflection.
+		for (String mname : new String[] { "init", "createACLTable" }) {
+			Method m;
+			try {
+				m = accessControlListsClass.getDeclaredMethod(mname, params);
+			} catch (NoSuchMethodException nsme) {
+				logInfo("Unable to get method [" + mname + "] in class [" + cName + "]. Ignoring the exception");
+				continue;
+			} catch (SecurityException e) {
+				logError("Unable to get method [" + mname + "] in class [" + cName + "].", e);
+				throw new IOException(e);
+			}
+
+			try {
+				m.invoke(null, master);
+				logInfo("Executed method [" + mname + "] in class [" + cName + "] successfully.");
+			} catch (InvocationTargetException e) {
+				// A TableExistsException from the target method only means the ACL
+				// table is already present; anything else is a real failure.
+				Throwable cause = e.getTargetException() != null ? e.getTargetException() : e;
+				if (!(cause instanceof TableExistsException)) {
+					logError("Unable to execute the method [" + mname + "] on [" + cName + "] due to exception", cause);
+					throw new IOException(cause);
+				}
+			} catch (IllegalArgumentException | IllegalAccessException e) {
+				logError("Unable to execute method [" + mname + "] in class [" + cName + "].", e);
+				throw new IOException(e);
+			}
+
+			return;
+		}
+
+		throw new IOException("Unable to initialize [" + cName + "]");
+	}
+	
+	
+	private static void logInfo(String msg) {
+		LOG.info(msg);
+	}
+
+	private static void logError(String msg, Throwable t) {
+		LOG.error(msg, t);
+	}
+
+}

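The reflection in RangerAccessControlLists.init() generalizes to any API whose
entry point was renamed across versions: probe a list of candidate method names
and invoke the first one that exists. A compact standalone sketch, with
hypothetical target and parameter names:

    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    public class VersionTolerantInvoker {
        // Invoke the first static method found among the candidate names,
        // unwrapping InvocationTargetException so callers see the real cause.
        public static void invokeFirst(Class<?> target, Class<?> paramType, Object arg,
                                       String... candidates) throws Exception {
            for (String name : candidates) {
                Method m;
                try {
                    m = target.getDeclaredMethod(name, paramType);
                } catch (NoSuchMethodException nsme) {
                    continue; // not present in this version; try the next name
                }
                try {
                    m.invoke(null, arg); // static method, hence a null receiver
                    return;
                } catch (InvocationTargetException ite) {
                    throw new Exception(ite.getTargetException());
                }
            }
            throw new Exception("no candidate method found on " + target.getName());
        }
    }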

