hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1203909 [2/3] - in /hbase/trunk: ./ security/src/main/java/org/apache/hadoop/hbase/security/access/ security/src/test/java/org/apache/hadoop/hbase/security/access/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbas...
Date Sat, 19 Nov 2011 00:15:28 GMT
Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java?rev=1203909&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java Sat Nov 19 00:15:27 2011
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Performs authorization checks for a given user's assigned permissions
+ */
+public class TableAuthManager {
+  /** Key for the user and group cache maps for globally assigned permissions */
+  private static final String GLOBAL_CACHE_KEY = ".access.";
+  private static final Log LOG = LogFactory.getLog(TableAuthManager.class);
+
+  /** Manager most recently looked up or created by the static factory method */
+  private static TableAuthManager instance;
+
+  /** Cache of global user permissions, keyed by user name */
+  private final ListMultimap<String,Permission> USER_CACHE = ArrayListMultimap.create();
+  /** Cache of global group permissions, keyed by group name */
+  private final ListMultimap<String,Permission> GROUP_CACHE = ArrayListMultimap.create();
+
+  /** Table-level user permissions: table name -> (user name -> permissions) */
+  private final ConcurrentSkipListMap<byte[], ListMultimap<String,TablePermission>> TABLE_USER_CACHE =
+      new ConcurrentSkipListMap<byte[], ListMultimap<String,TablePermission>>(Bytes.BYTES_COMPARATOR);
+
+  /** Table-level group permissions: table name -> (group name -> permissions) */
+  private final ConcurrentSkipListMap<byte[], ListMultimap<String,TablePermission>> TABLE_GROUP_CACHE =
+      new ConcurrentSkipListMap<byte[], ListMultimap<String,TablePermission>>(Bytes.BYTES_COMPARATOR);
+
+  /** Configuration handed to the permission (de)serialization helpers */
+  private final Configuration conf;
+  /** Watcher through which table permissions are written to ZooKeeper */
+  private final ZKPermissionWatcher zkperms;
+
+  /**
+   * Creates a manager bound to the given ZooKeeper watcher.  The permission
+   * watcher is started first and the globally assigned permissions are loaded
+   * from the configuration afterwards.  A failure to start the permission
+   * watcher is logged and construction continues.
+   *
+   * @param watcher the ZooKeeper watcher passed on to the permission watcher
+   * @param conf the configuration to read the global permissions from
+   * @throws IOException if the global permissions cannot be initialized
+   */
+  private TableAuthManager(ZooKeeperWatcher watcher, Configuration conf)
+      throws IOException {
+    this.conf = conf;
+    this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
+    try {
+      zkperms.start();
+    } catch (KeeperException startFailure) {
+      // only recorded in the log; the manager is still constructed
+      LOG.error("ZooKeeper initialization failed", startFailure);
+    }
+
+    // seed the global caches from the configuration
+    initGlobal(conf);
+  }
+
+  /**
+   * Populates the global user and group caches.  The current (system) user
+   * and every principal configured under
+   * {@link AccessControlLists#SUPERUSER_CONF_KEY} are granted all actions.
+   *
+   * @param conf the configuration to read the superuser list from
+   * @throws IOException if the current user cannot be obtained
+   */
+  private void initGlobal(Configuration conf) throws IOException {
+    User user = User.getCurrent();
+    if (user == null) {
+      throw new IOException("Unable to obtain the current user, " +
+          "authorization checks for internal operations will not work correctly!");
+    }
+    String currentUser = user.getShortName();
+
+    // the system user is always included; Lists.asList() never returns null,
+    // so the list can be iterated without a null check
+    List<String> superusers = Lists.asList(currentUser, conf.getStrings(
+        AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
+    for (String name : superusers) {
+      if (AccessControlLists.isGroupPrincipal(name)) {
+        GROUP_CACHE.put(AccessControlLists.getGroupName(name),
+            new Permission(Permission.Action.values()));
+      } else {
+        USER_CACHE.put(name, new Permission(Permission.Action.values()));
+      }
+    }
+  }
+
+  /**
+   * @return the watcher this manager uses to write permissions to ZooKeeper
+   */
+  public ZKPermissionWatcher getZKPermissionWatcher() {
+    return zkperms;
+  }
+
+  /**
+   * Replaces the cached permissions of a table with the permissions
+   * deserialized from the given bytes.  Null or empty data is skipped and
+   * leaves the cache unchanged.
+   *
+   * @param table the table whose cache entry is refreshed
+   * @param data the serialized permissions, may be <code>null</code> or empty
+   * @throws IOException if the permissions cannot be read from the data
+   */
+  public void refreshCacheFromWritable(byte[] table, byte[] data) throws IOException {
+    if (data == null || data.length == 0) {
+      LOG.debug("Skipping permission cache refresh because writable data is empty");
+      return;
+    }
+
+    DataInput permStream = new DataInputStream(new ByteArrayInputStream(data));
+    ListMultimap<String,TablePermission> decoded =
+        AccessControlLists.readPermissions(permStream, conf);
+    cache(table, decoded);
+  }
+
+  /**
+   * Replaces the cached permissions of a single table.  The entries are
+   * divided into a user cache and a group cache, with the group prefix
+   * stripped from group principals, so that group lookups do not have to
+   * prepend the prefix.  A <code>null</code> permission set leaves both caches
+   * untouched.
+   *
+   * @param table the table whose cached permissions are replaced
+   * @param tablePerms the permissions of the table, keyed by principal name
+   */
+  private void cache(byte[] table,
+      ListMultimap<String,TablePermission> tablePerms) {
+    if (tablePerms == null) {
+      return;
+    }
+
+    ListMultimap<String,TablePermission> byUser = ArrayListMultimap.create();
+    ListMultimap<String,TablePermission> byGroup = ArrayListMultimap.create();
+    for (Map.Entry<String,TablePermission> assignment : tablePerms.entries()) {
+      String principal = assignment.getKey();
+      if (!AccessControlLists.isGroupPrincipal(principal)) {
+        byUser.put(principal, assignment.getValue());
+      } else {
+        // store groups under their bare name, without the group prefix
+        String groupName =
+            principal.substring(AccessControlLists.GROUP_PREFIX.length());
+        byGroup.put(groupName, assignment.getValue());
+      }
+    }
+    TABLE_GROUP_CACHE.put(table, byGroup);
+    TABLE_USER_CACHE.put(table, byUser);
+  }
+
+  /**
+   * Looks up the cached table-level permissions of a user.
+   *
+   * @param username the user to look up
+   * @param table the table to look in
+   * @return the permissions cached for the user, or <code>null</code> if
+   *   nothing is cached for the table
+   */
+  private List<TablePermission> getUserPermissions(String username, byte[] table) {
+    ListMultimap<String, TablePermission> cached = TABLE_USER_CACHE.get(table);
+    return cached == null ? null : cached.get(username);
+  }
+
+  /**
+   * Looks up the cached table-level permissions of a group.
+   *
+   * @param groupName the group to look up, without the group prefix
+   * @param table the table to look in
+   * @return the permissions cached for the group, or <code>null</code> if
+   *   nothing is cached for the table
+   */
+  private List<TablePermission> getGroupPermissions(String groupName, byte[] table) {
+    ListMultimap<String, TablePermission> cached = TABLE_GROUP_CACHE.get(table);
+    return cached == null ? null : cached.get(groupName);
+  }
+
+  /**
+   * Checks whether any of the given global permissions grants an action.
+   *
+   * @param perms the global permissions to check, may be <code>null</code>
+   * @param action the action being requested
+   * @return <code>true</code> if at least one permission implies the action,
+   *   otherwise <code>false</code>
+   */
+  private boolean authorize(List<Permission> perms, Permission.Action action) {
+    if (perms == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No permissions found");
+      }
+      return false;
+    }
+
+    for (Permission granted : perms) {
+      if (granted.implies(action)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Authorizes a global action for a user.  The user's own global permissions
+   * are checked first, followed by the global permissions of each group the
+   * user belongs to.
+   *
+   * @param user the user requesting the action, may be <code>null</code>
+   * @param action the action being requested
+   * @return <code>true</code> if the user or one of the user's groups is
+   *   granted the action; <code>false</code> otherwise or if the user is
+   *   <code>null</code>
+   */
+  public boolean authorize(User user, Permission.Action action) {
+    if (user == null) {
+      return false;
+    }
+    if (authorize(USER_CACHE.get(user.getShortName()), action)) {
+      return true;
+    }
+
+    String[] groups = user.getGroupNames();
+    if (groups == null) {
+      return false;
+    }
+    for (String group : groups) {
+      if (authorize(GROUP_CACHE.get(group), action)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean authorize(List<TablePermission> perms, byte[] table, byte[] family,
+      Permission.Action action) {
+    return authorize(perms, table, family, null, action);
+  }
+
+  /**
+   * Checks whether any of the given table permissions grants an action on
+   * the given table, column family and column qualifier.
+   *
+   * @param perms the table permissions to check, may be <code>null</code>
+   * @return <code>true</code> if at least one permission implies the request
+   */
+  private boolean authorize(List<TablePermission> perms, byte[] table, byte[] family,
+      byte[] qualifier, Permission.Action action) {
+    if (perms == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No permissions found for table="+Bytes.toStringBinary(table));
+      }
+      return false;
+    }
+
+    for (TablePermission granted : perms) {
+      if (granted.implies(table, family, qualifier, action)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Authorizes an action on a single KeyValue of a table.  Only table-level
+   * permissions are consulted here: first those of the user, then those of
+   * each of the user's groups.  Global permissions are not checked.
+   *
+   * @param user the user requesting the action
+   * @param table the table being accessed
+   * @param kv the KeyValue being accessed
+   * @param action the action being requested
+   * @return <code>true</code> if a cached table permission of the user or of
+   *   one of the user's groups allows the action
+   */
+  public boolean authorize(User user, byte[] table, KeyValue kv,
+      TablePermission.Action action) {
+    if (authorize(getUserPermissions(user.getShortName(), table), table, kv, action)) {
+      return true;
+    }
+
+    String[] groupNames = user.getGroupNames();
+    if (groupNames == null) {
+      return false;
+    }
+    for (String group : groupNames) {
+      if (authorize(getGroupPermissions(group, table), table, kv, action)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Checks whether any of the given table permissions grants an action on
+   * the given KeyValue.
+   *
+   * @param perms the table permissions to check, may be <code>null</code>
+   * @return <code>true</code> if at least one permission implies the request
+   */
+  private boolean authorize(List<TablePermission> perms, byte[] table, KeyValue kv,
+      TablePermission.Action action) {
+    if (perms == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No permissions for authorize() check, table=" +
+            Bytes.toStringBinary(table));
+      }
+      return false;
+    }
+
+    for (TablePermission granted : perms) {
+      if (granted.implies(table, kv, action)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Checks whether the globally assigned permissions stored for a user grant
+   * the given action.
+   *
+   * @param username the user to check
+   * @param action the action being requested
+   * @return <code>true</code> if a global permission of the user implies the action
+   */
+  public boolean authorizeUser(String username, Permission.Action action) {
+    List<Permission> globalPerms = USER_CACHE.get(username);
+    return authorize(globalPerms, action);
+  }
+
+  /**
+   * Checks whether the stored permissions of a user grant an action on a
+   * table and column family, without naming a column qualifier.
+   *
+   * @param username the user to check
+   * @param table the table being accessed
+   * @param family the column family being accessed
+   * @param action the action being requested
+   * @return <code>true</code> if the action is allowed for the user
+   */
+  public boolean authorizeUser(String username, byte[] table, byte[] family,
+      Permission.Action action) {
+    final byte[] noQualifier = null;
+    return authorizeUser(username, table, family, noQualifier, action);
+  }
+
+  /**
+   * Checks whether the stored permissions of a user grant an action on a
+   * table, column family and column qualifier.  A matching global permission
+   * is sufficient; the table-level permissions are only consulted otherwise.
+   *
+   * @param username the user to check
+   * @param table the table being accessed
+   * @param family the column family being accessed
+   * @param qualifier the column qualifier being accessed
+   * @param action the action being requested
+   * @return <code>true</code> if the action is allowed for the user
+   */
+  public boolean authorizeUser(String username, byte[] table, byte[] family,
+      byte[] qualifier, Permission.Action action) {
+    // global authorization supercedes table level
+    return authorizeUser(username, action)
+        || authorize(getUserPermissions(username, table), table, family,
+            qualifier, action);
+  }
+
+
+  /**
+   * Checks whether the globally assigned permissions stored for a group
+   * grant the given action.
+   *
+   * @param groupName the group to check
+   * @param action the action being requested
+   * @return <code>true</code> if a global permission of the group implies the action
+   */
+  public boolean authorizeGroup(String groupName, Permission.Action action) {
+    List<Permission> globalPerms = GROUP_CACHE.get(groupName);
+    return authorize(globalPerms, action);
+  }
+
+  /**
+   * Checks whether the stored permissions of a group grant an action on a
+   * table and column family.  A matching global permission is sufficient; the
+   * table-level permissions are only consulted otherwise, and they are checked
+   * without a column qualifier.
+   *
+   * @param groupName the group to check
+   * @param table the table being accessed
+   * @param family the column family being accessed
+   * @param action the action being requested
+   * @return <code>true</code> if the action is allowed for the group
+   */
+  public boolean authorizeGroup(String groupName, byte[] table, byte[] family,
+      Permission.Action action) {
+    // global authorization supercedes table level
+    return authorizeGroup(groupName, action)
+        || authorize(getGroupPermissions(groupName, table), table, family, action);
+  }
+
+  /**
+   * Authorizes an action on a table, column family and column qualifier for
+   * a user.  The user's own global and table-level permissions are checked
+   * first, followed by the global and table-level permissions of each group
+   * the user belongs to.
+   *
+   * @param user the user requesting the action
+   * @param table the table being accessed
+   * @param family the column family being accessed
+   * @param qualifier the column qualifier being accessed, may be
+   *   <code>null</code> if the request is not limited to a qualifier
+   * @param action the action being requested
+   * @return <code>true</code> if the user or one of the user's groups is
+   *   allowed to perform the action, otherwise <code>false</code>
+   */
+  public boolean authorize(User user, byte[] table, byte[] family,
+      byte[] qualifier, Permission.Action action) {
+    if (authorizeUser(user.getShortName(), table, family, qualifier, action)) {
+      return true;
+    }
+
+    String[] groups = user.getGroupNames();
+    if (groups != null) {
+      for (String group : groups) {
+        // global authorization supercedes table level
+        if (authorizeGroup(group, action)) {
+          return true;
+        }
+        // the qualifier must be passed along: checking group permissions
+        // without it can never match a permission scoped to a qualifier
+        if (authorize(getGroupPermissions(group, table), table, family,
+            qualifier, action)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Authorizes an action on a table and column family for a user, without
+   * naming a column qualifier.
+   *
+   * @return <code>true</code> if the user or one of the user's groups is
+   *   allowed to perform the action
+   */
+  public boolean authorize(User user, byte[] table, byte[] family,
+      Permission.Action action) {
+    final byte[] noQualifier = null;
+    return authorize(user, table, family, noQualifier, action);
+  }
+
+  /**
+   * Returns true if the given user, or one of the user's groups, has a
+   * {@link TablePermission} that matches up to the column family portion.
+   * The matching permission may be scoped to a column qualifier, so a match
+   * here does not guarantee that authorize() on the same column family would
+   * return true.
+   */
+  public boolean matchPermission(User user,
+      byte[] table, byte[] family, TablePermission.Action action) {
+    List<TablePermission> userPerms = getUserPermissions(user.getShortName(), table);
+    if (userPerms != null) {
+      for (TablePermission granted : userPerms) {
+        if (granted.matchesFamily(table, family, action)) {
+          return true;
+        }
+      }
+    }
+
+    String[] groups = user.getGroupNames();
+    if (groups == null) {
+      return false;
+    }
+    for (String group : groups) {
+      List<TablePermission> groupPerms = getGroupPermissions(group, table);
+      if (groupPerms == null) {
+        continue;
+      }
+      for (TablePermission granted : groupPerms) {
+        if (granted.matchesFamily(table, family, action)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns true if the given user, or one of the user's groups, has a
+   * {@link TablePermission} matching the given table, column family and
+   * column qualifier for the requested action.
+   */
+  public boolean matchPermission(User user,
+      byte[] table, byte[] family, byte[] qualifier,
+      TablePermission.Action action) {
+    List<TablePermission> userPerms = getUserPermissions(user.getShortName(), table);
+    if (userPerms != null) {
+      for (TablePermission granted : userPerms) {
+        if (granted.matchesFamilyQualifier(table, family, qualifier, action)) {
+          return true;
+        }
+      }
+    }
+
+    String[] groups = user.getGroupNames();
+    if (groups == null) {
+      return false;
+    }
+    for (String group : groups) {
+      List<TablePermission> groupPerms = getGroupPermissions(group, table);
+      if (groupPerms == null) {
+        continue;
+      }
+      for (TablePermission granted : groupPerms) {
+        if (granted.matchesFamilyQualifier(table, family, qualifier, action)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Drops the cached user and group permissions of the given table.
+   *
+   * @param table the table whose cache entries are removed
+   */
+  public void remove(byte[] table) {
+    TABLE_USER_CACHE.remove(table);
+    TABLE_GROUP_CACHE.remove(table);
+  }
+
+  /**
+   * Replaces the cached permissions of a user on a table and then writes the
+   * table's complete user and group permissions to ZooKeeper.
+   *
+   * @param username the user whose permissions are replaced
+   * @param table the table the permissions apply to
+   * @param perms the new permissions of the user on the table
+   */
+  public void setUserPermissions(String username, byte[] table,
+      List<TablePermission> perms) {
+    ListMultimap<String,TablePermission> userPerms = TABLE_USER_CACHE.get(table);
+    if (userPerms == null) {
+      // first permission for this table: start a new cache entry
+      userPerms = ArrayListMultimap.create();
+      TABLE_USER_CACHE.put(table, userPerms);
+    }
+    userPerms.replaceValues(username, perms);
+
+    ListMultimap<String,TablePermission> groupPerms = TABLE_GROUP_CACHE.get(table);
+    writeToZooKeeper(table, userPerms, groupPerms);
+  }
+
+  /**
+   * Replaces the cached permissions of a group on a table and then writes the
+   * table's complete user and group permissions to ZooKeeper.
+   *
+   * @param group the group whose permissions are replaced
+   * @param table the table the permissions apply to
+   * @param perms the new permissions of the group on the table
+   */
+  public void setGroupPermissions(String group, byte[] table,
+      List<TablePermission> perms) {
+    ListMultimap<String,TablePermission> groupPerms = TABLE_GROUP_CACHE.get(table);
+    if (groupPerms == null) {
+      // first permission for this table: start a new cache entry
+      groupPerms = ArrayListMultimap.create();
+      TABLE_GROUP_CACHE.put(table, groupPerms);
+    }
+    groupPerms.replaceValues(group, perms);
+
+    ListMultimap<String,TablePermission> userPerms = TABLE_USER_CACHE.get(table);
+    writeToZooKeeper(table, userPerms, groupPerms);
+  }
+
+  /**
+   * Serializes the user and group permissions of a table into one permission
+   * set and hands it to the ZooKeeper permission watcher.  Group names are
+   * written with the group prefix prepended.
+   *
+   * @param table the table the permissions belong to
+   * @param userPerms the user permissions, may be <code>null</code>
+   * @param groupPerms the group permissions, may be <code>null</code>
+   */
+  public void writeToZooKeeper(byte[] table,
+      ListMultimap<String,TablePermission> userPerms,
+      ListMultimap<String,TablePermission> groupPerms) {
+    ListMultimap<String,TablePermission> merged = ArrayListMultimap.create();
+    if (userPerms != null) {
+      merged.putAll(userPerms);
+    }
+    if (groupPerms != null) {
+      for (String group : groupPerms.keySet()) {
+        String principal = AccessControlLists.GROUP_PREFIX + group;
+        merged.putAll(principal, groupPerms.get(group));
+      }
+    }
+
+    byte[] payload = AccessControlLists.writePermissionsAsBytes(merged, conf);
+    zkperms.writeToZookeeper(Bytes.toString(table), payload);
+  }
+
+  /** Managers created so far, one per ZooKeeper watcher */
+  static Map<ZooKeeperWatcher,TableAuthManager> managerMap =
+    new HashMap<ZooKeeperWatcher,TableAuthManager>();
+
+  /**
+   * Returns the manager registered for the given ZooKeeper watcher, creating
+   * and registering a new one on first use.
+   *
+   * @param watcher the ZooKeeper watcher identifying the manager
+   * @param conf the configuration used if a new manager has to be created
+   * @return the manager associated with the watcher
+   * @throws IOException if a new manager cannot be created
+   */
+  public synchronized static TableAuthManager get(
+      ZooKeeperWatcher watcher, Configuration conf) throws IOException {
+    instance = managerMap.get(watcher);
+    if (instance != null) {
+      return instance;
+    }
+
+    instance = new TableAuthManager(watcher, conf);
+    managerMap.put(watcher, instance);
+    return instance;
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java?rev=1203909&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java Sat Nov 19 00:15:27 2011
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents an authorization for access for the given actions, optionally
+ * restricted to the given column family or column qualifier, over the
+ * given table.  If the family property is <code>null</code>, it implies
+ * full table access.
+ */
+public class TablePermission extends Permission {
+  private static Log LOG = LogFactory.getLog(TablePermission.class);
+
+  private byte[] table;
+  private byte[] family;
+  private byte[] qualifier;
+
+  /**
+   * Nullary constructor for Writable, do not use.  All fields are left unset
+   * until {@link #readFields(DataInput)} is called.
+   */
+  public TablePermission() {
+    // the superclass nullary constructor is invoked implicitly
+  }
+
+  /**
+   * Creates a permission on the given table, optionally limited to one
+   * column family, that allows the given actions.  The permission is not
+   * limited to a column qualifier.
+   * @param table the table
+   * @param family the family, can be null if a global permission on the table
+   * @param assigned the list of allowed actions
+   */
+  public TablePermission(byte[] table, byte[] family, Action... assigned) {
+    this(table, family, (byte[]) null, assigned);
+  }
+
+  /**
+   * Creates a permission on the given table, limited to the given column
+   * family and qualifier, that allows the assigned actions.
+   * @param table the table
+   * @param family the family, can be null if a global permission on the table
+   * @param qualifier the column qualifier, can be null if the permission is
+   *   not limited to a qualifier
+   * @param assigned the list of allowed actions
+   */
+  public TablePermission(byte[] table, byte[] family, byte[] qualifier,
+      Action... assigned) {
+    super(assigned);
+    this.qualifier = qualifier;
+    this.family = family;
+    this.table = table;
+  }
+
+  /**
+   * Creates a permission on the given table, family and column qualifier
+   * that allows the actions identified by the given byte codes.
+   * @param table the table
+   * @param family the family, can be null if a global permission on the table
+   * @param qualifier the column qualifier, can be null if the permission is
+   *   not limited to a qualifier
+   * @param actionCodes the list of allowed action codes
+   */
+  public TablePermission(byte[] table, byte[] family, byte[] qualifier,
+      byte[] actionCodes) {
+    super(actionCodes);
+    this.qualifier = qualifier;
+    this.family = family;
+    this.table = table;
+  }
+
+  /** @return the table this permission applies to */
+  public byte[] getTable() {
+    return this.table;
+  }
+
+  /** @return the column family of this permission, or null if not limited to one */
+  public byte[] getFamily() {
+    return this.family;
+  }
+
+  /** @return the column qualifier of this permission, or null if not limited to one */
+  public byte[] getQualifier() {
+    return this.qualifier;
+  }
+
+  /**
+   * Checks that a given table operation is authorized by this permission
+   * instance.
+   *
+   * @param table the table where the operation is being performed
+   * @param family the column family to which the operation is restricted,
+   *   if <code>null</code> implies "all"
+   * @param qualifier the column qualifier to which the action is restricted,
+   *   if <code>null</code> implies "all"
+   * @param action the action being requested
+   * @return <code>true</code> if the action within the given scope is allowed
+   *   by this permission, <code>false</code>
+   */
+  public boolean implies(byte[] table, byte[] family, byte[] qualifier,
+      Action action) {
+    if (!Bytes.equals(this.table, table)) {
+      return false;
+    }
+
+    if (this.family != null &&
+        (family == null ||
+         !Bytes.equals(this.family, family))) {
+      return false;
+    }
+
+    if (this.qualifier != null &&
+        (qualifier == null ||
+         !Bytes.equals(this.qualifier, qualifier))) {
+      return false;
+    }
+
+    // check actions
+    return super.implies(action);
+  }
+
+  /**
+   * Checks if this permission grants access to perform the given action on
+   * the given table and key value.
+   * @param table the table on which the operation is being performed
+   * @param kv the KeyValue on which the operation is being requested
+   * @param action the action requested
+   * @return <code>true</code> if the action is allowed over the given scope
+   *   by this permission, otherwise <code>false</code>
+   */
+  public boolean implies(byte[] table, KeyValue kv, Action action) {
+    if (!Bytes.equals(this.table, table)) {
+      return false;
+    }
+
+    if (family != null &&
+        (Bytes.compareTo(family, 0, family.length,
+            kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength()) != 0)) {
+      return false;
+    }
+
+    if (qualifier != null &&
+        (Bytes.compareTo(qualifier, 0, qualifier.length,
+            kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength()) != 0)) {
+      return false;
+    }
+
+    // check actions
+    return super.implies(action);
+  }
+
+  /**
+   * Returns <code>true</code> if this permission matches the given column
+   * family at least.  This only indicates a partial match against the table
+   * and column family, however, and does not guarantee that implies() for the
+   * column same family would return <code>true</code>.  In the case of a
+   * column-qualifier specific permission, for example, implies() would still
+   * return false.
+   */
+  public boolean matchesFamily(byte[] table, byte[] family, Action action) {
+    if (!Bytes.equals(this.table, table)) {
+      return false;
+    }
+
+    if (this.family != null &&
+        (family == null ||
+         !Bytes.equals(this.family, family))) {
+      return false;
+    }
+
+    // ignore qualifier
+    // check actions
+    return super.implies(action);
+  }
+
+  /**
+   * Returns if the given permission matches the given qualifier.
+   * @param table the table name to match
+   * @param family the column family to match
+   * @param qualifier the qualifier name to match
+   * @param action the action requested
+   * @return <code>true</code> if the table, family and qualifier match,
+   *   otherwise <code>false</code>
+   */
+  public boolean matchesFamilyQualifier(byte[] table, byte[] family, byte[] qualifier,
+                                Action action) {
+    if (!matchesFamily(table, family, action)) {
+      return false;
+    } else {
+      if (this.qualifier != null &&
+          (qualifier == null ||
+           !Bytes.equals(this.qualifier, qualifier))) {
+        return false;
+      }
+    }
+    return super.implies(action);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof TablePermission)) {
+      return false;
+    }
+    TablePermission other = (TablePermission)obj;
+
+    if (!(Bytes.equals(table, other.getTable()) &&
+        ((family == null && other.getFamily() == null) ||
+         Bytes.equals(family, other.getFamily())) &&
+        ((qualifier == null && other.getQualifier() == null) ||
+         Bytes.equals(qualifier, other.getQualifier()))
+       )) {
+      return false;
+    }
+
+    // check actions
+    return super.equals(other);
+  }
+
+  /**
+   * Combines the superclass hash code with the hash of each of the table,
+   * family and qualifier that is set.
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 37;
+    int result = super.hashCode();
+    // unset (null) parts do not contribute to the hash
+    for (byte[] part : new byte[][] { table, family, qualifier }) {
+      if (part != null) {
+        result = prime * result + Bytes.hashCode(part);
+      }
+    }
+    return result;
+  }
+
+  public String toString() {
+    StringBuilder str = new StringBuilder("[TablePermission: ")
+        .append("table=").append(Bytes.toString(table))
+        .append(", family=").append(Bytes.toString(family))
+        .append(", qualifier=").append(Bytes.toString(qualifier))
+        .append(", actions=");
+    if (actions != null) {
+      for (int i=0; i<actions.length; i++) {
+        if (i > 0)
+          str.append(",");
+        if (actions[i] != null)
+          str.append(actions[i].toString());
+        else
+          str.append("NULL");
+      }
+    }
+    str.append("]");
+
+    return str.toString();
+  }
+
+  /**
+   * Reads this permission from the given input: the superclass fields, the
+   * table, and then the family and the qualifier, each preceded by a boolean
+   * flag telling whether it is present.
+   *
+   * @param in the input to read from
+   * @throws IOException if reading from the input fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    table = Bytes.readByteArray(in);
+    // always assign both optional fields, so that an instance that is read
+    // into more than once does not keep the family or qualifier of the
+    // permission it held before
+    family = in.readBoolean() ? Bytes.readByteArray(in) : null;
+    qualifier = in.readBoolean() ? Bytes.readByteArray(in) : null;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Bytes.writeByteArray(out, table);
+    out.writeBoolean(family != null);
+    if (family != null) {
+      Bytes.writeByteArray(out, family);
+    }
+    out.writeBoolean(qualifier != null);
+    if (qualifier != null) {
+      Bytes.writeByteArray(out, qualifier);
+    }
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java?rev=1203909&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java Sat Nov 19 00:15:27 2011
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents an authorization for access over the given table, column family
+ * plus qualifier, for the given user.
+ */
+public class UserPermission extends TablePermission {
+  private static final Log LOG = LogFactory.getLog(UserPermission.class);
+
+  /** The user this permission is granted to */
+  private byte[] user;
+
+  /** Nullary constructor for Writable, do not use */
+  public UserPermission() {
+    super();
+  }
+
+  /**
+   * Creates a new instance for the given user, table and column family.
+   * @param user the user
+   * @param table the table
+   * @param family the family, can be null if action is allowed over the entire
+   *   table
+   * @param assigned the list of allowed actions
+   */
+  public UserPermission(byte[] user, byte[] table, byte[] family,
+                        Action... assigned) {
+    super(table, family, assigned);
+    this.user = user;
+  }
+
+  /**
+   * Creates a new permission for the given user, table, column family and
+   * column qualifier.
+   * @param user the user
+   * @param table the table
+   * @param family the family, can be null if action is allowed over the entire
+   *   table
+   * @param qualifier the column qualifier, can be null if action is allowed
+   *   over the entire column family
+   * @param assigned the list of allowed actions
+   */
+  public UserPermission(byte[] user, byte[] table, byte[] family,
+                        byte[] qualifier, Action... assigned) {
+    super(table, family, qualifier, assigned);
+    this.user = user;
+  }
+
+  /**
+   * Creates a new instance for the given user, table, column family and
+   * qualifier, matching the actions with the given codes.
+   * @param user the user
+   * @param table the table
+   * @param family the family, can be null if action is allowed over the entire
+   *   table
+   * @param qualifier the column qualifier, can be null if action is allowed
+   *   over the entire column family
+   * @param actionCodes the list of allowed action codes
+   */
+  public UserPermission(byte[] user, byte[] table, byte[] family,
+                        byte[] qualifier, byte[] actionCodes) {
+    super(table, family, qualifier, actionCodes);
+    this.user = user;
+  }
+
+  /** @return the user this permission is granted to */
+  public byte[] getUser() {
+    return user;
+  }
+
+  /**
+   * Two user permissions are equal when they name the same user and are
+   * equal as table permissions.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof UserPermission)) {
+      return false;
+    }
+    UserPermission other = (UserPermission)obj;
+
+    return Bytes.equals(user, other.getUser()) && super.equals(obj);
+  }
+
+  /**
+   * Combines the table permission hash code with the hash of the user, if
+   * one is set.
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 37;
+    int result = super.hashCode();
+    if (user != null) {
+      result = prime * result + Bytes.hashCode(user);
+    }
+    return result;
+  }
+
+  /**
+   * Returns the user name followed by the description of the underlying
+   * table permission.
+   */
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder("UserPermission: ")
+        .append("user=").append(Bytes.toString(user))
+        .append(", ").append(super.toString());
+    return str.toString();
+  }
+
+  /**
+   * Reads the table permission fields followed by the user.
+   * @param in the input to read from
+   * @throws IOException if reading from the input fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    user = Bytes.readByteArray(in);
+  }
+
+  /**
+   * Writes the table permission fields followed by the user.
+   * @param out the output to write to
+   * @throws IOException if writing to the output fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Bytes.writeByteArray(out, user);
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java?rev=1203909&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java Sat Nov 19 00:15:27 2011
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Handles synchronization of access control list entries and updates
+ * throughout all nodes in the cluster.  The {@link AccessController} instance
+ * on the {@code _acl_} table regions, creates a znode for each table as
+ * {@code /hbase/acl/tablename}, with the znode data containing a serialized
+ * list of the permissions granted for the table.  The {@code AccessController}
+ * instances on all other cluster hosts watch the znodes for updates, which
+ * trigger updates in the {@link TableAuthManager} permission cache.
+ */
+public class ZKPermissionWatcher extends ZooKeeperListener {
+  private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
+  // parent node for permissions lists
+  static final String ACL_NODE = "acl";
+  TableAuthManager authManager;
+  String aclZNode;
+
+  /**
+   * @param watcher the ZooKeeper watcher this listener is attached to
+   * @param authManager the permission cache updated from znode contents
+   * @param conf configuration; {@code zookeeper.znode.acl.parent} names the
+   *   parent znode under the base znode (defaults to {@link #ACL_NODE})
+   */
+  public ZKPermissionWatcher(ZooKeeperWatcher watcher,
+      TableAuthManager authManager, Configuration conf) {
+    super(watcher);
+    this.authManager = authManager;
+    String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
+    this.aclZNode = ZKUtil.joinZNode(watcher.baseZNode, aclZnodeParent);
+  }
+
+  /**
+   * Registers this listener and, if the acl parent znode already exists,
+   * loads the permissions stored in its children into the cache.
+   *
+   * @throws KeeperException if reading from ZooKeeper fails
+   */
+  public void start() throws KeeperException {
+    watcher.registerListener(this);
+    if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
+      // refreshNodes tolerates a null list
+      refreshNodes(ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode));
+    }
+  }
+
+  /**
+   * When the acl parent znode itself is created, reads all of its children
+   * and refreshes the cache from them.  Other paths are ignored.
+   */
+  @Override
+  public void nodeCreated(String path) {
+    if (path.equals(aclZNode)) {
+      try {
+        List<ZKUtil.NodeAndData> nodes =
+            ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
+        refreshNodes(nodes);
+      } catch (KeeperException ke) {
+        LOG.error("Error reading data from zookeeper", ke);
+        // only option is to abort
+        watcher.abort("Zookeeper error obtaining acl node children", ke);
+      }
+    }
+  }
+
+  /**
+   * When a child of the acl parent znode is deleted, removes the matching
+   * table (named after the znode) from the permission cache.
+   */
+  @Override
+  public void nodeDeleted(String path) {
+    if (aclZNode.equals(ZKUtil.getParent(path))) {
+      String table = ZKUtil.getNodeName(path);
+      authManager.remove(Bytes.toBytes(table));
+    }
+  }
+
+  /**
+   * When the data of a child of the acl parent znode changes, re-reads it
+   * (re-arming the watch) and refreshes that table's cache entry.  A
+   * ZooKeeper failure aborts; unparseable data is only logged.
+   */
+  @Override
+  public void nodeDataChanged(String path) {
+    if (aclZNode.equals(ZKUtil.getParent(path))) {
+      // update cache on an existing table node
+      String table = ZKUtil.getNodeName(path);
+      try {
+        byte[] data = ZKUtil.getDataAndWatch(watcher, path);
+        authManager.refreshCacheFromWritable(Bytes.toBytes(table), data);
+      } catch (KeeperException ke) {
+        LOG.error("Error reading data from zookeeper for node "+table, ke);
+        // only option is to abort
+        watcher.abort("Zookeeper error getting data for node " + table, ke);
+      } catch (IOException ioe) {
+        LOG.error("Error reading permissions writables", ioe);
+      }
+    }
+  }
+
+  /**
+   * When the set of children of the acl parent znode changes, re-reads all
+   * children and refreshes the cache from them.
+   */
+  @Override
+  public void nodeChildrenChanged(String path) {
+    if (path.equals(aclZNode)) {
+      // table permissions changed
+      try {
+        List<ZKUtil.NodeAndData> nodes =
+            ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
+        refreshNodes(nodes);
+      } catch (KeeperException ke) {
+        LOG.error("Error reading data from zookeeper for path "+path, ke);
+        watcher.abort("Zookeeper error get node children for path "+path, ke);
+      }
+    }
+  }
+
+  /**
+   * Pushes the data of each non-empty node into the permission cache, keyed
+   * by the node name.  A node whose data cannot be parsed is logged and
+   * skipped so the remaining nodes are still processed.
+   *
+   * @param nodes child nodes and their data; may be null, which is treated
+   *   as "nothing to refresh" (start() already treated the child listing as
+   *   nullable, the event callbacks did not)
+   */
+  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
+    if (nodes == null) {
+      return;
+    }
+    for (ZKUtil.NodeAndData n : nodes) {
+      if (n.isEmpty()) continue;
+      String path = n.getNode();
+      String table = ZKUtil.getNodeName(path);
+      try {
+        byte[] nodeData = n.getData();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updating permissions cache from node "+table+" with data: "+
+              Bytes.toStringBinary(nodeData));
+        }
+        authManager.refreshCacheFromWritable(Bytes.toBytes(table),
+          nodeData);
+      } catch (IOException ioe) {
+        LOG.error("Failed parsing permissions for table '" + table +
+            "' from zk", ioe);
+      }
+    }
+  }
+
+  /**
+   * Write a table's access controls to the permissions mirror in zookeeper.
+   * The znode is created under {@code aclZNode}, the same parent this
+   * watcher observes, so a non-default {@code zookeeper.znode.acl.parent}
+   * is honored for writes as well as for reads.
+   *
+   * @param tableName name of the table; used as the child znode name
+   * @param permsData serialized permissions to store as the znode data
+   */
+  public void writeToZookeeper(String tableName,
+      byte[] permsData) {
+    String zkNode = ZKUtil.joinZNode(aclZNode, tableName);
+    try {
+      ZKUtil.createWithParents(watcher, zkNode);
+      ZKUtil.updateExistingNodeData(watcher, zkNode,
+        permsData, -1);
+    } catch (KeeperException e) {
+      LOG.error("Failed updating permissions for table '" + tableName +
+          "'", e);
+      watcher.abort("Failed writing node "+zkNode+" to zookeeper", e);
+    }
+  }
+}

Added: hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java?rev=1203909&view=auto
==============================================================================
--- hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java (added)
+++ hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java Sat Nov 19 00:15:27 2011
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.SecureRpcEngine;
+import org.apache.hadoop.hbase.security.User;
+
+/**
+ * Utility methods for testing security
+ */
+public class SecureTestUtil {
+  /**
+   * Applies the settings used by the security tests to {@code conf}: the
+   * secure RPC engine, {@link AccessController} as both master and region
+   * coprocessor, and a superuser list made of "admin" plus the user running
+   * the current process.
+   *
+   * @param conf the configuration to modify in place
+   * @throws IOException if the current user cannot be determined
+   */
+  public static void enableSecurity(Configuration conf) throws IOException {
+    final String accessController = AccessController.class.getName();
+    conf.set("hadoop.security.authorization", "false");
+    conf.set("hadoop.security.authentication", "simple");
+    conf.set("hbase.rpc.engine", SecureRpcEngine.class.getName());
+    conf.set("hbase.coprocessor.master.classes", accessController);
+    conf.set("hbase.coprocessor.region.classes", accessController);
+    // the user running this process is added to the superusers
+    conf.set("hbase.superuser", "admin," + User.getCurrent().getName());
+  }
+}

Added: hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java?rev=1203909&view=auto
==============================================================================
--- hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java (added)
+++ hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java Sat Nov 19 00:15:27 2011
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import static org.junit.Assert.*;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks read filtering by granted permission: a user with table-wide READ
+ * sees every column, a user with READ on a single qualifier sees only that
+ * qualifier, and a user with no grant is refused when opening a scanner.
+ */
+public class TestAccessControlFilter {
+  private static Log LOG = LogFactory.getLog(TestAccessControlFilter.class);
+  private static HBaseTestingUtility TEST_UTIL;
+
+  private static User ADMIN;
+  private static User READER;
+  private static User LIMITED;
+  private static User DENIED;
+
+  private static byte[] TABLE = Bytes.toBytes("testtable");
+  private static byte[] FAMILY = Bytes.toBytes("f1");
+  private static byte[] PRIVATE_COL = Bytes.toBytes("private");
+  private static byte[] PUBLIC_COL = Bytes.toBytes("public");
+
+  /** Starts a secured mini cluster and creates the test users. */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    SecureTestUtil.enableSecurity(conf);
+    String baseuser = User.getCurrent().getShortName();
+    conf.set("hbase.superuser", conf.get("hbase.superuser", "") +
+        String.format(",%s.hfs.0,%s.hfs.1,%s.hfs.2", baseuser, baseuser, baseuser));
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME, 5000);
+
+    ADMIN = User.createUserForTesting(conf, "admin", new String[]{"supergroup"});
+    READER = User.createUserForTesting(conf, "reader", new String[0]);
+    LIMITED = User.createUserForTesting(conf, "limited", new String[0]);
+    DENIED = User.createUserForTesting(conf, "denied", new String[0]);
+  }
+
+  /** Stops the mini cluster started by {@link #setupBeforeClass()}. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Copies the test configuration and adds a random key to the copy, which
+   * the test relies on to force a new region server connection for the
+   * calling user.
+   */
+  private static Configuration newConnectionConf() {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    // force a new RS connection
+    conf.set("testkey", UUID.randomUUID().toString());
+    return conf;
+  }
+
+  /**
+   * Grants READER table-wide READ and LIMITED READ on the public qualifier
+   * only, writes 100 rows with a private and a public column each, then
+   * scans as READER, LIMITED and DENIED and checks what each one sees.
+   * Every table and scanner opened here is closed again.
+   */
+  @Test
+  public void testQualifierAccess() throws Exception {
+    final HTable table = TEST_UTIL.createTable(TABLE, FAMILY);
+    try {
+      // set permissions
+      ADMIN.runAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          HTable aclmeta = new HTable(TEST_UTIL.getConfiguration(),
+              AccessControlLists.ACL_TABLE_NAME);
+          try {
+            AccessControllerProtocol acls = aclmeta.coprocessorProxy(
+                AccessControllerProtocol.class, TABLE);
+            TablePermission perm = new TablePermission(TABLE, null, Permission.Action.READ);
+            acls.grant(Bytes.toBytes(READER.getShortName()), perm);
+            perm = new TablePermission(TABLE, FAMILY, PUBLIC_COL, Permission.Action.READ);
+            acls.grant(Bytes.toBytes(LIMITED.getShortName()), perm);
+          } finally {
+            aclmeta.close();
+          }
+          return null;
+        }
+      });
+
+      // put some test data
+      List<Put> puts = new ArrayList<Put>(100);
+      for (int i=0; i<100; i++) {
+        Put p = new Put(Bytes.toBytes(i));
+        p.add(FAMILY, PRIVATE_COL, Bytes.toBytes("secret "+i));
+        p.add(FAMILY, PUBLIC_COL, Bytes.toBytes("info "+i));
+        puts.add(p);
+      }
+      table.put(puts);
+    } finally {
+      table.close();
+    }
+
+    // test read
+    READER.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        HTable t = new HTable(newConnectionConf(), TABLE);
+        try {
+          ResultScanner rs = t.getScanner(new Scan());
+          try {
+            int rowcnt = 0;
+            for (Result r : rs) {
+              rowcnt++;
+              int rownum = Bytes.toInt(r.getRow());
+              assertTrue(r.containsColumn(FAMILY, PRIVATE_COL));
+              assertEquals("secret "+rownum, Bytes.toString(r.getValue(FAMILY, PRIVATE_COL)));
+              assertTrue(r.containsColumn(FAMILY, PUBLIC_COL));
+              assertEquals("info "+rownum, Bytes.toString(r.getValue(FAMILY, PUBLIC_COL)));
+            }
+            assertEquals("Expected 100 rows returned", 100, rowcnt);
+          } finally {
+            rs.close();
+          }
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    });
+
+    // test read with qualifier filter
+    LIMITED.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        HTable t = new HTable(newConnectionConf(), TABLE);
+        try {
+          ResultScanner rs = t.getScanner(new Scan());
+          try {
+            int rowcnt = 0;
+            for (Result r : rs) {
+              rowcnt++;
+              int rownum = Bytes.toInt(r.getRow());
+              assertFalse(r.containsColumn(FAMILY, PRIVATE_COL));
+              assertTrue(r.containsColumn(FAMILY, PUBLIC_COL));
+              assertEquals("info " + rownum, Bytes.toString(r.getValue(FAMILY, PUBLIC_COL)));
+            }
+            assertEquals("Expected 100 rows returned", 100, rowcnt);
+          } finally {
+            rs.close();
+          }
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    });
+
+    // test as user with no permission
+    DENIED.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        HTable t = null;
+        try {
+          t = new HTable(newConnectionConf(), TABLE);
+          ResultScanner rs = t.getScanner(new Scan());
+          // the scanner should never have opened; release it before failing
+          rs.close();
+          fail("Attempt to open scanner should have been denied");
+        } catch (AccessDeniedException ade) {
+          // expected
+        } finally {
+          if (t != null) {
+            t.close();
+          }
+        }
+        return null;
+      }
+    });
+  }
+}

Added: hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java?rev=1203909&view=auto
==============================================================================
--- hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java (added)
+++ hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java Sat Nov 19 00:15:27 2011
@@ -0,0 +1,980 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Performs authorization checks for common operations, according to different
+ * levels of authorized users.
+ */
+public class TestAccessController {
+  private static Log LOG = LogFactory.getLog(TestAccessController.class);
+  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf;
+
+  // user with all permissions
+  private static User SUPERUSER;
+  // table owner user
+  private static User USER_OWNER;
+  // user with rw permissions
+  private static User USER_RW;
+  // user with read-only permissions
+  private static User USER_RO;
+  // user with no permissions
+  private static User USER_NONE;
+
+  private static byte[] TEST_TABLE = Bytes.toBytes("testtable");
+  private static byte[] TEST_FAMILY = Bytes.toBytes("f1");
+
+  private static MasterCoprocessorEnvironment CP_ENV;
+  private static AccessController ACCESS_CONTROLLER;
+
+  /**
+   * Starts a secured mini cluster, loads {@link AccessController} on the
+   * master, creates the test users and the test table (owned by USER_OWNER),
+   * and grants READ+WRITE to USER_RW and READ to USER_RO on the test family.
+   * The table handle used for the grants is closed afterwards.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // setup configuration
+    conf = TEST_UTIL.getConfiguration();
+    SecureTestUtil.enableSecurity(conf);
+
+    TEST_UTIL.startMiniCluster();
+    MasterCoprocessorHost cpHost = TEST_UTIL.getMiniHBaseCluster()
+        .getMaster().getCoprocessorHost();
+    cpHost.load(AccessController.class, Coprocessor.PRIORITY_HIGHEST, conf);
+    ACCESS_CONTROLLER = (AccessController)cpHost.findCoprocessor(
+        AccessController.class.getName());
+    CP_ENV = cpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
+        Coprocessor.PRIORITY_HIGHEST, 1, conf);
+
+    // create a set of test users
+    SUPERUSER = User.createUserForTesting(conf, "admin", new String[]{"supergroup"});
+    USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
+    USER_RW = User.createUserForTesting(conf, "rwuser", new String[0]);
+    USER_RO = User.createUserForTesting(conf, "rouser", new String[0]);
+    USER_NONE = User.createUserForTesting(conf, "nouser", new String[0]);
+
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
+    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
+    htd.setOwnerString(USER_OWNER.getShortName());
+    admin.createTable(htd);
+
+    // initialize access control
+    HTable meta = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
+    try {
+      AccessControllerProtocol protocol =
+          meta.coprocessorProxy(AccessControllerProtocol.class, TEST_TABLE);
+      protocol.grant(Bytes.toBytes(USER_RW.getShortName()),
+          new TablePermission(TEST_TABLE, TEST_FAMILY, Permission.Action.READ,
+              Permission.Action.WRITE));
+
+      protocol.grant(Bytes.toBytes(USER_RO.getShortName()),
+          new TablePermission(TEST_TABLE, TEST_FAMILY, Permission.Action.READ));
+    } finally {
+      // release the table handle once both grants have been issued
+      meta.close();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster(); // stops the mini cluster started in setupBeforeClass
+  }
+
+  /**
+   * Runs {@code action} as {@code user} and fails the test if it is rejected
+   * with an {@link AccessDeniedException}.  Any other exception propagates.
+   *
+   * @param user the user to run the action as
+   * @param action the operation expected to be permitted
+   */
+  public void verifyAllowed(User user, PrivilegedExceptionAction action)
+    throws Exception {
+    try {
+      user.runAs(action);
+    } catch (AccessDeniedException denied) {
+      String who = user.getShortName();
+      fail("Expected action to pass for user '" + who +
+          "' but was denied");
+    }
+  }
+
+  /**
+   * Runs {@code action} as {@code user} and fails the test unless it is
+   * rejected, either directly with an {@link AccessDeniedException} or with a
+   * {@link RetriesExhaustedWithDetailsException} that has one among its causes.
+   *
+   * @param user the user to run the action as
+   * @param action the operation expected to be refused
+   */
+  public void verifyDenied(User user, PrivilegedExceptionAction action)
+    throws Exception {
+    try {
+      user.runAs(action);
+      fail("Expected AccessDeniedException for user '" + user.getShortName() + "'");
+    } catch (AccessDeniedException denied) {
+      // expected result
+    } catch (RetriesExhaustedWithDetailsException batchFailure) {
+      // in case of batch operations, and put, the client assembles a
+      // RetriesExhaustedWithDetailsException instead of throwing an
+      // AccessDeniedException
+      boolean sawAccessDenied = false;
+      Iterator<Throwable> causes = batchFailure.getCauses().iterator();
+      while (!sawAccessDenied && causes.hasNext()) {
+        sawAccessDenied = causes.next() instanceof AccessDeniedException;
+      }
+      if (!sawAccessDenied) {
+        fail("Not receiving AccessDeniedException for user '" +
+            user.getShortName() + "'");
+      }
+    }
+  }
+
+  /**
+   * Only the superuser may pass the {@code preCreateTable} check; the table
+   * owner and the read/write, read-only and no-permission users are denied.
+   */
+  @Test
+  public void testTableCreate() throws Exception {
+    PrivilegedExceptionAction<Object> createTable =
+        new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        HTableDescriptor newTable = new HTableDescriptor("testnewtable");
+        newTable.addFamily(new HColumnDescriptor(TEST_FAMILY));
+        ACCESS_CONTROLLER.preCreateTable(
+            ObserverContext.createAndPrepare(CP_ENV, null), newTable, null);
+        return null;
+      }
+    };
+
+    // the superuser should be allowed
+    verifyAllowed(SUPERUSER, createTable);
+
+    // everyone else should be denied
+    for (User denied : new User[] { USER_OWNER, USER_RW, USER_RO, USER_NONE }) {
+      verifyDenied(denied, createTable);
+    }
+  }
+
+  /**
+   * Only the superuser may pass the {@code preModifyTable} check; the table
+   * owner and the read/write, read-only and no-permission users are denied.
+   */
+  @Test
+  public void testTableModify() throws Exception {
+    PrivilegedExceptionAction<Object> modifyTable =
+        new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        HTableDescriptor modified = new HTableDescriptor(TEST_TABLE);
+        modified.addFamily(new HColumnDescriptor(TEST_FAMILY));
+        // extra family named after whichever user is running the action
+        modified.addFamily(new HColumnDescriptor("fam_"+User.getCurrent().getShortName()));
+        ACCESS_CONTROLLER.preModifyTable(
+            ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE, modified);
+        return null;
+      }
+    };
+
+    // everyone except the superuser should be denied
+    for (User denied : new User[] { USER_OWNER, USER_RW, USER_RO, USER_NONE }) {
+      verifyDenied(denied, modifyTable);
+    }
+
+    // the superuser should be allowed
+    verifyAllowed(SUPERUSER, modifyTable);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preDeleteTable} check; the table
+   * owner and the read/write, read-only and no-permission users are denied.
+   */
+  @Test
+  public void testTableDelete() throws Exception {
+    PrivilegedExceptionAction<Object> deleteTable =
+        new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preDeleteTable(
+            ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE);
+        return null;
+      }
+    };
+
+    // everyone except the superuser should be denied
+    for (User denied : new User[] { USER_OWNER, USER_RW, USER_RO, USER_NONE }) {
+      verifyDenied(denied, deleteTable);
+    }
+
+    // the superuser should be allowed
+    verifyAllowed(SUPERUSER, deleteTable);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preAddColumn} check; the table
+   * owner and the read/write, read-only and no-permission users are denied.
+   */
+  @Test
+  public void testAddColumn() throws Exception {
+    final HColumnDescriptor newFamily = new HColumnDescriptor("fam_new");
+    PrivilegedExceptionAction<Object> addColumn =
+        new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preAddColumn(
+            ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE, newFamily);
+        return null;
+      }
+    };
+
+    // everyone except the superuser should be denied
+    for (User denied : new User[] { USER_OWNER, USER_RW, USER_RO, USER_NONE }) {
+      verifyDenied(denied, addColumn);
+    }
+
+    // the superuser should be allowed
+    verifyAllowed(SUPERUSER, addColumn);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preModifyColumn} check; the table
+   * owner and the read/write, read-only and no-permission users are denied.
+   */
+  @Test
+  public void testModifyColumn() throws Exception {
+    final HColumnDescriptor changedFamily = new HColumnDescriptor(TEST_FAMILY);
+    changedFamily.setMaxVersions(10);
+    PrivilegedExceptionAction<Object> modifyColumn =
+        new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preModifyColumn(
+            ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE, changedFamily);
+        return null;
+      }
+    };
+
+    // everyone except the superuser should be denied
+    for (User denied : new User[] { USER_OWNER, USER_RW, USER_RO, USER_NONE }) {
+      verifyDenied(denied, modifyColumn);
+    }
+
+    // the superuser should be allowed
+    verifyAllowed(SUPERUSER, modifyColumn);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preDeleteColumn} check; the table
+   * owner and the read/write, read-only and no-permission users are denied.
+   */
+  @Test
+  public void testDeleteColumn() throws Exception {
+    PrivilegedExceptionAction<Object> deleteColumn =
+        new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preDeleteColumn(
+            ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE, TEST_FAMILY);
+        return null;
+      }
+    };
+
+    // everyone except the superuser should be denied
+    for (User denied : new User[] { USER_OWNER, USER_RW, USER_RO, USER_NONE }) {
+      verifyDenied(denied, deleteColumn);
+    }
+
+    // the superuser should be allowed
+    verifyAllowed(SUPERUSER, deleteColumn);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preDisableTable} check; the table
+   * owner and the read/write, read-only and no-permission users are denied.
+   */
+  @Test
+  public void testTableDisable() throws Exception {
+    PrivilegedExceptionAction<Object> disable =
+        new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preDisableTable(
+            ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE);
+        return null;
+      }
+    };
+
+    // everyone except the superuser should be denied
+    for (User denied : new User[] { USER_OWNER, USER_RW, USER_RO, USER_NONE }) {
+      verifyDenied(denied, disable);
+    }
+
+    // the superuser should be allowed
+    verifyAllowed(SUPERUSER, disable);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preEnableTable} check; the table
+   * owner and the read/write, read-only and no-permission users are denied.
+   */
+  @Test
+  public void testTableEnable() throws Exception {
+    PrivilegedExceptionAction<Object> enable =
+        new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preEnableTable(
+            ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE);
+        return null;
+      }
+    };
+
+    // everyone except the superuser should be denied
+    for (User denied : new User[] { USER_OWNER, USER_RW, USER_RO, USER_NONE }) {
+      verifyDenied(denied, enable);
+    }
+
+    // the superuser should be allowed
+    verifyAllowed(SUPERUSER, enable);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preMove} check for a region of
+   * the test table; the table owner and the read/write, read-only and
+   * no-permission users are denied.  The table handle used to look up the
+   * region is closed once the region has been read.
+   */
+  @Test
+  public void testMove() throws Exception {
+    final HRegionInfo firstRegion;
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), TEST_TABLE);
+    try {
+      Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
+      firstRegion = regions.entrySet().iterator().next().getKey();
+    } finally {
+      table.close();
+    }
+    final ServerName server = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+    PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        // the same server is passed as both source and destination
+        ACCESS_CONTROLLER.preMove(ObserverContext.createAndPrepare(CP_ENV, null),
+            firstRegion, server, server);
+        return null;
+      }
+    };
+
+    // all others should be denied
+    verifyDenied(USER_OWNER, action);
+    verifyDenied(USER_RW, action);
+    verifyDenied(USER_RO, action);
+    verifyDenied(USER_NONE, action);
+
+    // verify that superuser can move regions
+    verifyAllowed(SUPERUSER, action);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preAssign} check for a region of
+   * the test table; the table owner and the read/write, read-only and
+   * no-permission users are denied.  The table handle used to look up the
+   * region is closed once the region has been read.
+   */
+  @Test
+  public void testAssign() throws Exception {
+    final HRegionInfo firstRegion;
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), TEST_TABLE);
+    try {
+      Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
+      firstRegion = regions.entrySet().iterator().next().getKey();
+    } finally {
+      table.close();
+    }
+
+    PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preAssign(ObserverContext.createAndPrepare(CP_ENV, null),
+            firstRegion);
+        return null;
+      }
+    };
+
+    // all others should be denied
+    verifyDenied(USER_OWNER, action);
+    verifyDenied(USER_RW, action);
+    verifyDenied(USER_RO, action);
+    verifyDenied(USER_NONE, action);
+
+    // verify that superuser can assign regions
+    verifyAllowed(SUPERUSER, action);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preUnassign} check for a region of
+   * the test table; the table owner and the read/write, read-only and
+   * no-permission users are denied.  The table handle used to look up the
+   * region is closed once the region has been read.
+   */
+  @Test
+  public void testUnassign() throws Exception {
+    final HRegionInfo firstRegion;
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), TEST_TABLE);
+    try {
+      Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
+      firstRegion = regions.entrySet().iterator().next().getKey();
+    } finally {
+      table.close();
+    }
+
+    PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preUnassign(ObserverContext.createAndPrepare(CP_ENV, null),
+            firstRegion, false);
+        return null;
+      }
+    };
+
+    // all others should be denied
+    verifyDenied(USER_OWNER, action);
+    verifyDenied(USER_RW, action);
+    verifyDenied(USER_RO, action);
+    verifyDenied(USER_NONE, action);
+
+    // verify that superuser can unassign regions
+    verifyAllowed(SUPERUSER, action);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preBalance} check; the table
+   * owner and the read/write, read-only and no-permission users are denied.
+   */
+  @Test
+  public void testBalance() throws Exception {
+    PrivilegedExceptionAction<Object> balance =
+        new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preBalance(
+            ObserverContext.createAndPrepare(CP_ENV, null));
+        return null;
+      }
+    };
+
+    // everyone except the superuser should be denied
+    for (User denied : new User[] { USER_OWNER, USER_RW, USER_RO, USER_NONE }) {
+      verifyDenied(denied, balance);
+    }
+
+    // the superuser should be allowed
+    verifyAllowed(SUPERUSER, balance);
+  }
+
+  /**
+   * Only the superuser may pass the {@code preBalanceSwitch} check; the table
+   * owner and the read/write, read-only and no-permission users are denied.
+   */
+  @Test
+  public void testBalanceSwitch() throws Exception {
+    PrivilegedExceptionAction<Object> balanceSwitch =
+        new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preBalanceSwitch(
+            ObserverContext.createAndPrepare(CP_ENV, null), true);
+        return null;
+      }
+    };
+
+    // everyone except the superuser should be denied
+    for (User denied : new User[] { USER_OWNER, USER_RW, USER_RO, USER_NONE }) {
+      verifyDenied(denied, balanceSwitch);
+    }
+
+    // the superuser should be allowed
+    verifyAllowed(SUPERUSER, balanceSwitch);
+  }
+
+  @Test
+  public void testShutdown() throws Exception {
+    // invokes the access controller's preShutdown hook as the calling user
+    PrivilegedExceptionAction shutdownAction = new PrivilegedExceptionAction() {
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preShutdown(
+            ObserverContext.createAndPrepare(CP_ENV, null));
+        return null;
+      }
+    };
+
+    // owner, read-write, read-only and unprivileged users must be rejected
+    verifyDenied(USER_OWNER, shutdownAction);
+    verifyDenied(USER_RW, shutdownAction);
+    verifyDenied(USER_RO, shutdownAction);
+    verifyDenied(USER_NONE, shutdownAction);
+
+    // only the superuser may request a shutdown
+    verifyAllowed(SUPERUSER, shutdownAction);
+  }
+
+  @Test
+  public void testStopMaster() throws Exception {
+    // invokes the access controller's preStopMaster hook as the calling user
+    PrivilegedExceptionAction stopMasterAction = new PrivilegedExceptionAction() {
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preStopMaster(
+            ObserverContext.createAndPrepare(CP_ENV, null));
+        return null;
+      }
+    };
+
+    // owner, read-write, read-only and unprivileged users must be rejected
+    verifyDenied(USER_OWNER, stopMasterAction);
+    verifyDenied(USER_RW, stopMasterAction);
+    verifyDenied(USER_RO, stopMasterAction);
+    verifyDenied(USER_NONE, stopMasterAction);
+
+    // only the superuser may stop the master
+    verifyAllowed(SUPERUSER, stopMasterAction);
+  }
+
+  private void verifyWrite(PrivilegedExceptionAction action) throws Exception {
+    // users without write access must be rejected
+    for (User deniedUser : new User[] { USER_NONE, USER_RO }) {
+      verifyDenied(deniedUser, action);
+    }
+
+    // the superuser, the table owner and the read-write user must pass
+    for (User allowedUser : new User[] { SUPERUSER, USER_OWNER, USER_RW }) {
+      verifyAllowed(allowedUser, action);
+    }
+  }
+
+  private void verifyRead(PrivilegedExceptionAction action) throws Exception {
+    // the unprivileged user must be rejected
+    for (User deniedUser : new User[] { USER_NONE }) {
+      verifyDenied(deniedUser, action);
+    }
+
+    // every user holding at least read access must pass
+    for (User allowedUser : new User[] { SUPERUSER, USER_OWNER, USER_RW, USER_RO }) {
+      verifyAllowed(allowedUser, action);
+    }
+  }
+
+  /**
+   * Runs a get and a full scan of the test family against the test table and
+   * checks, through verifyRead, which test users may perform them.
+   */
+  @Test
+  public void testRead() throws Exception {
+    // get action
+    PrivilegedExceptionAction getAction = new PrivilegedExceptionAction() {
+      public Object run() throws Exception {
+        Get g = new Get(Bytes.toBytes("random_row"));
+        g.addFamily(TEST_FAMILY);
+        HTable t = new HTable(conf, TEST_TABLE);
+        try {
+          t.get(g);
+        } finally {
+          // release the client table even when the get is denied
+          t.close();
+        }
+        return null;
+      }
+    };
+    verifyRead(getAction);
+
+    // action for scanning
+    PrivilegedExceptionAction scanAction = new PrivilegedExceptionAction() {
+      public Object run() throws Exception {
+        Scan s = new Scan();
+        s.addFamily(TEST_FAMILY);
+
+        HTable table = new HTable(conf, TEST_TABLE);
+        try {
+          ResultScanner scanner = table.getScanner(s);
+          try {
+            // Drain the scanner. Failures raised while iterating are allowed
+            // to propagate so that an error during next() cannot be mistaken
+            // for a successful, permitted scan.
+            for (Result r = scanner.next(); r != null; r = scanner.next()) {
+              // do nothing
+            }
+          } finally {
+            scanner.close();
+          }
+        } finally {
+          table.close();
+        }
+        return null;
+      }
+    };
+    verifyRead(scanAction);
+  }
+
+  /**
+   * Runs a put, a delete and an increment against the test table and checks,
+   * through verifyWrite, which test users may perform them.
+   */
+  @Test
+  public void testWrite() throws Exception {
+    // put action
+    PrivilegedExceptionAction putAction = new PrivilegedExceptionAction() {
+      public Object run() throws Exception {
+        Put p = new Put(Bytes.toBytes("random_row"));
+        p.add(TEST_FAMILY, Bytes.toBytes("Qualifier"), Bytes.toBytes(1));
+        HTable t = new HTable(conf, TEST_TABLE);
+        try {
+          t.put(p);
+        } finally {
+          // release the client table even when the put is denied
+          t.close();
+        }
+        return null;
+      }
+    };
+    verifyWrite(putAction);
+
+    // delete action
+    PrivilegedExceptionAction deleteAction = new PrivilegedExceptionAction() {
+      public Object run() throws Exception {
+        Delete d = new Delete(Bytes.toBytes("random_row"));
+        d.deleteFamily(TEST_FAMILY);
+        HTable t = new HTable(conf, TEST_TABLE);
+        try {
+          t.delete(d);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    };
+    verifyWrite(deleteAction);
+
+    // increment action
+    PrivilegedExceptionAction incrementAction = new PrivilegedExceptionAction() {
+      public Object run() throws Exception {
+        Increment inc = new Increment(Bytes.toBytes("random_row"));
+        inc.addColumn(TEST_FAMILY, Bytes.toBytes("Qualifier"), 1);
+        HTable t = new HTable(conf, TEST_TABLE);
+        try {
+          t.increment(inc);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    };
+    verifyWrite(incrementAction);
+  }
+
+  /**
+   * Creates a temporary two-family table, then grants and revokes table-level
+   * and family-level permissions for a fresh test user, checking after every
+   * change which get, put and delete actions that user may perform.
+   * The temporary table is removed and the ACL table handle closed even when
+   * one of the checks fails.
+   */
+  @Test
+  public void testGrantRevoke() throws Exception {
+    final byte[] tableName = Bytes.toBytes("TempTable");
+    final byte[] family1 = Bytes.toBytes("f1");
+    final byte[] family2 = Bytes.toBytes("f2");
+    final byte[] qualifier = Bytes.toBytes("q");
+
+    // create table
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(family1));
+    htd.addFamily(new HColumnDescriptor(family2));
+    htd.setOwnerString(USER_OWNER.getShortName());
+    admin.createTable(htd);
+
+    HTable acl = null;
+    try {
+      // create temp users
+      User user = User.createUserForTesting(TEST_UTIL.getConfiguration(),
+          "user", new String[0]);
+
+      // perms only stored against the first region
+      acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
+      AccessControllerProtocol protocol =
+          acl.coprocessorProxy(AccessControllerProtocol.class,
+              tableName);
+
+      // prepare actions; each one closes its client table when done
+      PrivilegedExceptionAction putActionAll = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Put p = new Put(Bytes.toBytes("a"));
+          p.add(family1, qualifier, Bytes.toBytes("v1"));
+          p.add(family2, qualifier, Bytes.toBytes("v2"));
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.put(p);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+      PrivilegedExceptionAction putAction1 = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Put p = new Put(Bytes.toBytes("a"));
+          p.add(family1, qualifier, Bytes.toBytes("v1"));
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.put(p);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+      PrivilegedExceptionAction putAction2 = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Put p = new Put(Bytes.toBytes("a"));
+          p.add(family2, qualifier, Bytes.toBytes("v2"));
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.put(p);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+      PrivilegedExceptionAction getActionAll = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Get g = new Get(Bytes.toBytes("random_row"));
+          g.addFamily(family1);
+          g.addFamily(family2);
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.get(g);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+      PrivilegedExceptionAction getAction1 = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Get g = new Get(Bytes.toBytes("random_row"));
+          g.addFamily(family1);
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.get(g);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+      PrivilegedExceptionAction getAction2 = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Get g = new Get(Bytes.toBytes("random_row"));
+          g.addFamily(family2);
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.get(g);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+      PrivilegedExceptionAction deleteActionAll = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Delete d = new Delete(Bytes.toBytes("random_row"));
+          d.deleteFamily(family1);
+          d.deleteFamily(family2);
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.delete(d);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+      PrivilegedExceptionAction deleteAction1 = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Delete d = new Delete(Bytes.toBytes("random_row"));
+          d.deleteFamily(family1);
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.delete(d);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+      PrivilegedExceptionAction deleteAction2 = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Delete d = new Delete(Bytes.toBytes("random_row"));
+          d.deleteFamily(family2);
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.delete(d);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+
+      // initial check: nothing has been granted yet
+      verifyDenied(user, getActionAll);
+      verifyDenied(user, getAction1);
+      verifyDenied(user, getAction2);
+
+      verifyDenied(user, putActionAll);
+      verifyDenied(user, putAction1);
+      verifyDenied(user, putAction2);
+
+      verifyDenied(user, deleteActionAll);
+      verifyDenied(user, deleteAction1);
+      verifyDenied(user, deleteAction2);
+
+      // grant table read permission
+      protocol.grant(Bytes.toBytes(user.getShortName()),
+        new TablePermission(tableName, null, Permission.Action.READ));
+      // NOTE(review): the pause presumably lets the permission change
+      // propagate before it is checked -- confirm
+      Thread.sleep(100);
+      // check
+      verifyAllowed(user, getActionAll);
+      verifyAllowed(user, getAction1);
+      verifyAllowed(user, getAction2);
+
+      verifyDenied(user, putActionAll);
+      verifyDenied(user, putAction1);
+      verifyDenied(user, putAction2);
+
+      verifyDenied(user, deleteActionAll);
+      verifyDenied(user, deleteAction1);
+      verifyDenied(user, deleteAction2);
+
+      // grant table write permission; the checks below expect this grant to
+      // replace the earlier READ grant, so reads are denied again
+      protocol.grant(Bytes.toBytes(user.getShortName()),
+        new TablePermission(tableName, null, Permission.Action.WRITE));
+      Thread.sleep(100);
+      verifyDenied(user, getActionAll);
+      verifyDenied(user, getAction1);
+      verifyDenied(user, getAction2);
+
+      verifyAllowed(user, putActionAll);
+      verifyAllowed(user, putAction1);
+      verifyAllowed(user, putAction2);
+
+      verifyAllowed(user, deleteActionAll);
+      verifyAllowed(user, deleteAction1);
+      verifyAllowed(user, deleteAction2);
+
+      // revoke table permission: grant READ and WRITE first, then revoke
+      // the whole table-level permission
+      protocol.grant(Bytes.toBytes(user.getShortName()),
+        new TablePermission(tableName, null, Permission.Action.READ,
+          Permission.Action.WRITE));
+
+      protocol.revoke(Bytes.toBytes(user.getShortName()),
+          new TablePermission(tableName, null));
+      Thread.sleep(100);
+      verifyDenied(user, getActionAll);
+      verifyDenied(user, getAction1);
+      verifyDenied(user, getAction2);
+
+      verifyDenied(user, putActionAll);
+      verifyDenied(user, putAction1);
+      verifyDenied(user, putAction2);
+
+      verifyDenied(user, deleteActionAll);
+      verifyDenied(user, deleteAction1);
+      verifyDenied(user, deleteAction2);
+
+      // grant column family read permission on family1 only; a get that
+      // spans both families is expected to be allowed as well
+      protocol.grant(Bytes.toBytes(user.getShortName()),
+        new TablePermission(tableName, family1, Permission.Action.READ));
+      Thread.sleep(100);
+
+      verifyAllowed(user, getActionAll);
+      verifyAllowed(user, getAction1);
+      verifyDenied(user, getAction2);
+
+      verifyDenied(user, putActionAll);
+      verifyDenied(user, putAction1);
+      verifyDenied(user, putAction2);
+
+      verifyDenied(user, deleteActionAll);
+      verifyDenied(user, deleteAction1);
+      verifyDenied(user, deleteAction2);
+
+      // grant column family write permission on family2 only
+      protocol.grant(Bytes.toBytes(user.getShortName()),
+        new TablePermission(tableName, family2, Permission.Action.WRITE));
+      Thread.sleep(100);
+
+      verifyAllowed(user, getActionAll);
+      verifyAllowed(user, getAction1);
+      verifyDenied(user, getAction2);
+
+      verifyDenied(user, putActionAll);
+      verifyDenied(user, putAction1);
+      verifyAllowed(user, putAction2);
+
+      verifyDenied(user, deleteActionAll);
+      verifyDenied(user, deleteAction1);
+      verifyAllowed(user, deleteAction2);
+
+      // revoke column family permission on family2; the family1 read grant
+      // is expected to stay in place
+      protocol.revoke(Bytes.toBytes(user.getShortName()),
+        new TablePermission(tableName, family2));
+      Thread.sleep(100);
+
+      verifyAllowed(user, getActionAll);
+      verifyAllowed(user, getAction1);
+      verifyDenied(user, getAction2);
+
+      verifyDenied(user, putActionAll);
+      verifyDenied(user, putAction1);
+      verifyDenied(user, putAction2);
+
+      verifyDenied(user, deleteActionAll);
+      verifyDenied(user, deleteAction1);
+      verifyDenied(user, deleteAction2);
+    } finally {
+      try {
+        if (acl != null) {
+          acl.close();
+        }
+      } finally {
+        // delete table, also when a check above has failed
+        admin.disableTable(tableName);
+        admin.deleteTable(tableName);
+      }
+    }
+  }
+
+  // Reports whether the given permission occurs in the permission list.
+  private boolean hasFoundUserPermission(UserPermission userPermission,
+                                         List<UserPermission> perms) {
+    final int position = perms.indexOf(userPermission);
+    return position >= 0;
+  }
+
+  /**
+   * Creates a temporary two-family table, then grants and revokes permissions
+   * on a single column (family1:q) for a fresh test user, checking after every
+   * change whether get, put and delete on that column are permitted.
+   * The temporary table is removed and the ACL table handle closed even when
+   * one of the checks fails.
+   */
+  @Test
+  public void testGrantRevokeAtQualifierLevel() throws Exception {
+    final byte[] tableName = Bytes.toBytes("testGrantRevokeAtQualifierLevel");
+    final byte[] family1 = Bytes.toBytes("f1");
+    final byte[] family2 = Bytes.toBytes("f2");
+    final byte[] qualifier = Bytes.toBytes("q");
+
+    // create table
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(family1));
+    htd.addFamily(new HColumnDescriptor(family2));
+    htd.setOwnerString(USER_OWNER.getShortName());
+    admin.createTable(htd);
+
+    HTable acl = null;
+    try {
+      // create temp users
+      User user = User.createUserForTesting(TEST_UTIL.getConfiguration(),
+          "user", new String[0]);
+
+      acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
+      AccessControllerProtocol protocol =
+          acl.coprocessorProxy(AccessControllerProtocol.class, tableName);
+
+      // actions on the single column; each one closes its client table
+      PrivilegedExceptionAction getQualifierAction = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Get g = new Get(Bytes.toBytes("random_row"));
+          g.addColumn(family1, qualifier);
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.get(g);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+      PrivilegedExceptionAction putQualifierAction = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Put p = new Put(Bytes.toBytes("random_row"));
+          p.add(family1, qualifier, Bytes.toBytes("v1"));
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.put(p);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+      PrivilegedExceptionAction deleteQualifierAction = new PrivilegedExceptionAction() {
+        public Object run() throws Exception {
+          Delete d = new Delete(Bytes.toBytes("random_row"));
+          d.deleteColumn(family1, qualifier);
+          HTable t = new HTable(conf, tableName);
+          try {
+            t.delete(d);
+          } finally {
+            t.close();
+          }
+          return null;
+        }
+      };
+
+      // start from a state without any family1 permission for the user
+      protocol.revoke(Bytes.toBytes(user.getShortName()),
+          new TablePermission(tableName, family1));
+      verifyDenied(user, getQualifierAction);
+      verifyDenied(user, putQualifierAction);
+      verifyDenied(user, deleteQualifierAction);
+
+      // only grant read permission on the column
+      protocol.grant(Bytes.toBytes(user.getShortName()),
+          new TablePermission(tableName, family1, qualifier,
+              Permission.Action.READ));
+      // NOTE(review): the pause presumably lets the permission change
+      // propagate before it is checked -- confirm
+      Thread.sleep(100);
+
+      verifyAllowed(user, getQualifierAction);
+      verifyDenied(user, putQualifierAction);
+      verifyDenied(user, deleteQualifierAction);
+
+      // only grant write permission; the checks below expect this grant to
+      // replace the earlier READ grant
+      // TODO: comment this portion after HBASE-3583
+      protocol.grant(Bytes.toBytes(user.getShortName()),
+          new TablePermission(tableName, family1, qualifier,
+              Permission.Action.WRITE));
+      Thread.sleep(100);
+
+      verifyDenied(user, getQualifierAction);
+      verifyAllowed(user, putQualifierAction);
+      verifyAllowed(user, deleteQualifierAction);
+
+      // grant both read and write permission.
+      protocol.grant(Bytes.toBytes(user.getShortName()),
+          new TablePermission(tableName, family1, qualifier,
+              Permission.Action.READ, Permission.Action.WRITE));
+      Thread.sleep(100);
+
+      verifyAllowed(user, getQualifierAction);
+      verifyAllowed(user, putQualifierAction);
+      verifyAllowed(user, deleteQualifierAction);
+
+      // revoke the qualifier level permission: every action on the column
+      // must be denied again.
+      protocol.revoke(Bytes.toBytes(user.getShortName()),
+          new TablePermission(tableName, family1, qualifier));
+      Thread.sleep(100);
+
+      verifyDenied(user, getQualifierAction);
+      verifyDenied(user, putQualifierAction);
+      verifyDenied(user, deleteQualifierAction);
+    } finally {
+      try {
+        if (acl != null) {
+          acl.close();
+        }
+      } finally {
+        // delete table, also when a check above has failed
+        admin.disableTable(tableName);
+        admin.deleteTable(tableName);
+      }
+    }
+  }
+
+  /**
+   * Creates a temporary table and checks that the permission list returned by
+   * getUserPermissions reflects grants and revokes of a column-level
+   * permission for the user named "user".
+   * The temporary table is removed and the ACL table handle closed even when
+   * one of the assertions fails.
+   */
+  @Test
+  public void testPermissionList() throws Exception {
+    final byte[] tableName = Bytes.toBytes("testPermissionList");
+    final byte[] family1 = Bytes.toBytes("f1");
+    final byte[] family2 = Bytes.toBytes("f2");
+    final byte[] qualifier = Bytes.toBytes("q");
+    final byte[] user = Bytes.toBytes("user");
+
+    // create table
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(family1));
+    htd.addFamily(new HColumnDescriptor(family2));
+    htd.setOwnerString(USER_OWNER.getShortName());
+    admin.createTable(htd);
+
+    HTable acl = null;
+    try {
+      acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
+      AccessControllerProtocol protocol =
+          acl.coprocessorProxy(AccessControllerProtocol.class, tableName);
+
+      List<UserPermission> perms = protocol.getUserPermissions(tableName);
+
+      // nothing granted yet: the READ permission must not be listed
+      UserPermission up = new UserPermission(user,
+          tableName, family1, qualifier, Permission.Action.READ);
+      assertFalse("User should not be granted permission: " + up.toString(),
+          hasFoundUserPermission(up, perms));
+
+      // grant read permission
+      UserPermission upToSet = new UserPermission(user,
+          tableName, family1, qualifier, Permission.Action.READ);
+      protocol.grant(user, upToSet);
+      perms = protocol.getUserPermissions(tableName);
+
+      UserPermission upToVerify = new UserPermission(user,
+          tableName, family1, qualifier, Permission.Action.READ);
+      assertTrue("User should be granted permission: " + upToVerify.toString(),
+          hasFoundUserPermission(upToVerify, perms));
+
+      // a WRITE-only permission on the same column must not be listed
+      upToVerify = new UserPermission(user, tableName, family1, qualifier,
+          Permission.Action.WRITE);
+      assertFalse("User should not be granted permission: " + upToVerify.toString(),
+          hasFoundUserPermission(upToVerify, perms));
+
+      // grant read+write
+      upToSet = new UserPermission(user, tableName, family1, qualifier,
+          Permission.Action.WRITE, Permission.Action.READ);
+      protocol.grant(user, upToSet);
+      perms = protocol.getUserPermissions(tableName);
+
+      upToVerify = new UserPermission(user, tableName, family1, qualifier,
+          Permission.Action.WRITE, Permission.Action.READ);
+      assertTrue("User should be granted permission: " + upToVerify.toString(),
+              hasFoundUserPermission(upToVerify, perms));
+
+      // revoke the read+write permission: it must disappear from the list
+      protocol.revoke(user, upToSet);
+      perms = protocol.getUserPermissions(tableName);
+      assertFalse("User should not be granted permission: " + upToVerify.toString(),
+        hasFoundUserPermission(upToVerify, perms));
+    } finally {
+      try {
+        if (acl != null) {
+          acl.close();
+        }
+      } finally {
+        // delete table, also when an assertion above has failed
+        admin.disableTable(tableName);
+        admin.deleteTable(tableName);
+      }
+    }
+  }
+}



Mime
View raw message