hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1203915 [1/3] - in /hbase/branches/0.92: ./ security/src/main/java/org/apache/hadoop/hbase/security/access/ security/src/test/java/org/apache/hadoop/hbase/security/access/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/had...
Date Sat, 19 Nov 2011 00:25:38 GMT
Author: garyh
Date: Sat Nov 19 00:25:37 2011
New Revision: 1203915

URL: http://svn.apache.org/viewvc?rev=1203915&view=rev
Log:
HBASE-3025  Security: coprocessor based access control

Added:
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControllerProtocol.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/access/
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java
    hbase/branches/0.92/src/main/ruby/hbase/security.rb
    hbase/branches/0.92/src/main/ruby/shell/commands/grant.rb
    hbase/branches/0.92/src/main/ruby/shell/commands/revoke.rb
    hbase/branches/0.92/src/main/ruby/shell/commands/user_permission.rb
Modified:
    hbase/branches/0.92/CHANGES.txt
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
    hbase/branches/0.92/src/main/resources/hbase-default.xml
    hbase/branches/0.92/src/main/ruby/hbase.rb
    hbase/branches/0.92/src/main/ruby/hbase/admin.rb
    hbase/branches/0.92/src/main/ruby/hbase/hbase.rb
    hbase/branches/0.92/src/main/ruby/shell.rb
    hbase/branches/0.92/src/main/ruby/shell/commands.rb

Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1203915&r1=1203914&r2=1203915&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Sat Nov 19 00:25:37 2011
@@ -801,6 +801,7 @@ Release 0.92.0 - Unreleased
    HBASE-4806  Fix logging message in HbaseObjectWritable
                (Jonathan Hsieh via todd)
    HBASE-2742  Provide strong authentication with a secure RPC engine         
+   HBASE-3025  Coprocessor based access control
 
 Release 0.90.5 - Unreleased
 

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java?rev=1203915&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java Sat Nov 19 00:25:37 2011
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.security.User;
+
+/**
+ * <strong>NOTE: for internal use only by AccessController implementation</strong>
+ *
+ * <p>
+ * TODO: There is room for further performance optimization here.
+ * Calling TableAuthManager.authorize() per KeyValue imposes a fair amount of
+ * overhead.  A more optimized solution might look at the qualifiers where
+ * permissions are actually granted and explicitly limit the scan to those.
+ * </p>
+ * <p>
+ * We should aim to use this _only_ when access to the requested column families
+ * is not granted at the column family levels.  If table or column family
+ * access succeeds, then there is no need to impose the overhead of this filter.
+ * </p>
+ */
+class AccessControlFilter extends FilterBase {
+
+  private TableAuthManager authManager;
+  private byte[] table;
+  private User user;
+
+  /**
+   * For Writable
+   */
+  AccessControlFilter() {
+  }
+
+  AccessControlFilter(TableAuthManager mgr, User ugi,
+      byte[] tableName) {
+    authManager = mgr;
+    table = tableName;
+    user = ugi;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue kv) {
+    if (authManager.authorize(user, table, kv, TablePermission.Action.READ)) {
+      return ReturnCode.INCLUDE;
+    }
+    return ReturnCode.NEXT_COL;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    // no implementation, server-side use only
+    throw new UnsupportedOperationException(
+        "Serialization not supported.  Intended for server-side use only.");
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    // no implementation, server-side use only
+    throw new UnsupportedOperationException(
+        "Serialization not supported.  Intended for server-side use only.");
+  }
+}

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java?rev=1203915&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java Sat Nov 19 00:25:37 2011
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Maintains lists of permission grants to users and groups to allow for
+ * authorization checks by {@link AccessController}.
+ *
+ * <p>
+ * Access control lists are stored in an "internal" metadata table named
+ * {@code _acl_}. Each table's permission grants are stored as a separate row,
+ * keyed by the table name. KeyValues for permissions assignments are stored
+ * in one of the formats:
+ * <pre>
+ * Key                      Desc
+ * --------                 --------
+ * user                     table level permissions for a user [R=read, W=write]
+ * {@literal @}group                   table level permissions for a group
+ * user,family              column family level permissions for a user
+ * {@literal @}group,family            column family level permissions for a group
+ * user,family,qualifier    column qualifier level permissions for a user
+ * {@literal @}group,family,qualifier  column qualifier level permissions for a group
+ * </pre>
+ * All values are encoded as byte arrays containing the codes from the
+ * {@link org.apache.hadoop.hbase.security.access.TablePermission.Action} enum.
+ * </p>
+ */
+public class AccessControlLists {
+  /** Internal storage table for access control lists */
+  public static final String ACL_TABLE_NAME_STR = "_acl_";
+  public static final byte[] ACL_TABLE_NAME = Bytes.toBytes(ACL_TABLE_NAME_STR);
+  /** Column family used to store ACL grants */
+  public static final String ACL_LIST_FAMILY_STR = "l";
+  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
+
+  /** Table descriptor for ACL internal table */
+  public static final HTableDescriptor ACL_TABLEDESC = new HTableDescriptor(
+      ACL_TABLE_NAME);
+  static {
+    // The descriptor's single column family is registered once, at class load.
+    ACL_TABLEDESC.addFamily(
+        new HColumnDescriptor(ACL_LIST_FAMILY,
+            10, // Ten is arbitrary number.  Keep versions to help debugging.
+            Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
+            HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
+            HConstants.REPLICATION_SCOPE_LOCAL));
+  }
+
+  /**
+   * Delimiter separating the user (or group), column family, and qualifier
+   * parts of the column qualifiers stored in the {@code _acl_} table.
+   */
+  public static final char ACL_KEY_DELIMITER = ',';
+  /** Prefix character to denote group names */
+  public static final String GROUP_PREFIX = "@";
+  /** Configuration key for superusers */
+  public static final String SUPERUSER_CONF_KEY = "hbase.superuser";
+
+  /** Class logger; never reassigned, hence final. */
+  private static final Log LOG = LogFactory.getLog(AccessControlLists.class);
+
+  /**
+   * Ensures the {@code _acl_} table exists, asking the master to create it
+   * from {@link #ACL_TABLEDESC} when the catalog does not list it yet.
+   * @param master reference to HMaster
+   * @throws IOException if the existence check or the table creation fails
+   */
+  static void init(MasterServices master) throws IOException {
+    boolean aclTableExists =
+        MetaReader.tableExists(master.getCatalogTracker(), ACL_TABLE_NAME_STR);
+    if (aclTableExists) {
+      return;
+    }
+    master.createTable(ACL_TABLEDESC, null);
+  }
+
+  /**
+   * Stores a new table permission grant in the access control lists table.
+   * @param conf the configuration
+   * @param tableName the table to which access is being granted
+   * @param username the user or group being granted the permission
+   * @param perm the details of the permission being granted
+   * @throws IOException in the case of an error accessing the metadata table
+   */
+  static void addTablePermission(Configuration conf,
+      byte[] tableName, String username, TablePermission perm)
+    throws IOException {
+
+    Put p = new Put(tableName);
+    byte[] key = Bytes.toBytes(username);
+    if (perm.getFamily() != null && perm.getFamily().length > 0) {
+      key = Bytes.add(key,
+          Bytes.add(new byte[]{ACL_KEY_DELIMITER}, perm.getFamily()));
+      if (perm.getQualifier() != null && perm.getQualifier().length > 0) {
+        key = Bytes.add(key,
+            Bytes.add(new byte[]{ACL_KEY_DELIMITER}, perm.getQualifier()));
+      }
+    }
+
+    TablePermission.Action[] actions = perm.getActions();
+    if ((actions == null) || (actions.length == 0)) {
+      LOG.warn("No actions associated with user '"+username+"'");
+      return;
+    }
+
+    byte[] value = new byte[actions.length];
+    for (int i = 0; i < actions.length; i++) {
+      value[i] = actions[i].code();
+    }
+    p.add(ACL_LIST_FAMILY, key, value);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Writing permission for table "+
+          Bytes.toString(tableName)+" "+
+          Bytes.toString(key)+": "+Bytes.toStringBinary(value)
+      );
+    }
+    HTable acls = null;
+    try {
+      acls = new HTable(conf, ACL_TABLE_NAME);
+      acls.put(p);
+    } finally {
+      if (acls != null) acls.close();
+    }
+  }
+
+  /**
+   * Removes a previously granted permission from the stored access control
+   * lists.  The {@link TablePermission} being removed must exactly match what
+   * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
+   * been granted "READ" access to the "data" table, but only to column family
+   * plus qualifier "info:colA", then trying to call this method with only
+   * user "bob" and the table name "data" (but without specifying the
+   * column qualifier "info:colA") will have no effect.
+   *
+   * <p>
+   * All versions of the matching column are deleted from the table's row in
+   * {@code _acl_}.
+   * </p>
+   *
+   * @param conf the configuration
+   * @param tableName the table of the current permission grant
+   * @param userName the user or group currently granted the permission
+   * @param perm the details of the permission to be revoked
+   * @throws IOException if there is an error accessing the metadata table
+   */
+  static void removeTablePermission(Configuration conf,
+      byte[] tableName, String userName, TablePermission perm)
+    throws IOException {
+
+    Delete d = new Delete(tableName);
+    // Build the column key by raw byte concatenation, exactly as
+    // addTablePermission does.  Converting the family/qualifier to a String
+    // and back is lossy for bytes that are not valid UTF-8, which would
+    // produce a key that does not match the stored one and leave the grant
+    // in place.
+    byte[] key = Bytes.toBytes(userName);
+    if (perm.getFamily() != null && perm.getFamily().length > 0) {
+      key = Bytes.add(key,
+          Bytes.add(new byte[]{ACL_KEY_DELIMITER}, perm.getFamily()));
+      if (perm.getQualifier() != null && perm.getQualifier().length > 0) {
+        key = Bytes.add(key,
+            Bytes.add(new byte[]{ACL_KEY_DELIMITER}, perm.getQualifier()));
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing permission for user '" + userName+ "': "+
+          perm.toString());
+    }
+    d.deleteColumns(ACL_LIST_FAMILY, key);
+    HTable acls = null;
+    try {
+      acls = new HTable(conf, ACL_TABLE_NAME);
+      acls.delete(d);
+    } finally {
+      if (acls != null) acls.close();
+    }
+  }
+
+  /**
+   * Tells whether a region belongs to the {@code _acl_} metadata table, by
+   * comparing the name in the region's table descriptor with
+   * {@link #ACL_TABLE_NAME}.
+   * @param region the region to test
+   * @return {@code true} if the region's table name equals {@code _acl_}
+   */
+  static boolean isAclRegion(HRegion region) {
+    byte[] regionTableName = region.getTableDesc().getName();
+    return Bytes.equals(ACL_TABLE_NAME, regionTableName);
+  }
+
+  /**
+   * Loads all of the permission grants stored in a region of the {@code _acl_}
+   * table by scanning the ACL column family of that region directly.
+   *
+   * @param aclRegion a region of the {@code _acl_} table
+   * @return a map from table name (the row key of each scanned row) to the
+   *   grants parsed from that row, keyed by user or group name
+   * @throws IOException if {@code aclRegion} does not belong to {@code _acl_},
+   *   or if scanning the region fails
+   */
+  static Map<byte[],ListMultimap<String,TablePermission>> loadAll(
+      HRegion aclRegion)
+    throws IOException {
+
+    if (!isAclRegion(aclRegion)) {
+      throw new IOException("Can only load permissions from "+ACL_TABLE_NAME_STR);
+    }
+
+    Map<byte[],ListMultimap<String,TablePermission>> permsByTable =
+        new TreeMap<byte[],ListMultimap<String,TablePermission>>(Bytes.BYTES_COMPARATOR);
+
+    // full scan over the ACL family of this region
+    Scan scan = new Scan();
+    scan.addFamily(ACL_LIST_FAMILY);
+
+    InternalScanner iScanner = null;
+    try {
+      iScanner = aclRegion.getScanner(scan);
+
+      boolean moreRows;
+      do {
+        // each call to next() fills the list with the cells of one row
+        List<KeyValue> cells = new ArrayList<KeyValue>();
+        moreRows = iScanner.next(cells);
+
+        ListMultimap<String,TablePermission> rowPerms = ArrayListMultimap.create();
+        byte[] table = null;
+        for (KeyValue cell : cells) {
+          if (table == null) {
+            // the row key is the name of the table the grants apply to
+            table = cell.getRow();
+          }
+          Pair<String,TablePermission> grant =
+              parseTablePermissionRecord(table, cell);
+          if (grant == null) {
+            continue;
+          }
+          rowPerms.put(grant.getFirst(), grant.getSecond());
+        }
+        // table stays null only when the scanner returned no cells
+        if (table != null) {
+          permsByTable.put(table, rowPerms);
+        }
+      } while (moreRows);
+    } finally {
+      if (iScanner != null) {
+        iScanner.close();
+      }
+    }
+
+    return permsByTable;
+  }
+
+  /**
+   * Load all permissions from the region server holding {@code _acl_},
+   * primarily intended for testing purposes.  Unlike the region-based
+   * variant, this one reads through a client {@link HTable}.
+   *
+   * @param conf the configuration used to open the {@code _acl_} table
+   * @return a map from table name (row key) to the grants parsed from that row
+   * @throws IOException if the table cannot be opened or scanned
+   */
+  static Map<byte[],ListMultimap<String,TablePermission>> loadAll(
+      Configuration conf) throws IOException {
+    Map<byte[],ListMultimap<String,TablePermission>> permsByTable =
+        new TreeMap<byte[],ListMultimap<String,TablePermission>>(Bytes.BYTES_COMPARATOR);
+
+    // full, unfiltered scan of the ACL family of _acl_
+    Scan scan = new Scan();
+    scan.addFamily(ACL_LIST_FAMILY);
+
+    HTable acls = null;
+    ResultScanner scanner = null;
+    try {
+      acls = new HTable(conf, ACL_TABLE_NAME);
+      scanner = acls.getScanner(scan);
+      for (Result row : scanner) {
+        byte[] table = row.getRow();
+        permsByTable.put(table, parseTablePermissions(table, row));
+      }
+    } finally {
+      if (scanner != null) scanner.close();
+      if (acls != null) acls.close();
+    }
+
+    return permsByTable;
+  }
+
+  /**
+   * Reads the user permission assignments stored in the <code>l:</code>
+   * column family of the given table's row in <code>_acl_</code>.
+   *
+   * <p>
+   * See {@link AccessControlLists class documentation} for the key structure
+   * used for storage.
+   * </p>
+   *
+   * @param conf the configuration used to open the {@code _acl_} table
+   * @param tableName the table whose grants are requested
+   * @return the grants keyed by user or group name; empty for {@code -ROOT-},
+   *   {@code .META.} and {@code _acl_} (which are never looked up) and for
+   *   tables that have no row in {@code _acl_}
+   * @throws IOException if there is an error accessing the metadata table
+   */
+  static ListMultimap<String,TablePermission> getTablePermissions(
+      Configuration conf, byte[] tableName)
+  throws IOException {
+    /* TODO: -ROOT- and .META. cannot easily be handled because they must be
+     * online before _acl_ table.  Can anything be done here?
+     */
+    boolean unmanagedTable =
+        Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME) ||
+        Bytes.equals(tableName, HConstants.META_TABLE_NAME) ||
+        Bytes.equals(tableName, AccessControlLists.ACL_TABLE_NAME);
+    if (unmanagedTable) {
+      return ArrayListMultimap.create(0,0);
+    }
+
+    // for normal user tables, we just read the table row from _acl_
+    HTable acls = null;
+    try {
+      acls = new HTable(conf, ACL_TABLE_NAME);
+      Get get = new Get(tableName);
+      get.addFamily(ACL_LIST_FAMILY);
+      Result row = acls.get(get);
+      if (row.isEmpty()) {
+        LOG.info("No permissions found in "+ACL_TABLE_NAME_STR+
+            " for table "+Bytes.toString(tableName));
+        ListMultimap<String,TablePermission> none = ArrayListMultimap.create();
+        return none;
+      }
+      return parseTablePermissions(tableName, row);
+    } finally {
+      // runs on every exit path, including the early returns above
+      if (acls != null) acls.close();
+    }
+  }
+
+  /**
+   * Returns the currently granted permissions for a given table, flattened
+   * into one {@link UserPermission} per (user or group, grant) pair.
+   *
+   * @param conf the configuration used to open the {@code _acl_} table
+   * @param tableName the table whose grants are requested
+   * @return one entry per stored grant; empty if nothing is granted
+   * @throws IOException if there is an error accessing the metadata table
+   */
+  static List<UserPermission> getUserPermissions(
+      Configuration conf, byte[] tableName)
+  throws IOException {
+    ListMultimap<String,TablePermission> grantsByUser =
+        getTablePermissions(conf, tableName);
+
+    List<UserPermission> flattened = new ArrayList<UserPermission>();
+    for (Map.Entry<String, TablePermission> grant : grantsByUser.entries()) {
+      byte[] principal = Bytes.toBytes(grant.getKey());
+      TablePermission tp = grant.getValue();
+      flattened.add(new UserPermission(principal,
+          tp.getTable(), tp.getFamily(), tp.getQualifier(), tp.getActions()));
+    }
+    return flattened;
+  }
+
+  /**
+   * Parses every KeyValue of one {@code _acl_} row into grants keyed by user
+   * or group name.  KeyValues that do not parse to a grant are skipped.
+   *
+   * @param table the table the grants apply to
+   * @param result the row read from {@code _acl_}; may be null or empty
+   * @return the parsed grants, empty when {@code result} is null or empty
+   */
+  private static ListMultimap<String,TablePermission> parseTablePermissions(
+      byte[] table, Result result) {
+    ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
+    if (result == null || result.size() <= 0) {
+      return perms;
+    }
+
+    for (KeyValue kv : result.raw()) {
+      Pair<String,TablePermission> grant =
+          parseTablePermissionRecord(table, kv);
+      if (grant == null) {
+        continue;
+      }
+      perms.put(grant.getFirst(), grant.getSecond());
+    }
+    return perms;
+  }
+
+  /**
+   * Decodes a single {@code _acl_} cell into the user (or group) name and the
+   * permission it grants.  The cell qualifier has the form
+   * {@code user[,family[,qualifier]]}; the cell value is handed to
+   * {@link TablePermission} unchanged as the encoded actions.
+   *
+   * @param table the table the grant applies to
+   * @param kv the cell to decode
+   * @return the (name, permission) pair, or {@code null} when the cell is not
+   *   in the ACL column family
+   */
+  private static Pair<String,TablePermission> parseTablePermissionRecord(
+      byte[] table, KeyValue kv) {
+    if (!Bytes.equals(kv.getFamily(), ACL_LIST_FAMILY)) {
+      return null;
+    }
+
+    byte[] key = kv.getQualifier();
+    byte[] value = kv.getValue();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Read acl: kv ["+
+                Bytes.toStringBinary(key)+": "+
+                Bytes.toStringBinary(value)+"]");
+    }
+
+    // check for a column family appended to the key
+    // TODO: avoid the string conversion to make this more efficient
+    String fullKey = Bytes.toString(key);
+    String username = fullKey;
+    byte[] permFamily = null;
+    byte[] permQualifier = null;
+
+    // a delimiter only counts when it is neither the first nor the last
+    // character; otherwise the whole key is taken as the user name
+    int userEnd = fullKey.indexOf(ACL_KEY_DELIMITER);
+    if (userEnd > 0 && userEnd < fullKey.length()-1) {
+      username = fullKey.substring(0, userEnd);
+      String columnSpec = fullKey.substring(userEnd+1);
+
+      // same rule for the family/qualifier split: without a usable
+      // delimiter the whole remainder is the family
+      int familyEnd = columnSpec.indexOf(ACL_KEY_DELIMITER);
+      if (familyEnd > 0 && familyEnd < columnSpec.length()-1) {
+        permFamily = Bytes.toBytes(columnSpec.substring(0, familyEnd));
+        permQualifier = Bytes.toBytes(columnSpec.substring(familyEnd+1));
+      } else {
+        permFamily = Bytes.toBytes(columnSpec);
+      }
+    }
+
+    TablePermission permission =
+        new TablePermission(table, permFamily, permQualifier, value);
+    return new Pair<String,TablePermission>(username, permission);
+  }
+
+  /**
+   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
+   * to the given output stream.  The output is the number of distinct keys,
+   * followed for each key by the key string and its list of permissions.
+   * @param out the stream to write to
+   * @param perms the permissions to serialize, keyed by user or group name
+   * @param conf the configuration passed through to the object serializer
+   * @throws IOException if writing to {@code out} fails
+   */
+  public static void writePermissions(DataOutput out,
+      ListMultimap<String,? extends Permission> perms, Configuration conf)
+  throws IOException {
+    Set<String> principals = perms.keySet();
+    // count first, so a reader knows how many entries follow
+    out.writeInt(principals.size());
+    for (String principal : principals) {
+      Text.writeString(out, principal);
+      HbaseObjectWritable.writeObject(out, perms.get(principal), List.class, conf);
+    }
+  }
+
+  /**
+   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
+   * and returns the resulting byte array.
+   * @param perms the permissions to serialize, keyed by user or group name
+   * @param conf the configuration passed through to the object serializer
+   * @return the serialized bytes, or {@code null} if serialization failed
+   *   (the failure is logged, not thrown)
+   */
+  public static byte[] writePermissionsAsBytes(
+      ListMultimap<String,? extends Permission> perms, Configuration conf) {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    try {
+      writePermissions(new DataOutputStream(buffer), perms, conf);
+    } catch (IOException ioe) {
+      // shouldn't happen here
+      LOG.error("Error serializing permissions", ioe);
+      return null;
+    }
+    return buffer.toByteArray();
+  }
+
+  /**
+   * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
+   * from the input stream.  Expects the layout produced by
+   * {@link #writePermissions}: an entry count, then for each entry a key
+   * string followed by its list of permissions.
+   * @param in the stream to read from
+   * @param conf the configuration passed through to the object deserializer
+   * @return the permissions read, keyed by user or group name
+   * @throws IOException if reading from {@code in} fails
+   */
+  public static <T extends Permission> ListMultimap<String,T> readPermissions(
+      DataInput in, Configuration conf) throws IOException {
+    ListMultimap<String,T> perms = ArrayListMultimap.create();
+    int remaining = in.readInt();
+    while (remaining-- > 0) {
+      String user = Text.readString(in);
+      // readObject() returns Object; the element type is taken on trust
+      @SuppressWarnings("unchecked")
+      List<T> userPerms =
+          (List<T>)HbaseObjectWritable.readObject(in, conf);
+      perms.putAll(user, userPerms);
+    }
+
+    return perms;
+  }
+
+  /**
+   * Returns whether or not the given name should be interpreted as a group
+   * principal.  Currently this simply checks if the name starts with the
+   * special group prefix character ("@").
+   * @param name the principal name to test; may be null
+   * @return {@code true} for a non-null name that starts with the group
+   *   prefix, {@code false} otherwise
+   */
+  public static boolean isGroupPrincipal(String name) {
+    if (name == null) {
+      return false;
+    }
+    return name.startsWith(GROUP_PREFIX);
+  }
+
+  /**
+   * Returns the actual name for a group principal (stripped of the
+   * group prefix).
+   * @param aclKey the principal name as stored in the ACL key
+   * @return the name without its group prefix, or {@code aclKey} itself
+   *   (possibly null) when it is not a group principal
+   */
+  public static String getGroupName(String aclKey) {
+    return isGroupPrincipal(aclKey)
+        ? aclKey.substring(GROUP_PREFIX.length())
+        : aclKey;
+  }
+}

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java?rev=1203915&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java Sat Nov 19 00:25:37 2011
@@ -0,0 +1,987 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.ProtocolSignature;
+import org.apache.hadoop.hbase.ipc.RequestContext;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Provides basic authorization checks for data access and administrative
+ * operations.
+ *
+ * <p>
+ * {@code AccessController} performs authorization checks for HBase operations
+ * based on:
+ * <ul>
+ *   <li>the identity of the user performing the operation</li>
+ *   <li>the scope over which the operation is performed, in increasing
+ *   specificity: global, table, column family, or qualifier</li>
+ *   <li>the type of action being performed (as mapped to
+ *   {@link Permission.Action} values)</li>
+ * </ul>
+ * If the authorization check fails, an {@link AccessDeniedException}
+ * will be thrown for the operation.
+ * </p>
+ *
+ * <p>
+ * To perform authorization checks, {@code AccessController} relies on the
+ * {@link org.apache.hadoop.hbase.ipc.SecureRpcEngine} being loaded to provide
+ * the user identities for remote requests.
+ * </p>
+ *
+ * <p>
+ * The access control lists used for authorization can be manipulated via the
+ * exposed {@link AccessControllerProtocol} implementation, and the associated
+ * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
+ * commands.
+ * </p>
+ */
+public class AccessController extends BaseRegionObserver
+    implements MasterObserver, AccessControllerProtocol {
+  /**
+   * Represents the result of an authorization check for logging and error
+   * reporting.  Instances are immutable and are normally obtained through the
+   * {@code allow()} and {@code deny()} factory methods.
+   */
+  private static class AuthResult {
+    private final boolean allowed;
+    private final byte[] table;
+    private final byte[] family;
+    private final byte[] qualifier;
+    private final Permission.Action action;
+    private final String reason;
+    private final User user;
+
+    public AuthResult(boolean allowed, String reason,  User user,
+        Permission.Action action, byte[] table, byte[] family, byte[] qualifier) {
+      this.allowed = allowed;
+      this.reason = reason;
+      this.user = user;
+      this.table = table;
+      this.family = family;
+      this.qualifier = qualifier;
+      this.action = action;
+    }
+
+    /** @return true if the check granted access */
+    public boolean isAllowed() { return allowed; }
+
+    /** @return the user the check was made for; may be null */
+    public User getUser() { return user; }
+
+    /** @return the explanation recorded for this decision */
+    public String getReason() { return reason; }
+
+    /**
+     * Renders the user, scope, family, qualifier and action of the check.
+     * A null table is rendered as the "GLOBAL" scope; other null values are
+     * rendered as "UNKNOWN" (user) or the empty string.
+     */
+    public String toContextString() {
+      return "(user=" + (user != null ? user.getName() : "UNKNOWN") + ", " +
+          "scope=" + (table == null ? "GLOBAL" : Bytes.toString(table)) + ", " +
+          "family=" + (family != null ? Bytes.toString(family) : "") + ", " +
+          "qualifier=" + (qualifier != null ? Bytes.toString(qualifier) : "") + ", " +
+          "action=" + (action != null ? action.toString() : "") + ")";
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder("AuthResult")
+          .append(toContextString()).toString();
+    }
+
+    /** Creates an "allowed" result scoped to a table (or global if null). */
+    public static AuthResult allow(String reason, User user,
+        Permission.Action action, byte[] table) {
+      return new AuthResult(true, reason, user, action, table, null, null);
+    }
+
+    /** Creates a "denied" result scoped to a table (or global if null). */
+    public static AuthResult deny(String reason, User user,
+        Permission.Action action, byte[] table) {
+      return new AuthResult(false, reason, user, action, table, null, null);
+    }
+
+    /** Creates a "denied" result scoped to a specific family/qualifier. */
+    public static AuthResult deny(String reason, User user,
+        Permission.Action action, byte[] table, byte[] family, byte[] qualifier) {
+      return new AuthResult(false, reason, user, action, table, family, qualifier);
+    }
+  }
+
+  public static final Log LOG = LogFactory.getLog(AccessController.class);
+
+  private static final Log AUDITLOG =
+    LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
+
+  /**
+   * Version number for AccessControllerProtocol
+   */
+  private static final long PROTOCOL_VERSION = 1L;
+
+  TableAuthManager authManager = null;
+
+  // flags if we are running on a region of the _acl_ table
+  boolean aclRegion = false;
+
+  // defined only for the Endpoint implementation, so that it has a way to
+  // access region services.
+  private RegionCoprocessorEnvironment regionEnv;
+
+  /** Mapping of scanner instances to the user who created them */
+  private Map<InternalScanner,String> scannerOwners =
+      new MapMaker().weakKeys().makeMap();
+
+  /**
+   * Loads the permissions of every table from the given region and pushes
+   * each table's serialized permissions to ZooKeeper through the auth
+   * manager's permission watcher.
+   *
+   * @param e the coprocessor environment of the region to load from
+   * @throws IOException propagated from the underlying load/serialize calls
+   */
+  void initialize(RegionCoprocessorEnvironment e) throws IOException {
+    Map<byte[],ListMultimap<String,TablePermission>> aclsByTable =
+        AccessControlLists.loadAll(e.getRegion());
+    for (Map.Entry<byte[],ListMultimap<String,TablePermission>> entry :
+        aclsByTable.entrySet()) {
+      // one serialized payload per table, written under that table's name
+      byte[] payload = AccessControlLists.writePermissionsAsBytes(
+          entry.getValue(), e.getRegion().getConf());
+      this.authManager.getZKPermissionWatcher().writeToZookeeper(
+          Bytes.toString(entry.getKey()), payload);
+    }
+  }
+
+  /**
+   * Writes all table ACLs for the tables in the given Map up into ZooKeeper
+   * znodes.  This is called to synchronize ACL changes following {@code _acl_}
+   * table updates.
+   */
+  void updateACL(RegionCoprocessorEnvironment e,
+      final Map<byte[], List<KeyValue>> familyMap) {
+    Set<String> tableSet = new HashSet<String>();
+    for (Map.Entry<byte[], List<KeyValue>> f : familyMap.entrySet()) {
+      List<KeyValue> kvs = f.getValue();
+      for (KeyValue kv: kvs) {
+        if (Bytes.compareTo(kv.getBuffer(), kv.getFamilyOffset(),
+            kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
+            AccessControlLists.ACL_LIST_FAMILY.length) == 0) {
+          String tableName = Bytes.toString(kv.getRow());
+          tableSet.add(tableName);
+        }
+      }
+    }
+
+    for (String tableName: tableSet) {
+      try {
+        ListMultimap<String,TablePermission> perms =
+          AccessControlLists.getTablePermissions(regionEnv.getConfiguration(),
+              Bytes.toBytes(tableName));
+        byte[] serialized = AccessControlLists.writePermissionsAsBytes(
+            perms, e.getRegion().getConf());
+        this.authManager.getZKPermissionWatcher().writeToZookeeper(tableName,
+          serialized);
+      } catch (IOException ex) {
+        LOG.error("Failed updating permissions mirror for '" + tableName +
+          "'", ex);
+      }
+    }
+  }
+
+  /**
+   * Check the given user for authorization to perform a specific action
+   * against the given set of row data.
+   *
+   * <p>Note: Ordering of the authorization checks
+   * has been carefully optimized to short-circuit the most common requests
+   * and minimize the amount of processing required.</p>
+   *
+   * <p>Checks are applied in this order: READ on the root/meta regions is
+   * always allowed; a null user is denied; the table owner is allowed; a
+   * table-level grant is allowed; otherwise every requested family must pass
+   * either a family-level check or a check of each of its qualifiers.</p>
+   *
+   * @param user the user to authorize; may be null
+   * @param permRequest the action being requested
+   * @param e the coprocessor environment
+   * @param families the map of column families to qualifiers present in
+   * the request; values are expected to be a {@code Set} of qualifiers or a
+   * {@code List} of {@link KeyValue}s
+   * @return the outcome of the check, including the reason and the scope
+   * at which it was decided
+   */
+  AuthResult permissionGranted(User user, TablePermission.Action permRequest,
+      RegionCoprocessorEnvironment e,
+      Map<byte [], ? extends Collection<?>> families) {
+    HRegionInfo hri = e.getRegion().getRegionInfo();
+    HTableDescriptor htd = e.getRegion().getTableDesc();
+    byte[] tableName = hri.getTableName();
+
+    // 1. All users need read access to .META. and -ROOT- tables.
+    // this is a very common operation, so deal with it quickly.
+    if ((hri.isRootRegion() || hri.isMetaRegion()) &&
+        (permRequest == TablePermission.Action.READ)) {
+      return AuthResult.allow("All users allowed", user, permRequest,
+          hri.getTableName());
+    }
+
+    if (user == null) {
+      return AuthResult.deny("No user associated with request!", null,
+          permRequest, hri.getTableName());
+    }
+
+    // 2. The table owner has full privileges
+    String owner = htd.getOwnerString();
+    if (user.getShortName().equals(owner)) {
+      // owner of the table has full access
+      return AuthResult.allow("User is table owner", user, permRequest,
+          hri.getTableName());
+    }
+
+    // 3. check for the table-level, if successful we can short-circuit
+    if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
+      return AuthResult.allow("Table permission granted", user,
+          permRequest, tableName);
+    }
+
+    // 4. check permissions against the requested families
+    if (families != null && families.size() > 0) {
+      // all families must pass
+      for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
+        // a) check for family level access
+        if (authManager.authorize(user, tableName, family.getKey(),
+            permRequest)) {
+          continue;  // family-level permission overrides per-qualifier
+        }
+
+        // b) qualifier level access can still succeed
+        if ((family.getValue() != null) && (family.getValue().size() > 0)) {
+          if (family.getValue() instanceof Set) {
+            // for each qualifier of the family
+            Set<byte[]> familySet = (Set<byte[]>)family.getValue();
+            for (byte[] qualifier : familySet) {
+              if (!authManager.authorize(user, tableName, family.getKey(),
+                                         qualifier, permRequest)) {
+                return AuthResult.deny("Failed qualifier check", user,
+                    permRequest, tableName, family.getKey(), qualifier);
+              }
+            }
+          } else if (family.getValue() instanceof List) { // List<KeyValue>
+            List<KeyValue> kvList = (List<KeyValue>)family.getValue();
+            for (KeyValue kv : kvList) {
+              if (!authManager.authorize(user, tableName, family.getKey(),
+                      kv.getQualifier(), permRequest)) {
+                return AuthResult.deny("Failed qualifier check", user,
+                    permRequest, tableName, family.getKey(), kv.getQualifier());
+              }
+            }
+          } else {
+            // Fail closed: a non-empty qualifier collection of a type we do
+            // not know how to inspect must not be treated as authorized.
+            return AuthResult.deny("Unsupported qualifier collection type",
+                user, permRequest, tableName, family.getKey(), null);
+          }
+        } else {
+          // no qualifiers and family-level check already failed
+          return AuthResult.deny("Failed family check", user, permRequest,
+              tableName, family.getKey(), null);
+        }
+      }
+
+      // all family checks passed
+      return AuthResult.allow("All family checks passed", user, permRequest,
+          tableName);
+    }
+
+    // 5. no families to check and table level access failed
+    return AuthResult.deny("No families to check and table permission failed",
+        user, permRequest, tableName);
+  }
+
+  /**
+   * Writes the outcome of an authorization check to the audit log at TRACE
+   * level.  Does nothing when TRACE is disabled for the audit logger.
+   */
+  private void logResult(AuthResult result) {
+    if (!AUDITLOG.isTraceEnabled()) {
+      return;
+    }
+    User who = result.getUser();
+    String verdict = result.isAllowed() ? "allowed" : "denied";
+    String userName = (who != null) ? who.getShortName() : "UNKNOWN";
+    AUDITLOG.trace("Access " + verdict +
+        " for user " + userName +
+        "; reason: " + result.getReason() +
+        "; context: " + result.toContextString());
+  }
+
+  /**
+   * Resolves the user that authorization checks apply to: the request user
+   * when running inside a request context, otherwise the current
+   * (system) user.
+   */
+  private User getActiveUser() throws IOException {
+    User requestUser = RequestContext.getRequestUser();
+    // outside of RPC handling there is no request user; use the system user
+    return RequestContext.isInRequestContext() ? requestUser : User.getCurrent();
+  }
+
+  /**
+   * Enforces that the active user holds the given action at global scope.
+   * The outcome is written to the audit log in either case.
+   * @param perm The action being requested
+   * @throws IOException if obtaining the current user fails
+   * @throws AccessDeniedException if authorization is denied
+   */
+  private void requirePermission(Permission.Action perm) throws IOException {
+    User caller = getActiveUser();
+    if (!authManager.authorize(caller, perm)) {
+      logResult(AuthResult.deny("Global check failed", caller, perm, null));
+      String callerName = (caller != null) ? caller.getShortName() : "null";
+      throw new AccessDeniedException("Insufficient permissions for user '" +
+          callerName + "' (global, action=" + perm.toString() + ")");
+    }
+    logResult(AuthResult.allow("Global check allowed", caller, perm, null));
+  }
+
+  /**
+   * Authorizes that the current user has permission to perform the given
+   * action on the set of table column families.
+   * @param perm Action that is required
+   * @param env The current coprocessor environment
+   * @param families The set of column families present/required in the
+   * request; {@code null} is treated the same as an empty collection, so
+   * the request is authorized without any family-specific scope
+   * @throws AccessDeniedException if the authorization check failed
+   */
+  private void requirePermission(Permission.Action perm,
+        RegionCoprocessorEnvironment env, Collection<byte[]> families)
+      throws IOException {
+    // create a map of family-qualifier; a null qualifier set means the
+    // whole family is being requested
+    HashMap<byte[], Set<byte[]>> familyMap = new HashMap<byte[], Set<byte[]>>();
+    if (families != null) {
+      for (byte[] family : families) {
+        familyMap.put(family, null);
+      }
+    }
+    requirePermission(perm, env, familyMap);
+  }
+
+  /**
+   * Enforces that the active user may perform the given action on the given
+   * column families (and qualifiers) of the environment's table.  The
+   * outcome is written to the audit log in either case.
+   * @param perm Action that is required
+   * @param env The current coprocessor environment
+   * @param families The map of column families-qualifiers.
+   * @throws AccessDeniedException if the authorization check failed
+   */
+  private void requirePermission(Permission.Action perm,
+        RegionCoprocessorEnvironment env,
+        Map<byte[], ? extends Collection<?>> families)
+      throws IOException {
+    User caller = getActiveUser();
+    AuthResult outcome = permissionGranted(caller, perm, env, families);
+    logResult(outcome);
+    if (outcome.isAllowed()) {
+      return;
+    }
+
+    // denied: list the requested families (if any) in the error message
+    boolean hasFamilies = (families != null && families.size() > 0);
+    StringBuilder familyList = new StringBuilder();
+    if (hasFamilies) {
+      for (byte[] familyName : families.keySet()) {
+        if (familyList.length() != 0) {
+          familyList.append(", ");
+        }
+        familyList.append(Bytes.toString(familyName));
+      }
+    }
+    String tableName = env.getRegion().getTableDesc().getNameAsString();
+    String familyPart = hasFamilies ? ", family: " + familyList.toString() : "";
+    throw new AccessDeniedException("Insufficient permissions (table=" +
+        tableName + familyPart + ", action=" + perm.toString() + ")");
+  }
+
+  /**
+   * Returns <code>true</code> if the given user matches a permission for the
+   * given action on at least one of the requested qualifiers, or, for a
+   * family requested without qualifiers, on that family.  A null user or an
+   * empty/null family map yields <code>false</code>.
+   */
+  private boolean hasFamilyQualifierPermission(User user,
+      TablePermission.Action perm,
+      RegionCoprocessorEnvironment env,
+      Map<byte[], ? extends Set<byte[]>> familyMap)
+    throws IOException {
+    byte[] tableName = env.getRegion().getRegionInfo().getTableName();
+
+    if (user == null) {
+      return false;
+    }
+
+    if (familyMap == null || familyMap.size() <= 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Empty family map passed for permission check");
+      }
+      return false;
+    }
+
+    // a single match anywhere is sufficient
+    for (Map.Entry<byte[], ? extends Set<byte[]>> requested :
+        familyMap.entrySet()) {
+      byte[] familyName = requested.getKey();
+      Set<byte[]> qualifiers = requested.getValue();
+      if (qualifiers == null || qualifiers.isEmpty()) {
+        if (authManager.matchPermission(user, tableName, familyName, perm)) {
+          return true;
+        }
+        continue;
+      }
+      for (byte[] qualifier : qualifiers) {
+        if (authManager.matchPermission(user, tableName, familyName,
+            qualifier, perm)) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  /* ---- MasterObserver implementation ---- */
+  /**
+   * Coprocessor startup hook.  In a master environment the auth manager is
+   * obtained from the master's ZooKeeper handle; in a region environment the
+   * environment is retained for later use.
+   */
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof MasterCoprocessorEnvironment) {
+      // running on HMaster
+      MasterCoprocessorEnvironment masterEnv = (MasterCoprocessorEnvironment)env;
+      this.authManager = TableAuthManager.get(
+          masterEnv.getMasterServices().getZooKeeper(),
+          masterEnv.getConfiguration());
+    }
+
+    if (env instanceof RegionCoprocessorEnvironment) {
+      // running at a region
+      this.regionEnv = (RegionCoprocessorEnvironment)env;
+    }
+  }
+
+  /** Coprocessor shutdown hook; intentionally does nothing. */
+  public void stop(CoprocessorEnvironment env) {
+
+  }
+
+  @Override
+  public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
+    requirePermission(Permission.Action.CREATE);
+
+    // default the table owner if not specified
+    User owner = getActiveUser();
+    if (desc.getOwnerString() == null ||
+        desc.getOwnerString().equals("")) {
+      desc.setOwner(owner);
+    }
+  }
+
+  /** No-op: nothing is done after a table has been created. */
+  @Override
+  public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
+
+  /**
+   * Requires the global {@code CREATE} permission before a table is deleted.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName) throws IOException {
+    requirePermission(Permission.Action.CREATE);
+  }
+  /** No-op: nothing is done after a table has been deleted. */
+  @Override
+  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName) throws IOException {}
+
+
+  /**
+   * Requires the global {@code CREATE} permission before a table is modified.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName, HTableDescriptor htd) throws IOException {
+    requirePermission(Permission.Action.CREATE);
+  }
+  /** No-op: nothing is done after a table has been modified. */
+  @Override
+  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName, HTableDescriptor htd) throws IOException {}
+
+
+  /**
+   * Requires the global {@code CREATE} permission before a column family is
+   * added to a table.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName, HColumnDescriptor column) throws IOException {
+    requirePermission(Permission.Action.CREATE);
+  }
+  /** No-op: nothing is done after a column family has been added. */
+  @Override
+  public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName, HColumnDescriptor column) throws IOException {}
+
+
+  /**
+   * Requires the global {@code CREATE} permission before a column family is
+   * modified.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName, HColumnDescriptor descriptor) throws IOException {
+    requirePermission(Permission.Action.CREATE);
+  }
+  /** No-op: nothing is done after a column family has been modified. */
+  @Override
+  public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName, HColumnDescriptor descriptor) throws IOException {}
+
+
+  /**
+   * Requires the global {@code CREATE} permission before a column family is
+   * deleted.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName, byte[] col) throws IOException {
+    requirePermission(Permission.Action.CREATE);
+  }
+  /** No-op: nothing is done after a column family has been deleted. */
+  @Override
+  public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName, byte[] col) throws IOException {}
+
+
+  /**
+   * Requires the global {@code ADMIN} permission before a table is enabled.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName) throws IOException {
+    /* TODO: Allow for users with global CREATE permission and the table owner */
+    requirePermission(Permission.Action.ADMIN);
+  }
+  /** No-op: nothing is done after a table has been enabled. */
+  @Override
+  public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName) throws IOException {}
+
+  /**
+   * Requires the global {@code ADMIN} permission before a table is disabled.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName) throws IOException {
+    /* TODO: Allow for users with global CREATE permission and the table owner */
+    requirePermission(Permission.Action.ADMIN);
+  }
+  /** No-op: nothing is done after a table has been disabled. */
+  @Override
+  public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      byte[] tableName) throws IOException {}
+
+  /**
+   * Requires the global {@code ADMIN} permission before a region is moved.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preMove(ObserverContext<MasterCoprocessorEnvironment> c,
+      HRegionInfo region, ServerName srcServer, ServerName destServer)
+    throws IOException {
+    requirePermission(Permission.Action.ADMIN);
+  }
+  /** No-op: nothing is done after a region has been moved. */
+  @Override
+  public void postMove(ObserverContext<MasterCoprocessorEnvironment> c,
+      HRegionInfo region, ServerName srcServer, ServerName destServer)
+    throws IOException {}
+
+  /**
+   * Requires the global {@code ADMIN} permission before a region is assigned.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c,
+      HRegionInfo regionInfo) throws IOException {
+    requirePermission(Permission.Action.ADMIN);
+  }
+  /** No-op: nothing is done after a region has been assigned. */
+  @Override
+  public void postAssign(ObserverContext<MasterCoprocessorEnvironment> c,
+      HRegionInfo regionInfo) throws IOException {}
+
+  /**
+   * Requires the global {@code ADMIN} permission before a region is
+   * unassigned.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c,
+       HRegionInfo regionInfo, boolean force) throws IOException {
+    requirePermission(Permission.Action.ADMIN);
+  }
+  /** No-op: nothing is done after a region has been unassigned. */
+  @Override
+  public void postUnassign(ObserverContext<MasterCoprocessorEnvironment> c,
+      HRegionInfo regionInfo, boolean force) throws IOException {}
+
+  /**
+   * Requires the global {@code ADMIN} permission before a balance is run.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
+      throws IOException {
+    requirePermission(Permission.Action.ADMIN);
+  }
+  /** No-op: nothing is done after a balance has run. */
+  @Override
+  public void postBalance(ObserverContext<MasterCoprocessorEnvironment> c)
+      throws IOException {}
+
+  /**
+   * Requires the global {@code ADMIN} permission before the balancer switch
+   * is changed.
+   * @return {@code newValue}, unchanged
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
+      boolean newValue) throws IOException {
+    requirePermission(Permission.Action.ADMIN);
+    return newValue;
+  }
+  /** No-op: nothing is done after the balancer switch has been changed. */
+  @Override
+  public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
+      boolean oldValue, boolean newValue) throws IOException {}
+
+  /**
+   * Requires the global {@code ADMIN} permission before a shutdown.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
+      throws IOException {
+    requirePermission(Permission.Action.ADMIN);
+  }
+
+  /**
+   * Requires the global {@code ADMIN} permission before the master is stopped.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
+      throws IOException {
+    requirePermission(Permission.Action.ADMIN);
+  }
+
+  /**
+   * After the master has started, initializes ACL storage by calling
+   * {@code AccessControlLists.init()} with the master services.
+   */
+  @Override
+  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
+      throws IOException {
+    // initialize the ACL storage table
+    AccessControlLists.init(ctx.getEnvironment().getMasterServices());
+  }
+
+
+  /* ---- RegionObserver implementation ---- */
+
+  /**
+   * After a region opens, obtains the auth manager from the region server's
+   * ZooKeeper handle and, if the region belongs to the ACL table, flags it
+   * and publishes its stored permissions via {@code initialize()}.
+   * Failures are rethrown as {@link RuntimeException}.
+   */
+  @Override
+  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
+    RegionCoprocessorEnvironment env = c.getEnvironment();
+    final HRegion opened = env.getRegion();
+    if (opened == null) {
+      LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
+      return;
+    }
+
+    try {
+      this.authManager = TableAuthManager.get(
+          env.getRegionServerServices().getZooKeeper(),
+          env.getRegion().getConf());
+    } catch (IOException ioe) {
+      // pass along as a RuntimeException, so that the coprocessor is unloaded
+      throw new RuntimeException("Error obtaining TableAuthManager", ioe);
+    }
+
+    if (!AccessControlLists.isAclRegion(opened)) {
+      return;
+    }
+
+    aclRegion = true;
+    try {
+      initialize(env);
+    } catch (IOException ex) {
+      // if we can't obtain permissions, it's better to fail
+      // than perform checks incorrectly
+      throw new RuntimeException("Failed to initialize permissions cache", ex);
+    }
+  }
+
+  /**
+   * Requires {@code READ} permission on the requested family before a
+   * closest-row-before lookup.  When no family is given, an empty family
+   * collection is passed so the check is made without family scope instead
+   * of handing a null collection to the permission check.
+   * @throws AccessDeniedException if the active user lacks that permission
+   */
+  @Override
+  public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final byte [] row, final byte [] family, final Result result)
+      throws IOException {
+    Collection<byte[]> families = new ArrayList<byte[]>(1);
+    if (family != null) {
+      families.add(family);
+    }
+    requirePermission(TablePermission.Action.READ, c.getEnvironment(),
+        families);
+  }
+
+  /**
+   * Authorizes a Get for {@code READ}.  If the table/family level check
+   * fails but the user matches a permission on at least one requested
+   * family or qualifier, the Get proceeds with an
+   * {@link AccessControlFilter} attached (combined with any existing filter
+   * so that both must pass); otherwise the request is rejected.
+   * @throws AccessDeniedException if no applicable permission is found
+   */
+  @Override
+  public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Get get, final List<KeyValue> result) throws IOException {
+    RegionCoprocessorEnvironment env = c.getEnvironment();
+    User requestUser = getActiveUser();
+    AuthResult authResult = permissionGranted(requestUser,
+        TablePermission.Action.READ, env, get.getFamilyMap());
+
+    if (authResult.isAllowed()) {
+      // log auth success
+      logResult(authResult);
+      return;
+    }
+
+    if (!hasFamilyQualifierPermission(requestUser,
+        TablePermission.Action.READ, env, get.getFamilyMap())) {
+      logResult(authResult);
+      throw new AccessDeniedException("Insufficient permissions (table=" +
+        env.getRegion().getTableDesc().getNameAsString() + ", action=READ)");
+    }
+
+    // partial access: let the filter decide per cell
+    byte[] table = getTableName(env);
+    AccessControlFilter aclFilter = new AccessControlFilter(authManager,
+        requestUser, table);
+    if (get.getFilter() == null) {
+      get.setFilter(aclFilter);
+    } else {
+      // wrap any existing filter
+      FilterList combined = new FilterList(FilterList.Operator.MUST_PASS_ALL,
+          Lists.newArrayList(aclFilter, get.getFilter()));
+      get.setFilter(combined);
+    }
+    logResult(AuthResult.allow("Access allowed with filter", requestUser,
+        TablePermission.Action.READ, authResult.table));
+  }
+
+  /**
+   * Requires {@code READ} permission on the Get's families before an
+   * existence check.
+   * @return {@code exists}, unchanged
+   * @throws AccessDeniedException if the authorization check failed
+   */
+  @Override
+  public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Get get, final boolean exists) throws IOException {
+    requirePermission(TablePermission.Action.READ, c.getEnvironment(),
+        get.familySet());
+    return exists;
+  }
+
+  /**
+   * Requires {@code WRITE} permission on the families and qualifiers of the
+   * Put before it is applied.
+   * @throws AccessDeniedException if the authorization check failed
+   */
+  @Override
+  public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Put put, final WALEdit edit, final boolean writeToWAL)
+      throws IOException {
+    requirePermission(TablePermission.Action.WRITE, c.getEnvironment(),
+        put.getFamilyMap());
+  }
+
+  /**
+   * After a Put on a region of the ACL table, re-publishes the affected
+   * tables' permissions via {@code updateACL()}.  Other regions are ignored.
+   */
+  @Override
+  public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Put put, final WALEdit edit, final boolean writeToWAL) {
+    if (!aclRegion) {
+      return;
+    }
+    updateACL(c.getEnvironment(), put.getFamilyMap());
+  }
+
+  /**
+   * Requires {@code WRITE} permission on the families and qualifiers of the
+   * Delete before it is applied.
+   * @throws AccessDeniedException if the authorization check failed
+   */
+  @Override
+  public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Delete delete, final WALEdit edit, final boolean writeToWAL)
+      throws IOException {
+    requirePermission(TablePermission.Action.WRITE, c.getEnvironment(),
+        delete.getFamilyMap());
+  }
+
+  /**
+   * After a Delete on a region of the ACL table, re-publishes the affected
+   * tables' permissions via {@code updateACL()}.  Other regions are ignored.
+   */
+  @Override
+  public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Delete delete, final WALEdit edit, final boolean writeToWAL)
+      throws IOException {
+    if (!aclRegion) {
+      return;
+    }
+    updateACL(c.getEnvironment(), delete.getFamilyMap());
+  }
+
+  /**
+   * Authorizes a check-and-put.  The comparison reads the checked family, so
+   * {@code READ} is required on it; the Put may then be applied, so
+   * {@code WRITE} is required on the Put's families and qualifiers, exactly
+   * as {@code prePut()} requires for a plain Put.
+   * @return {@code result}, unchanged
+   * @throws AccessDeniedException if either authorization check failed
+   */
+  @Override
+  public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final byte [] row, final byte [] family, final byte [] qualifier,
+      final CompareFilter.CompareOp compareOp,
+      final WritableByteArrayComparable comparator, final Put put,
+      final boolean result) throws IOException {
+    requirePermission(TablePermission.Action.READ, c.getEnvironment(),
+        Arrays.asList(new byte[][]{family}));
+    // do not rely on a later write hook to authorize the mutation
+    requirePermission(TablePermission.Action.WRITE, c.getEnvironment(),
+        put.getFamilyMap());
+    return result;
+  }
+
+  /**
+   * Authorizes a check-and-delete.  The comparison reads the checked family,
+   * so {@code READ} is required on it; the Delete may then be applied, so
+   * {@code WRITE} is required on the Delete's families and qualifiers,
+   * exactly as {@code preDelete()} requires for a plain Delete.
+   * @return {@code result}, unchanged
+   * @throws AccessDeniedException if either authorization check failed
+   */
+  @Override
+  public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final byte [] row, final byte [] family, final byte [] qualifier,
+      final CompareFilter.CompareOp compareOp,
+      final WritableByteArrayComparable comparator, final Delete delete,
+      final boolean result) throws IOException {
+    requirePermission(TablePermission.Action.READ, c.getEnvironment(),
+        Arrays.asList( new byte[][] {family}));
+    // do not rely on a later write hook to authorize the mutation
+    requirePermission(TablePermission.Action.WRITE, c.getEnvironment(),
+        delete.getFamilyMap());
+    return result;
+  }
+
+  /**
+   * Requires {@code WRITE} permission on the given family before a column
+   * value is incremented.  Only the family is checked here; the qualifier
+   * is not passed to the permission check.
+   * @return always -1
+   * @throws AccessDeniedException if the authorization check failed
+   */
+  @Override
+  public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final byte [] row, final byte [] family, final byte [] qualifier,
+      final long amount, final boolean writeToWAL)
+      throws IOException {
+    requirePermission(TablePermission.Action.WRITE, c.getEnvironment(),
+        Arrays.asList(new byte[][]{family}));
+    // NOTE(review): presumably the host ignores this value unless the
+    // operation is bypassed -- confirm against the RegionObserver contract
+    return -1;
+  }
+
+  /**
+   * Requires {@code WRITE} permission on the families named in the
+   * Increment.  Only the family keys are passed to the permission check.
+   * @throws AccessDeniedException if the authorization check failed
+   */
+  @Override
+  public void preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Increment increment, final Result result)
+      throws IOException {
+    requirePermission(TablePermission.Action.WRITE, c.getEnvironment(),
+        increment.getFamilyMap().keySet());
+  }
+
+  /**
+   * Authorizes opening a scanner for {@code READ}.  If the table/family
+   * level check fails but the user matches a permission on at least one
+   * requested family or qualifier, the Scan proceeds with an
+   * {@link AccessControlFilter} attached (combined with any existing filter
+   * so that both must pass); otherwise the request is rejected.
+   * @return the scanner {@code s}, unchanged
+   * @throws AccessDeniedException if no applicable permission is found
+   */
+  @Override
+  public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Scan scan, final RegionScanner s) throws IOException {
+    RegionCoprocessorEnvironment env = c.getEnvironment();
+    User user = getActiveUser();
+    AuthResult authResult = permissionGranted(user, TablePermission.Action.READ,
+        env, scan.getFamilyMap());
+
+    if (authResult.isAllowed()) {
+      // log success
+      logResult(authResult);
+      return s;
+    }
+
+    if (!hasFamilyQualifierPermission(user, TablePermission.Action.READ, env,
+        scan.getFamilyMap())) {
+      // no table/family level perms and no qualifier level perms, reject
+      logResult(authResult);
+      String userName = (user != null) ? user.getShortName() : "null";
+      throw new AccessDeniedException("Insufficient permissions for user '"+
+          userName + "' " +
+          "for scanner open on table " + Bytes.toString(getTableName(env)));
+    }
+
+    // partial access: let the filter decide per cell
+    byte[] table = getTableName(env);
+    AccessControlFilter aclFilter = new AccessControlFilter(authManager,
+        user, table);
+    if (!scan.hasFilter()) {
+      scan.setFilter(aclFilter);
+    } else {
+      // wrap any existing filter
+      FilterList combined = new FilterList(FilterList.Operator.MUST_PASS_ALL,
+          Lists.newArrayList(aclFilter, scan.getFilter()));
+      scan.setFilter(combined);
+    }
+    logResult(AuthResult.allow("Access allowed with filter", user,
+        TablePermission.Action.READ, authResult.table));
+    return s;
+  }
+
+  /** Remembers the active user's short name as owner of {@code s} for later checks. */
+  @Override
+  public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Scan scan, final RegionScanner s) throws IOException {
+    final User caller = getActiveUser();
+    final String ownerName = (caller == null) ? null : caller.getShortName();
+    if (ownerName != null) { scannerOwners.put(s, ownerName); }
+    return s;
+  }
+
+  @Override
+  public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final InternalScanner s, final List<Result> result,
+      final int limit, final boolean hasNext) throws IOException {
+    requireScannerOwner(s);  // throws AccessDeniedException if the caller does not own s
+    return hasNext;  // the incoming hasNext value is handed back unchanged
+  }
+
+  @Override
+  public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final InternalScanner s) throws IOException {
+    requireScannerOwner(s);  // throws AccessDeniedException if the caller does not own s
+  }
+
+  @Override
+  public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final InternalScanner s) throws IOException {
+    // drop the owner entry that postScannerOpen() recorded for this scanner
+    scannerOwners.remove(s);
+  }
+
+  /**
+   * Verify, when servicing an RPC, that the caller is the scanner owner.
+   * If so, we assume that access control is correctly enforced based on
+   * the checks performed in preScannerOpen()
+   */
+  private void requireScannerOwner(InternalScanner s)
+      throws AccessDeniedException {
+    if (RequestContext.isInRequestContext()) {
+      String owner = scannerOwners.get(s);
+      if (owner != null && !owner.equals(RequestContext.getRequestUserName())) {
+        throw new AccessDeniedException("User '"+
+            RequestContext.getRequestUserName()+"' is not the scanner owner!");
+      }
+    }
+  }
+
+  /* ---- AccessControllerProtocol implementation ---- */
+  /*
+   * These methods are only allowed to be called against the _acl_ region(s).
+   * This will be restricted by both client side and endpoint implementations.
+   */
+  /**
+   * Adds {@code permission} for {@code user} through AccessControlLists.  Rejected
+   * unless {@code aclRegion} is set and the caller passes the ADMIN permission check.
+   */
+  @Override
+  public void grant(byte[] user, TablePermission permission) throws IOException {
+    // only allowed to be called on _acl_ region
+    if (!aclRegion) {
+      throw new CoprocessorException(AccessController.class, "This method " +
+          "can only execute at " +
+          Bytes.toString(AccessControlLists.ACL_TABLE_NAME) + " table.");
+    }
+    final String grantee = Bytes.toString(user);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received request to grant access permission to '"
+          + grantee + "'. " + permission.toString());
+    }
+    requirePermission(Permission.Action.ADMIN);
+    AccessControlLists.addTablePermission(regionEnv.getConfiguration(),
+        permission.getTable(), grantee, permission);
+    if (AUDITLOG.isTraceEnabled()) {
+      // audit log should store permission changes in addition to auth results
+      AUDITLOG.trace("Granted user '" + grantee + "' permission "
+          + permission.toString());
+    }
+  }
+
+  /**
+   * Removes {@code permission} for {@code user} through AccessControlLists.  Rejected
+   * unless {@code aclRegion} is set and the caller passes the ADMIN permission check.
+   */
+  @Override
+  public void revoke(byte[] user, TablePermission permission) throws IOException {
+    // only allowed to be called on _acl_ region
+    if (!aclRegion) {
+      throw new CoprocessorException(AccessController.class, "This method " +
+          "can only execute at " +
+          Bytes.toString(AccessControlLists.ACL_TABLE_NAME) + " table.");
+    }
+    final String target = Bytes.toString(user);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received request to revoke access permission for '"
+          + target + "'. " + permission.toString());
+    }
+    requirePermission(Permission.Action.ADMIN);
+    AccessControlLists.removeTablePermission(regionEnv.getConfiguration(),
+        permission.getTable(), target, permission);
+    if (AUDITLOG.isTraceEnabled()) {
+      // audit log should record all permission changes
+      AUDITLOG.trace("Revoked user '" + target + "' permission "
+          + permission.toString());
+    }
+  }
+
+  /**
+   * Returns what AccessControlLists has stored for {@code tableName}.  Rejected
+   * unless {@code aclRegion} is set and the caller passes the ADMIN check.
+   */
+  @Override
+  public List<UserPermission> getUserPermissions(final byte[] tableName)
+      throws IOException {
+    // only allowed to be called on _acl_ region
+    if (!aclRegion) {
+      throw new CoprocessorException(AccessController.class, "This method " +
+          "can only execute at " +
+          Bytes.toString(AccessControlLists.ACL_TABLE_NAME) + " table.");
+    }
+    requirePermission(Permission.Action.ADMIN);
+    return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), tableName);
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+    return PROTOCOL_VERSION;  // same answer for every protocol name and client version
+  }
+
+  /** Answers with PROTOCOL_VERSION for AccessControllerProtocol; rejects any other name. */
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    if (!AccessControllerProtocol.class.getName().equals(protocol)) {
+      throw new HBaseRPC.UnknownProtocolException("Unexpected protocol requested: " + protocol);
+    }
+    return new ProtocolSignature(PROTOCOL_VERSION, null);
+  }
+
+  private byte[] getTableName(RegionCoprocessorEnvironment e) {
+    HRegion region = e.getRegion();
+    byte[] tableName = null;
+
+    if (region != null) {
+      HRegionInfo regionInfo = region.getRegionInfo();
+      if (regionInfo != null) {
+        tableName = regionInfo.getTableName();
+      }
+    }
+    return tableName;
+  }
+}

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControllerProtocol.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControllerProtocol.java?rev=1203915&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControllerProtocol.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControllerProtocol.java Sat Nov 19 00:25:37 2011
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Coprocessor protocol for maintaining and querying access control lists.
+ */
+public interface AccessControllerProtocol extends CoprocessorProtocol {
+  /**
+   * Gives a user or group the right to perform the actions listed in
+   * {@code permission}, within the scope that {@link TablePermission} names.
+   * @param user name of the user receiving the grant; a name prefixed with
+   * "@" denotes a group
+   * @param permission the actions and scope being granted
+   * @throws IOException if the grant could not be applied
+   */
+  void grant(byte[] user, TablePermission permission)
+      throws IOException;
+
+  /**
+   * Takes back a privilege previously granted to a user or group.
+   * The {@link TablePermission} passed in must exactly match a stored grant.
+   * For example, if user "bob" holds "READ" access to table "data" over
+   * column family and qualifier "info:colA", then the table, the column
+   * family and the column qualifier must all be specified.  Attempting to
+   * revoke permissions over just the "data" table will have
+   * no effect.
+   * @param user name of the user losing the privilege; a name prefixed with
+   * "@" denotes a group
+   * @param permission the previously granted permission to revoke
+   * @throws IOException if the revocation could not be performed
+   */
+  void revoke(byte[] user, TablePermission permission)
+      throws IOException;
+
+  /**
+   * Looks up the permissions currently stored for a table.  Each entry in
+   * the result pairs a granted permission with the user or group it was
+   * granted to.
+   * @param tableName the table whose permission grants are returned
+   * @return the currently granted permissions with their associated user
+   * or group names
+   * @throws IOException if there is an error querying the permissions
+   */
+  List<UserPermission> getUserPermissions(byte[] tableName)
+      throws IOException;
+}

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java?rev=1203915&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java Sat Nov 19 00:25:37 2011
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.VersionedWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Base permissions instance representing the ability to perform a given set
+ * of actions.
+ *
+ * @see TablePermission
+ */
+public class Permission extends VersionedWritable {
+  protected static final byte VERSION = 0;
+  public enum Action {
+    READ('R'), WRITE('W'), EXEC('X'), CREATE('C'), ADMIN('A');
+
+    private final byte code;  // single-byte form used by write() and readFields()
+    Action(char code) {
+      this.code = (byte)code;
+    }
+
+    public byte code() { return code; }
+  }
+
+  private static final Log LOG = LogFactory.getLog(Permission.class);
+  protected static Map<Byte,Action> ACTION_BY_CODE = Maps.newHashMap();  // code byte -> Action
+
+  protected Action[] actions;  // stays null when a constructor is given no actions
+
+  static {
+    for (Action a : Action.values()) {
+      ACTION_BY_CODE.put(a.code(), a);
+    }
+  }
+
+  /** Empty constructor for Writable implementation.  <b>Do not use.</b> */
+  public Permission() {
+    super();
+  }
+
+  public Permission(Action... assigned) {
+    if (assigned != null && assigned.length > 0) {
+      actions = Arrays.copyOf(assigned, assigned.length);
+    }
+  }
+
+  public Permission(byte[] actionCodes) {
+    if (actionCodes != null) {
+      Action acts[] = new Action[actionCodes.length];
+      int j = 0;
+      for (int i=0; i<actionCodes.length; i++) {
+        byte b = actionCodes[i];
+        Action a = ACTION_BY_CODE.get(b);
+        if (a == null) {
+          LOG.error("Ignoring unknown action code '"+
+              Bytes.toStringBinary(new byte[]{b})+"'");
+          continue;
+        }
+        acts[j++] = a;
+      }
+      this.actions = Arrays.copyOf(acts, j);  // trim the slots left by skipped codes
+    }
+  }
+
+  public Action[] getActions() {
+    return actions;
+  }
+
+  public boolean implies(Action action) {
+    if (this.actions != null) {
+      for (Action a : this.actions) {
+        if (a == action) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Two permissions are equal when they allow exactly the same set of actions.
+   * Order and repetition in the action array are ignored and a missing array
+   * counts as empty, which keeps the relation symmetric.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof Permission)) {
+      return false;
+    }
+    Permission other = (Permission)obj;
+    for (Action a : Action.values()) {
+      if (implies(a) != other.implies(a)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Hashes the set of allowed actions in enum declaration order, which keeps
+   * the result consistent with {@link #equals(Object)} and safe to call when
+   * no actions are set.
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 37;
+    int result = 23;
+    for (Action a : Action.values()) {
+      if (implies(a)) {
+        result = prime * result + a.code();
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder("[Permission: ")
+        .append("actions=");
+    if (actions != null) {
+      for (int i=0; i<actions.length; i++) {
+        if (i > 0)
+          str.append(",");
+        if (actions[i] != null)
+          str.append(actions[i].toString());
+        else
+          str.append("NULL");
+      }
+    }
+    str.append("]");
+
+    return str.toString();
+  }
+
+  /** @return the object version number */
+  @Override
+  public byte getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int length = (int)in.readByte();
+    if (length > 0) {
+      actions = new Action[length];
+      for (int i = 0; i < length; i++) {
+        byte b = in.readByte();
+        Action a = ACTION_BY_CODE.get(b);
+        if (a == null) {
+          throw new IOException("Unknown action code '"+
+              Bytes.toStringBinary(new byte[]{b})+"' in input");
+        }
+        this.actions[i] = a;
+      }
+    } else {
+      actions = new Action[0];
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeByte(actions != null ? actions.length : 0);
+    if (actions != null) {
+      for (Action a: actions) {
+        out.writeByte(a.code());
+      }
+    }
+  }
+}



Mime
View raw message