hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject svn commit: r1545882 [1/2] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/ hbase-server/src/m...
Date Tue, 26 Nov 2013 23:33:31 GMT
Author: apurtell
Date: Tue Nov 26 23:33:31 2013
New Revision: 1545882

URL: http://svn.apache.org/r1545882
Log:
HBASE-7662. [Per-KV security] Per cell ACLs stored in tags

Added:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlConstants.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java Tue Nov 26 23:33:31 2013
@@ -69,7 +69,6 @@ public class Get extends Query
   private boolean cacheBlocks = true;
   private int storeLimit = -1;
   private int storeOffset = 0;
-  private Filter filter = null;
   private TimeRange tr = new TimeRange();
   private boolean checkExistenceOnly = false;
   private boolean closestRowBefore = false;
@@ -210,28 +209,15 @@ public class Get extends Query
     return this;
   }
 
-  /**
-   * Apply the specified server-side filter when performing the Get.
-   * Only {@link Filter#filterKeyValue(Cell)} is called AFTER all tests
-   * for ttl, column match, deletes and max versions have been run.
-   * @param filter filter to run on the server
-   * @return this for invocation chaining
-   */
+  @Override
   public Get setFilter(Filter filter) {
-    this.filter = filter;
+    super.setFilter(filter);
     return this;
   }
 
   /* Accessors */
 
   /**
-   * @return Filter
-   */
-  public Filter getFilter() {
-    return this.filter;
-  }
-
-  /**
    * Set whether blocks should be cached for this Get.
    * <p>
    * This is true by default.  When true, default settings of the table and
@@ -429,4 +415,4 @@ public class Get extends Query
     // TODO: This is wrong.  Can't have two gets the same just because on same row.
     return compareTo(other) == 0;
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Tue Nov 26 23:33:31 2013
@@ -41,11 +41,15 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.security.access.AccessControlConstants;
+import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteArrayDataInput;
 import com.google.common.io.ByteArrayDataOutput;
@@ -382,6 +386,57 @@ public abstract class Mutation extends O
   }
 
   /**
+   * @return The serialized ACL for this operation, or null if none
+   */
+  public byte[] getACL() {
+    return getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
+  }
+
+  /**
+   * @param user User short name
+   * @param perm Permissions for the user
+   */
+  public void setACL(String user, Permission perms) {
+    setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL,
+      ProtobufUtil.toUsersAndPermissions(user, perms).toByteArray());
+  }
+
+  /**
+   * @param perms A map of permissions for a user or users
+   */
+  public void setACL(Map<String, Permission> perms) {
+    ListMultimap<String, Permission> permMap = ArrayListMultimap.create();
+    for (Map.Entry<String, Permission> entry : perms.entrySet()) {
+      permMap.put(entry.getKey(), entry.getValue());
+    }
+    setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL,
+      ProtobufUtil.toUsersAndPermissions(permMap).toByteArray());
+  }
+
+  /**
+   * @return true if ACLs should be evaluated on the cell level first
+   */
+  public boolean getACLStrategy() {
+    byte[] bytes = getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL_STRATEGY);
+    if (bytes != null) {
+      return Bytes.equals(bytes, AccessControlConstants.OP_ATTRIBUTE_ACL_STRATEGY_CELL_FIRST);
+    }
+    return false;
+  }
+
+  /**
+   * @param cellFirstStrategy true if ACLs should be evaluated on the cell
+   * level first, false if ACL should first be checked at the CF and table
+   * levels
+   */
+  public void setACLStrategy(boolean cellFirstStrategy) {
+    if (cellFirstStrategy) {
+      setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL_STRATEGY,
+        AccessControlConstants.OP_ATTRIBUTE_ACL_STRATEGY_CELL_FIRST);
+    }
+  }
+
+  /**
    * Subclasses should override this method to add the heap size of their own fields.
    * @return the heap size to add (will be aligned).
    */

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java Tue Nov 26 23:33:31 2013
@@ -17,16 +17,45 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.Map;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.security.access.AccessControlConstants;
+import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.security.visibility.Authorizations;
 import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class Query extends OperationWithAttributes {
+  protected Filter filter = null;
+
+  /**
+   * @return Filter
+   */
+  public Filter getFilter() {
+    return filter;
+  }
+
+  /**
+   * Apply the specified server-side filter when performing the Query.
+   * Only {@link Filter#filterKeyValue(Cell)} is called AFTER all tests
+   * for ttl, column match, deletes and max versions have been run.
+   * @param filter filter to run on the server
+   * @return this for invocation chaining
+   */
+  public Query setFilter(Filter filter) {
+    this.filter = filter;
+    return this;
+  }
 
   /**
    * Sets the authorizations to be used by this Query
@@ -46,4 +75,55 @@ public abstract class Query extends Oper
     if (authorizationsBytes == null) return null;
     return ProtobufUtil.toAuthorizations(authorizationsBytes);
   }
+
+  /**
+   * @return The serialized ACL for this operation, or null if none
+   */
+  public byte[] getACL() {
+    return getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
+  }
+
+  /**
+   * @param user User short name
+   * @param perm Permissions for the user
+   */
+  public void setACL(String user, Permission perms) {
+    setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL,
+      ProtobufUtil.toUsersAndPermissions(user, perms).toByteArray());
+  }
+
+  /**
+   * @param perms A map of permissions for a user or users
+   */
+  public void setACL(Map<String, Permission> perms) {
+    ListMultimap<String, Permission> permMap = ArrayListMultimap.create();
+    for (Map.Entry<String, Permission> entry : perms.entrySet()) {
+      permMap.put(entry.getKey(), entry.getValue());
+    }
+    setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL,
+      ProtobufUtil.toUsersAndPermissions(permMap).toByteArray());
+  }
+
+  /**
+   * @return true if ACLs should be evaluated on the cell level first
+   */
+  public boolean getACLStrategy() {
+    byte[] bytes = getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL_STRATEGY);
+    if (bytes != null) {
+      return Bytes.equals(bytes, AccessControlConstants.OP_ATTRIBUTE_ACL_STRATEGY_CELL_FIRST);
+    }
+    return false;
+  }
+
+  /**
+   * @param cellFirstStrategy true if ACLs should be evaluated on the cell
+   * level first, false if ACL should first be checked at the CF and table
+   * levels
+   */
+  public void setACLStrategy(boolean cellFirstStrategy) {
+    if (cellFirstStrategy) {
+      setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL_STRATEGY,
+        AccessControlConstants.OP_ATTRIBUTE_ACL_STRATEGY_CELL_FIRST);
+    }
+  }
 }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java Tue Nov 26 23:33:31 2013
@@ -110,7 +110,6 @@ public class Scan extends Query {
   private int caching = -1;
   private long maxResultSize = -1;
   private boolean cacheBlocks = true;
-  private Filter filter = null;
   private TimeRange tr = new TimeRange();
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
@@ -404,13 +403,9 @@ public class Scan extends Query {
     this.maxResultSize = maxResultSize;
   }
 
-  /**
-   * Apply the specified server-side filter when performing the Scan.
-   * @param filter filter to run on the server
-   * @return this
-   */
+  @Override
   public Scan setFilter(Filter filter) {
-    this.filter = filter;
+    super.setFilter(filter);
     return this;
   }
 

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Tue Nov 26 23:33:31 2013
@@ -1813,10 +1813,14 @@ public final class ProtobufUtil {
         AccessControlProtos.NamespacePermission.Builder builder =
             AccessControlProtos.NamespacePermission.newBuilder();
         builder.setNamespaceName(ByteString.copyFromUtf8(tablePerm.getNamespace()));
-        for (Permission.Action a : perm.getActions()) {
-          builder.addAction(toPermissionAction(a));
-        }
+        Permission.Action actions[] = perm.getActions();
+        if (actions != null) {
+          for (Permission.Action a : actions) {
+            builder.addAction(toPermissionAction(a));
+          }
+	}
         ret.setNamespacePermission(builder);
+        return ret.build();
       } else if (tablePerm.hasTable()) {
         ret.setType(AccessControlProtos.Permission.Type.Table);
 
@@ -1829,21 +1833,28 @@ public final class ProtobufUtil {
         if (tablePerm.hasQualifier()) {
           builder.setQualifier(ZeroCopyLiteralByteString.wrap(tablePerm.getQualifier()));
         }
-        for (Permission.Action a : perm.getActions()) {
-          builder.addAction(toPermissionAction(a));
+        Permission.Action actions[] = perm.getActions();
+        if (actions != null) {
+          for (Permission.Action a : actions) {
+            builder.addAction(toPermissionAction(a));
+          }
         }
         ret.setTablePermission(builder);
+        return ret.build();
       }
-    } else {
-      ret.setType(AccessControlProtos.Permission.Type.Global);
+    }
+
+    ret.setType(AccessControlProtos.Permission.Type.Global);
 
-      AccessControlProtos.GlobalPermission.Builder builder =
-          AccessControlProtos.GlobalPermission.newBuilder();
-      for (Permission.Action a : perm.getActions()) {
+    AccessControlProtos.GlobalPermission.Builder builder =
+        AccessControlProtos.GlobalPermission.newBuilder();
+    Permission.Action actions[] = perm.getActions();
+    if (actions != null) {
+      for (Permission.Action a: actions) {
         builder.addAction(toPermissionAction(a));
       }
-      ret.setGlobalPermission(builder);
     }
+    ret.setGlobalPermission(builder);
     return ret.build();
   }
 
@@ -2552,4 +2563,43 @@ public final class ProtobufUtil {
     }
     return builder.build();
   }
+
+  public static AccessControlProtos.UsersAndPermissions toUsersAndPermissions(String user,
+      Permission perms) {
+    return AccessControlProtos.UsersAndPermissions.newBuilder()
+      .addUserPermissions(AccessControlProtos.UsersAndPermissions.UserPermissions.newBuilder()
+        .setUser(ByteString.copyFromUtf8(user))
+        .addPermissions(toPermission(perms))
+        .build())
+      .build();
+  }
+
+  public static AccessControlProtos.UsersAndPermissions toUsersAndPermissions(
+      ListMultimap<String, Permission> perms) {
+    AccessControlProtos.UsersAndPermissions.Builder builder =
+        AccessControlProtos.UsersAndPermissions.newBuilder();
+    for (Map.Entry<String, Collection<Permission>> entry : perms.asMap().entrySet()) {
+      AccessControlProtos.UsersAndPermissions.UserPermissions.Builder userPermBuilder =
+        AccessControlProtos.UsersAndPermissions.UserPermissions.newBuilder();
+      userPermBuilder.setUser(ByteString.copyFromUtf8(entry.getKey()));
+      for (Permission perm: entry.getValue()) {
+        userPermBuilder.addPermissions(toPermission(perm));
+      }
+      builder.addUserPermissions(userPermBuilder.build());
+    }
+    return builder.build();
+  }
+
+  public static ListMultimap<String, Permission> toUsersAndPermissions(
+      AccessControlProtos.UsersAndPermissions proto) {
+    ListMultimap<String, Permission> result = ArrayListMultimap.create();
+    for (AccessControlProtos.UsersAndPermissions.UserPermissions userPerms:
+        proto.getUserPermissionsList()) {
+      String user = userPerms.getUser().toStringUtf8();
+      for (AccessControlProtos.Permission perm: userPerms.getPermissionsList()) {
+        result.put(user, toPermission(perm));
+      }
+    }
+    return result;
+  }
 }

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlConstants.java?rev=1545882&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlConstants.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlConstants.java Tue Nov 26 23:33:31 2013
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface AccessControlConstants {
+
+  // Operation attributes for cell level security
+
+  /** Cell level ACL */
+  public static final String OP_ATTRIBUTE_ACL = "acl";
+  /** Cell level ACL evaluation strategy */
+  public static final String OP_ATTRIBUTE_ACL_STRATEGY = "acl.strategy";
+  /** Default cell ACL evaluation strategy: Table and CF first, then ACL */
+  public static final byte[] OP_ATTRIBUTE_ACL_STRATEGY_DEFAULT = new byte[] { 0 };
+  /** Alternate cell ACL evaluation strategy: Cell ACL first, then table and CF */
+  public static final byte[] OP_ATTRIBUTE_ACL_STRATEGY_CELL_FIRST = new byte[] { 1 };
+
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java Tue Nov 26 23:33:31 2013
@@ -19,10 +19,8 @@
 package org.apache.hadoop.hbase.security.access;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.security.User;
 
@@ -46,6 +44,8 @@ class AccessControlFilter extends Filter
   private TableAuthManager authManager;
   private TableName table;
   private User user;
+  private boolean isSystemTable;
+  private boolean cellFirstStrategy;
 
   /**
    * For Writable
@@ -53,21 +53,27 @@ class AccessControlFilter extends Filter
   AccessControlFilter() {
   }
 
-  AccessControlFilter(TableAuthManager mgr, User ugi,
-      TableName tableName) {
+  AccessControlFilter(TableAuthManager mgr, User ugi, TableName tableName,
+      boolean cellFirstStrategy) {
     authManager = mgr;
     table = tableName;
     user = ugi;
+    isSystemTable = tableName.isSystemTable();
+    this.cellFirstStrategy = cellFirstStrategy;
   }
 
   @Override
-  public ReturnCode filterKeyValue(Cell c) {
-    // TODO go and redo auth manager to use Cell instead of KV.
-    KeyValue kv = KeyValueUtil.ensureKeyValue(c);
-    if (authManager.authorize(user, table, kv, TablePermission.Action.READ)) {
+  public ReturnCode filterKeyValue(Cell cell) {
+    if (isSystemTable) {
       return ReturnCode.INCLUDE;
     }
-    return ReturnCode.NEXT_COL;
+    if (authManager.authorize(user, table, cell, cellFirstStrategy, Permission.Action.READ)) {
+      return ReturnCode.INCLUDE;
+    }
+    // Before per cell ACLs we used to return the NEXT_COL hint, but we can
+    // no longer do that since, given the possibility of per cell ACLs
+    // anywhere, we now need to examine all KVs with this filter.
+    return ReturnCode.SKIP;
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java Tue Nov 26 23:33:31 2013
@@ -24,6 +24,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,9 +39,9 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -60,12 +61,14 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Text;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
@@ -99,6 +102,8 @@ public class AccessControlLists {
   /** Column family used to store ACL grants */
   public static final String ACL_LIST_FAMILY_STR = "l";
   public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
+  /** KV tag to store per cell access control lists */
+  public static final byte ACL_TAG_TYPE = (byte) 1;
 
   public static final char NAMESPACE_PREFIX = '@';
 
@@ -675,4 +680,36 @@ public class AccessControlLists {
      }
      return Arrays.copyOfRange(namespace, 1, namespace.length);
    }
+
+   public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
+       throws IOException {
+     List<Permission> results = Lists.newArrayList();
+     byte[] tags = CellUtil.getTagArray(cell);
+     Iterator<Tag> tagsIterator = CellUtil.tagsIterator(tags, 0, tags.length);
+     while (tagsIterator.hasNext()) {
+       Tag tag = tagsIterator.next();
+       if (tag.getType() == ACL_TAG_TYPE) {
+         // Deserialize the table permissions from the KV
+         ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
+           AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
+             tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
+         // Are there permissions for this user?
+         List<Permission> userPerms = kvPerms.get(user.getShortName());
+         if (userPerms != null) {
+           results.addAll(userPerms);
+         }
+         // Are there permissions for any of the groups this user belongs to?
+         String groupNames[] = user.getGroupNames();
+         if (groupNames != null) {
+           for (String group : groupNames) {
+             List<Permission> groupPerms = kvPerms.get(GROUP_PREFIX + group);
+             if (results != null) {
+               results.addAll(groupPerms);
+             }
+           }
+         }
+       }
+     }
+     return results;
+   }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java Tue Nov 26 23:33:31 2013
@@ -18,6 +18,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,12 +29,14 @@ import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -44,16 +47,20 @@ import org.apache.hadoop.hbase.Namespace
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Query;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.coprocessor.*;
 import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.ipc.RequestContext;
@@ -78,11 +85,11 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.util.Pair;
 
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
@@ -358,35 +365,6 @@ public class AccessController extends Ba
   }
 
   /**
-   * Authorizes that the current user has any of the given permissions for the
-   * given table, column family and column qualifier.
-   * @param namespace
-   * @throws IOException if obtaining the current user fails
-   * @throws AccessDeniedException if user has no authorization
-   */
-  private void requirePermission(String request, String namespace,
-      Action... permissions) throws IOException {
-    User user = getActiveUser();
-    AuthResult result = null;
-
-    for (Action permission : permissions) {
-      if (authManager.authorize(user, namespace, permission)) {
-        result = AuthResult.allow(request, "Table permission granted", user,
-                                  permission, namespace);
-        break;
-      } else {
-        // rest of the world
-        result = AuthResult.deny(request, "Insufficient permissions", user,
-                                 permission, namespace);
-      }
-    }
-    logResult(result);
-    if (!result.isAllowed()) {
-      throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
-    }
-  }
-
-  /**
    * Authorizes that the current user has global privileges for the given action.
    * @param perm The action being requested
    * @throws IOException if obtaining the current user fails
@@ -462,48 +440,186 @@ public class AccessController extends Ba
     }
   }
 
-  /**
-   * Returns <code>true</code> if the current user is allowed the given action
-   * over at least one of the column qualifiers in the given column families.
-   */
-  private boolean hasFamilyQualifierPermission(User user,
-      Permission.Action perm,
-      RegionCoprocessorEnvironment env,
-      Map<byte[], ? extends Set<byte[]>> familyMap)
-    throws IOException {
-    HRegionInfo hri = env.getRegion().getRegionInfo();
-    TableName tableName = hri.getTable();
+  private void requireCoveringPermission(String request, RegionCoprocessorEnvironment e,
+      byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long timestamp,
+      boolean allVersions, Action...actions) throws IOException {
+    User user = getActiveUser();
 
-    if (user == null) {
-      return false;
+    // First check table or CF level permissions, if they grant access we can
+    // early out before needing to enumerate over per KV perms.
+
+    List<Action> cellCheckActions = Lists.newArrayList();
+    // TODO: permissionGranted should support checking multiple actions or
+    // we should convert actions into a bitmap and pass that around. See
+    // HBASE-7123.
+    AuthResult results[] = new AuthResult[actions.length];
+    for (int i = 0; i < actions.length; i++) {
+      results[i] = permissionGranted(request, user, actions[i], e, familyMap);
+      if (!results[i].isAllowed()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Got " + results[i] + ", added to cellCheckActions");
+        }
+        cellCheckActions.add(actions[i]);
+      }
+    }
+    // If all permissions checks passed, we can early out
+    if (cellCheckActions.isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("All permissions checks passed, we can early out");
+      }
+      for (int i = 0; i < results.length; i++) {
+        logResult(results[i]);
+      }
+      return;
     }
 
-    if (familyMap != null && familyMap.size() > 0) {
-      // at least one family must be allowed
-      for (Map.Entry<byte[], ? extends Set<byte[]>> family :
-          familyMap.entrySet()) {
-        if (family.getValue() != null && !family.getValue().isEmpty()) {
-          for (byte[] qualifier : family.getValue()) {
-            if (authManager.matchPermission(user, tableName,
-                family.getKey(), qualifier, perm)) {
-              return true;
-            }
+    // Table or CF permissions do not allow, enumerate the covered KVs. We
+    // can stop at the first which does not grant access.
+
+    Get get = new Get(row);
+    if (timestamp != HConstants.LATEST_TIMESTAMP) get.setTimeStamp(timestamp);
+    get.setMaxResultsPerColumnFamily(1); // Hold down memory use on wide rows
+    if (allVersions) {
+      get.setMaxVersions();
+    } else {
+      get.setMaxVersions(1);
+    }
+    for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
+      byte[] col = entry.getKey();
+      // TODO: HBASE-7114 could possibly unify the collection type in family
+      // maps so we would not need to do this
+      if (entry.getValue() instanceof Set) {
+        Set<byte[]> set = (Set<byte[]>)entry.getValue();
+        if (set == null || set.isEmpty()) {
+          get.addFamily(col);
+        } else {
+          for (byte[] qual: set) {
+            get.addColumn(col, qual);
           }
+        }
+      } else if (entry.getValue() instanceof List) {
+        List<Cell> list = (List<Cell>)entry.getValue();
+        if (list == null || list.isEmpty()) {
+          get.addFamily(col);
         } else {
-          if (authManager.matchPermission(user, tableName, family.getKey(),
-              perm)) {
-            return true;
+          for (Cell cell: list) {
+            get.addColumn(col, CellUtil.cloneQualifier(cell));
           }
         }
+      } else {
+        throw new RuntimeException("Unhandled collection type " +
+          entry.getValue().getClass().getName());
       }
-    } else if (LOG.isDebugEnabled()) {
-      LOG.debug("Empty family map passed for permission check");
     }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Scanning for cells with " + get);
+    }
+    RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
+    List<Cell> cells = Lists.newArrayList();
+    int numCells = 0;
+    try {
+      boolean more = false;
+      do {
+        cells.clear();
+        more = scanner.next(cells);
+        for (Cell cell: cells) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Found cell " + cell);
+          }
+          for (Action action: cellCheckActions) {
+            // Are there permissions for this user for the cell?
+            if (!authManager.authorize(user, getTableName(e), cell, false, action)) {
+              AuthResult authResult = AuthResult.deny(request, "Insufficient permissions",
+                user, action, getTableName(e), CellUtil.cloneFamily(cell),
+                CellUtil.cloneQualifier(cell));
+              logResult(authResult);
+              throw new AccessDeniedException("Insufficient permissions " +
+                  authResult.toContextString());
+            }
+          }
+          numCells++;
+        }
+      } while (more);
+    } catch (AccessDeniedException ex) {
+      throw ex;
+    } catch (IOException ex) {
+      LOG.error("Exception while getting cells to calculate covering permission", ex);
+    } finally {
+      scanner.close();
+    }
+
+    // If there were no cells to check, throw the ADE
+    if (numCells < 1) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("No cells found with scan");
+      }
+      AuthResult authResult = AuthResult.deny(request, "Insufficient permissions",
+        user, cellCheckActions.get(0), getTableName(e), familyMap);
+      logResult(authResult);
+      throw new AccessDeniedException("Insufficient permissions " +
+        authResult.toContextString());
+    }
+
+    // Log that authentication succeeded. We need to trade off logging maybe
+    // thousands of fine grained decisions with providing detail.
+    for (byte[] family: get.getFamilyMap().keySet()) {
+      for (Action action: actions) {
+        logResult(AuthResult.allow(request, "Permission granted", user, action,
+          getTableName(e), family, null));
+      }
+    }
+  }
+
+  private void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
+    // Iterate over the entries in the familyMap, replacing the cells therein
+    // with new cells including the ACL data
+    for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
+      List<Cell> newCells = Lists.newArrayList();
+      for (Cell cell: e.getValue()) {
+        List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
+        byte[] tagBytes = CellUtil.getTagArray(cell);
+        Iterator<Tag> tagIterator = CellUtil.tagsIterator(tagBytes, 0, tagBytes.length);
+        while (tagIterator.hasNext()) {
+          tags.add(tagIterator.next());
+        }
+        // Ensure KeyValue so we can do a scatter gather copy. This is only a win if the
+        // incoming cell type is actually KeyValue.
+        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+        byte[] bytes = kv.getBuffer();
+        newCells.add(
+          new KeyValue(bytes, kv.getRowOffset(), kv.getRowLength(),
+            bytes, kv.getFamilyOffset(), kv.getFamilyLength(),
+            bytes, kv.getQualifierOffset(), kv.getQualifierLength(),
+            kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
+            bytes, kv.getValueOffset(), kv.getValueLength(),
+            tags));
+      }
+      // This is supposed to be safe, won't CME
+      e.setValue(newCells);
+    }
+  }
 
-    return false;
+  private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Query query) throws IOException {
+    TableName tableName = getTableName(c.getEnvironment());
+    User activeUser = getActiveUser();
+    Filter filter = query.getFilter();
+    boolean cellFirstStrategy = query.getACLStrategy();
+    // Don't wrap an AccessControlFilter
+    if (filter != null && filter instanceof AccessControlFilter) {
+      return;
+    }
+    Filter newFilter = (filter != null)
+      ? new FilterList(FilterList.Operator.MUST_PASS_ALL,
+          Lists.newArrayList(
+            new AccessControlFilter(authManager, activeUser, tableName, cellFirstStrategy),
+            filter))
+      : new AccessControlFilter(authManager, activeUser, tableName, cellFirstStrategy);
+    query.setFilter(newFilter);
   }
 
   /* ---- MasterObserver implementation ---- */
+
   public void start(CoprocessorEnvironment env) throws IOException {
 
     ZooKeeperWatcher zk = null;
@@ -581,11 +697,13 @@ public class AccessController extends Ba
   @Override
   public void preDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName) throws IOException {}
+
   @Override
   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName) throws IOException {
     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName);
   }
+
   @Override
   public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName) throws IOException {}
@@ -625,9 +743,11 @@ public class AccessController extends Ba
   @Override
   public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName, HColumnDescriptor column) throws IOException {}
+
   @Override
   public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName, HColumnDescriptor column) throws IOException {}
+
   @Override
   public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName, HColumnDescriptor column) throws IOException {}
@@ -641,14 +761,15 @@ public class AccessController extends Ba
   @Override
   public void preModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
+
   @Override
   public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
+
   @Override
   public void postModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
 
-
   @Override
   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
       byte[] col) throws IOException {
@@ -658,12 +779,14 @@ public class AccessController extends Ba
   @Override
   public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName, byte[] col) throws IOException {}
+
   @Override
   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName, byte[] col) throws IOException {
     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(),
                                               tableName, col);
   }
+
   @Override
   public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName, byte[] col) throws IOException {}
@@ -677,9 +800,11 @@ public class AccessController extends Ba
   @Override
   public void preEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName) throws IOException {}
+
   @Override
   public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName) throws IOException {}
+
   @Override
   public void postEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName) throws IOException {}
@@ -697,9 +822,11 @@ public class AccessController extends Ba
   @Override
   public void preDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName) throws IOException {}
+
   @Override
   public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName) throws IOException {}
+
   @Override
   public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName) throws IOException {}
@@ -761,6 +888,7 @@ public class AccessController extends Ba
     requirePermission("balanceSwitch", Permission.Action.ADMIN);
     return newValue;
   }
+
   @Override
   public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
       boolean oldValue, boolean newValue) throws IOException {}
@@ -949,55 +1077,20 @@ public class AccessController extends Ba
       final byte [] row, final byte [] family, final Result result)
       throws IOException {
     assert family != null;
-    //noinspection PrimitiveArrayArgumentToVariableArgMethod
-    requirePermission("getClosestRowBefore", Permission.Action.READ, c.getEnvironment(),
-        makeFamilyMap(family, null));
+    requireCoveringPermission("getClosestRowBefore", c.getEnvironment(), row,
+      makeFamilyMap(family, null), HConstants.LATEST_TIMESTAMP, false, Permission.Action.READ);
   }
 
   @Override
   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Get get, final List<Cell> result) throws IOException {
-    /*
-     if column family level checks fail, check for a qualifier level permission
-     in one of the families.  If it is present, then continue with the AccessControlFilter.
-      */
-    RegionCoprocessorEnvironment e = c.getEnvironment();
-    User requestUser = getActiveUser();
-    AuthResult authResult = permissionGranted("get", requestUser,
-        Permission.Action.READ, e, get.getFamilyMap());
-    if (!authResult.isAllowed()) {
-      if (hasFamilyQualifierPermission(requestUser,
-          Permission.Action.READ, e, get.getFamilyMap())) {
-        TableName table = getTableName(e);
-        AccessControlFilter filter = new AccessControlFilter(authManager,
-            requestUser, table);
-
-        // wrap any existing filter
-        if (get.getFilter() != null) {
-          FilterList wrapper = new FilterList(FilterList.Operator.MUST_PASS_ALL,
-              Lists.newArrayList(filter, get.getFilter()));
-          get.setFilter(wrapper);
-        } else {
-          get.setFilter(filter);
-        }
-        logResult(AuthResult.allow("get", "Access allowed with filter", requestUser,
-            Permission.Action.READ, authResult.getTableName(), get.getFamilyMap()));
-      } else {
-        logResult(authResult);
-        throw new AccessDeniedException("Insufficient permissions (table=" +
-          e.getRegion().getTableDesc().getTableName() + ", action=READ)");
-      }
-    } else {
-      // log auth success
-      logResult(authResult);
-    }
+    internalPreRead(c, get);
   }
 
   @Override
   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Get get, final boolean exists) throws IOException {
-    requirePermission("exists", Permission.Action.READ, c.getEnvironment(),
-        get.getFamilyMap());
+    internalPreRead(c, get);
     return exists;
   }
 
@@ -1005,8 +1098,18 @@ public class AccessController extends Ba
   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Put put, final WALEdit edit, final Durability durability)
       throws IOException {
-    requirePermission("put", Permission.Action.WRITE, c.getEnvironment(),
-        put.getFamilyCellMap());
+    // Require WRITE permission to the table, CF, or top visible value, if any.
+    // NOTE: We don't need to check the permissions for any earlier Puts
+    // because we treat the ACLs in each Put as timestamped like any other
+    // HBase value. A new ACL in a new Put applies to that Put. It doesn't
+    // change the ACL of any previous Put. This allows simple evolution of
+    // security policy over time without requiring expensive updates.
+    requireCoveringPermission("put", c.getEnvironment(), put.getRow(),
+      put.getFamilyCellMap(), put.getTimeStamp(), false, Permission.Action.WRITE);
+    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
+    if (bytes != null) {
+      addCellPermissions(bytes, put.getFamilyCellMap());
+    }
   }
 
   @Override
@@ -1021,8 +1124,17 @@ public class AccessController extends Ba
   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Delete delete, final WALEdit edit, final Durability durability)
       throws IOException {
-    requirePermission("delete", Permission.Action.WRITE, c.getEnvironment(),
-        delete.getFamilyCellMap());
+    // An ACL on a delete is useless, we shouldn't allow it
+    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
+      throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
+    }
+    // Require WRITE permissions on all cells covered by the delete. Unlike
+    // for Puts we need to check all visible prior versions, because a major
+    // compaction could remove them. If the user doesn't have permission to
+    // overwrite any of the visible versions ('visible' defined as not covered
+    // by a tombstone already) then we have to disallow this operation.
+    requireCoveringPermission("delete", c.getEnvironment(), delete.getRow(),
+      delete.getFamilyCellMap(), delete.getTimeStamp(), true, Action.WRITE);
   }
 
   @Override
@@ -1040,9 +1152,14 @@ public class AccessController extends Ba
       final CompareFilter.CompareOp compareOp,
       final ByteArrayComparable comparator, final Put put,
       final boolean result) throws IOException {
-    Map<byte[], ? extends Collection<byte[]>> familyMap = makeFamilyMap(family, qualifier);
-    requirePermission("checkAndPut", Permission.Action.READ, c.getEnvironment(), familyMap);
-    requirePermission("checkAndPut", Permission.Action.WRITE, c.getEnvironment(), familyMap);
+    // Require READ and WRITE permissions on the table, CF, and KV to update
+    requireCoveringPermission("checkAndPut", c.getEnvironment(), row,
+      makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, false,
+      Action.READ, Action.WRITE);
+    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
+    if (bytes != null) {
+      addCellPermissions(bytes, put.getFamilyCellMap());
+    }
     return result;
   }
 
@@ -1052,9 +1169,16 @@ public class AccessController extends Ba
       final CompareFilter.CompareOp compareOp,
       final ByteArrayComparable comparator, final Delete delete,
       final boolean result) throws IOException {
-    Map<byte[], ? extends Collection<byte[]>> familyMap = makeFamilyMap(family, qualifier);
-    requirePermission("checkAndDelete", Permission.Action.READ, c.getEnvironment(), familyMap);
-    requirePermission("checkAndDelete", Permission.Action.WRITE, c.getEnvironment(), familyMap);
+    // An ACL on a delete is useless, we shouldn't allow it
+    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
+      throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
+          delete.toString());
+    }
+    // Require READ and WRITE permissions on the table, CF, and the KV covered
+    // by the delete
+    requireCoveringPermission("checkAndDelete", c.getEnvironment(), row,
+      makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, false,
+      Action.READ, Action.WRITE);
     return result;
   }
 
@@ -1063,15 +1187,25 @@ public class AccessController extends Ba
       final byte [] row, final byte [] family, final byte [] qualifier,
       final long amount, final boolean writeToWAL)
       throws IOException {
-    Map<byte[], ? extends Collection<byte[]>> familyMap = makeFamilyMap(family, qualifier);
-    requirePermission("incrementColumnValue", Permission.Action.WRITE, c.getEnvironment(), familyMap);
+    // Require WRITE permission to the table, CF, and the KV to be replaced by the
+    // incremented value
+    requireCoveringPermission("incrementColumnValue", c.getEnvironment(), row,
+      makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, false,
+      Action.WRITE);
     return -1;
   }
 
   @Override
   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
       throws IOException {
-    requirePermission("append", Permission.Action.WRITE, c.getEnvironment(), append.getFamilyCellMap());
+    // Require WRITE permission to the table, CF, and the KV to be appended
+    requireCoveringPermission("append", c.getEnvironment(), append.getRow(),
+      append.getFamilyCellMap(), append.getTimeStamp(), false,
+      Action.WRITE);
+    byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
+    if (bytes != null) {
+      addCellPermissions(bytes, append.getFamilyCellMap());
+    }
     return null;
   }
 
@@ -1079,59 +1213,87 @@ public class AccessController extends Ba
   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Increment increment)
       throws IOException {
-    // Create a map of family to qualifiers.
-    Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
-      Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
-      for (Cell cell: entry.getValue()) {
-        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-        qualifiers.add(kv.getQualifier());
-      }
-      familyMap.put(entry.getKey(), qualifiers);
+    // Require WRITE permission to the table, CF, and the KV to be replaced by
+    // the incremented value
+    requireCoveringPermission("increment", c.getEnvironment(), increment.getRow(),
+      increment.getFamilyCellMap(), increment.getTimeRange().getMax(), false,
+      Action.WRITE);
+    byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
+    if (bytes != null) {
+      addCellPermissions(bytes, increment.getFamilyCellMap());
     }
-    requirePermission("increment", Permission.Action.WRITE, c.getEnvironment(), familyMap);
     return null;
   }
 
   @Override
-  public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final Scan scan, final RegionScanner s) throws IOException {
-    /*
-     if column family level checks fail, check for a qualifier level permission
-     in one of the families.  If it is present, then continue with the AccessControlFilter.
-      */
-    RegionCoprocessorEnvironment e = c.getEnvironment();
-    User user = getActiveUser();
-    AuthResult authResult = permissionGranted("scannerOpen", user, Permission.Action.READ, e,
-        scan.getFamilyMap());
-    if (!authResult.isAllowed()) {
-      if (hasFamilyQualifierPermission(user, Permission.Action.READ, e,
-          scan.getFamilyMap())) {
-        TableName table = getTableName(e);
-        AccessControlFilter filter = new AccessControlFilter(authManager,
-            user, table);
-
-        // wrap any existing filter
-        if (scan.hasFilter()) {
-          FilterList wrapper = new FilterList(FilterList.Operator.MUST_PASS_ALL,
-              Lists.newArrayList(filter, scan.getFilter()));
-          scan.setFilter(wrapper);
+  public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
+
+    List<Tag> tags = Lists.newArrayList();
+    ListMultimap<String,Permission> perms = ArrayListMultimap.create();
+    if (oldCell != null) {
+      byte[] tagBytes = CellUtil.getTagArray(oldCell);
+      Iterator<Tag> tagIterator = CellUtil.tagsIterator(tagBytes, 0, tagBytes.length);
+      while (tagIterator.hasNext()) {
+        Tag tag = tagIterator.next();
+        if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
+              " length " + tag.getValue().length);
+          }
+          tags.add(tag);
         } else {
-          scan.setFilter(filter);
+          ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
+            AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
+              tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
+          perms.putAll(kvPerms);
         }
-        logResult(AuthResult.allow("scannerOpen", "Access allowed with filter", user,
-            Permission.Action.READ, authResult.getTableName(), scan.getFamilyMap()));
-      } else {
-        // no table/family level perms and no qualifier level perms, reject
-        logResult(authResult);
-        throw new AccessDeniedException("Insufficient permissions for user '"+
-            (user != null ? user.getShortName() : "null")+"' "+
-            "for scanner open on table " + getTableName(e));
       }
+    }
+
+    // Do we have an ACL on the operation?
+    byte[] aclBytes = mutation.getACL();
+    if (aclBytes != null) {
+      // Yes, use it
+      tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
     } else {
-      // log success
-      logResult(authResult);
+      // No, use what we carried forward
+      if (perms != null) {
+        // TODO: If we collected ACLs from more than one tag we may have a
+        // List<Permission> of size > 1, this can be collapsed into a single
+        // Permission
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
+        }
+        tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
+          ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
+      }
     }
+
+    // If we have no tags to add, just return
+    if (tags.isEmpty()) {
+      return newCell;
+    }
+
+    // We need to create another KV, unfortunately, because the current new KV
+    // has no space for tags
+    KeyValue newKv = KeyValueUtil.ensureKeyValue(newCell);
+    byte[] bytes = newKv.getBuffer();
+    KeyValue rewriteKv = new KeyValue(bytes, newKv.getRowOffset(), newKv.getRowLength(),
+      bytes, newKv.getFamilyOffset(), newKv.getFamilyLength(),
+      bytes, newKv.getQualifierOffset(), newKv.getQualifierLength(),
+      newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
+      bytes, newKv.getValueOffset(), newKv.getValueLength(),
+      tags);
+    // Preserve mvcc data
+    rewriteKv.setMvccVersion(newKv.getMvccVersion());
+    return rewriteKv;
+  }
+
+  @Override
+  public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Scan scan, final RegionScanner s) throws IOException {
+    internalPreRead(c, scan);
     return s;
   }
 
@@ -1423,6 +1585,10 @@ public class AccessController extends Ba
     return AccessControlProtos.AccessControlService.newReflectiveService(this);
   }
 
+  private HRegion getRegion(RegionCoprocessorEnvironment e) {
+    return e.getRegion();
+  }
+
   private TableName getTableName(RegionCoprocessorEnvironment e) {
     HRegion region = e.getRegion();
     TableName tableName = null;
@@ -1519,4 +1685,5 @@ public class AccessController extends Ba
   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
       List<HTableDescriptor> descriptors) throws IOException {
   }
+
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java Tue Nov 26 23:33:31 2013
@@ -24,9 +24,10 @@ import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -342,44 +343,67 @@ public class TableAuthManager {
     return false;
   }
 
-  public boolean authorize(User user, TableName table, KeyValue kv,
-      Permission.Action action) {
-    PermissionCache<TablePermission> tablePerms = tableCache.get(table);
-    if (tablePerms != null) {
-      List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
-      if (authorize(userPerms, table, kv, action)) {
-        return true;
+  private boolean checkCellPermissions(User user, Cell cell, Permission.Action action) {
+    try {
+      List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found perms for user " + user.getShortName() + " in cell " +
+          cell + ": " + perms);
       }
-
-      String[] groupNames = user.getGroupNames();
-      if (groupNames != null) {
-        for (String group : groupNames) {
-          List<TablePermission> groupPerms = tablePerms.getGroup(group);
-          if (authorize(groupPerms, table, kv, action)) {
-            return true;
-          }
+      for (Permission p: perms) {
+        if (p.implies(action)) {
+          return true;
         }
       }
+    } catch (IOException e) {
+      // We failed to parse the KV tag
+      LOG.error("Failed parse of ACL tag in cell " + cell);
+      // Fall through to check with the table and CF perms we were able
+      // to collect regardless
     }
     return false;
   }
 
-  private boolean authorize(List<TablePermission> perms, TableName table, KeyValue kv,
+  private boolean checkTableColumnPermissions(User user, TableName table, Cell cell,
       Permission.Action action) {
-    if (perms != null) {
-      for (TablePermission p : perms) {
-        if (p.implies(table, kv, action)) {
+    // TODO: Do not clone here
+    byte[] family = CellUtil.cloneFamily(cell);
+    byte[] qualifier = CellUtil.cloneQualifier(cell);
+    // User is authorized at table or CF level
+    if (authorizeUser(user.getShortName(), table, family, qualifier, action)) {
+      return true;
+    }
+    String groupNames[] = user.getGroupNames();
+    if (groupNames != null) {
+      for (String group: groupNames) {
+        // TODO: authorizeGroup should check qualifier too?
+        // Group is authorized at table or CF level
+        if (authorizeGroup(group, table, family, action)) {
           return true;
         }
       }
-    } else if (LOG.isDebugEnabled()) {
-      LOG.debug("No permissions for authorize() check, table=" +
-          table);
     }
-
     return false;
   }
 
+  /**
+   * Authorize a user for a given KV. This is called from AccessControlFilter.
+   */
+  public boolean authorize(User user, TableName table, Cell cell, boolean cellFirstStrategy,
+      Permission.Action action) {
+    if (cellFirstStrategy) {
+      if (checkCellPermissions(user, cell, action)) {
+        return true;
+      }
+      return checkTableColumnPermissions(user, table, cell, action);
+    } else {
+      if (checkTableColumnPermissions(user, table, cell, action)) {
+        return true;
+      }
+      return checkCellPermissions(user, cell, action);
+    }
+  }
+
   public boolean authorize(User user, String namespace, Permission.Action action) {
     if (authorizeUser(user.getShortName(), action)) {
       return true;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java Tue Nov 26 23:33:31 2013
@@ -50,6 +50,8 @@ public class SecureTestUtil {
     // add the process running user to superusers
     String currentUser = User.getCurrent().getName();
     conf.set("hbase.superuser", "admin,"+currentUser);
+    // Need HFile V3 for tags for security features
+    conf.setInt("hfile.format.version", 3);
   }
   
   public void verifyAllowed(User user, PrivilegedExceptionAction... actions) throws Exception {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java?rev=1545882&r1=1545881&r2=1545882&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java Tue Nov 26 23:33:31 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.security
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
@@ -29,8 +28,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.LargeTests;
@@ -42,7 +39,6 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
@@ -58,7 +54,6 @@ import com.google.protobuf.BlockingRpcCh
 @Category(LargeTests.class)
 public class TestAccessControlFilter {
   @Rule public TestName name = new TestName();
-  private static Log LOG = LogFactory.getLog(TestAccessControlFilter.class);
   private static HBaseTestingUtility TEST_UTIL;
 
   private static User ADMIN;
@@ -116,15 +111,19 @@ public class TestAccessControlFilter {
       public Object run() throws Exception {
         HTable aclmeta = new HTable(TEST_UTIL.getConfiguration(),
             AccessControlLists.ACL_TABLE_NAME);
-        byte[] table = Bytes.toBytes(name.getMethodName());
-        BlockingRpcChannel service = aclmeta.coprocessorService(table);
-        AccessControlService.BlockingInterface protocol =
-          AccessControlService.newBlockingStub(service);
-        ProtobufUtil.grant(protocol, READER.getShortName(),
-          TABLE, null, null, Permission.Action.READ);
-        ProtobufUtil.grant(protocol, LIMITED.getShortName(),
-          TABLE, FAMILY, PUBLIC_COL, Permission.Action.READ);
-        return null;
+        try {
+          byte[] table = Bytes.toBytes(name.getMethodName());
+          BlockingRpcChannel service = aclmeta.coprocessorService(table);
+          AccessControlService.BlockingInterface protocol =
+            AccessControlService.newBlockingStub(service);
+          ProtobufUtil.grant(protocol, READER.getShortName(),
+            TABLE, null, null, Permission.Action.READ);
+          ProtobufUtil.grant(protocol, LIMITED.getShortName(),
+            TABLE, FAMILY, PUBLIC_COL, Permission.Action.READ);
+          return null;
+        } finally {
+          aclmeta.close();
+        }
       }
     });
 
@@ -145,18 +144,22 @@ public class TestAccessControlFilter {
         // force a new RS connection
         conf.set("testkey", UUID.randomUUID().toString());
         HTable t = new HTable(conf, TABLE);
-        ResultScanner rs = t.getScanner(new Scan());
-        int rowcnt = 0;
-        for (Result r : rs) {
-          rowcnt++;
-          int rownum = Bytes.toInt(r.getRow());
-          assertTrue(r.containsColumn(FAMILY, PRIVATE_COL));
-          assertEquals("secret "+rownum, Bytes.toString(r.getValue(FAMILY, PRIVATE_COL)));
-          assertTrue(r.containsColumn(FAMILY, PUBLIC_COL));
-          assertEquals("info "+rownum, Bytes.toString(r.getValue(FAMILY, PUBLIC_COL)));
+        try {
+          ResultScanner rs = t.getScanner(new Scan());
+          int rowcnt = 0;
+          for (Result r : rs) {
+            rowcnt++;
+            int rownum = Bytes.toInt(r.getRow());
+            assertTrue(r.containsColumn(FAMILY, PRIVATE_COL));
+            assertEquals("secret "+rownum, Bytes.toString(r.getValue(FAMILY, PRIVATE_COL)));
+            assertTrue(r.containsColumn(FAMILY, PUBLIC_COL));
+            assertEquals("info "+rownum, Bytes.toString(r.getValue(FAMILY, PUBLIC_COL)));
+          }
+          assertEquals("Expected 100 rows returned", 100, rowcnt);
+          return null;
+        } finally {
+          t.close();
         }
-        assertEquals("Expected 100 rows returned", 100, rowcnt);
-        return null;
       }
     });
 
@@ -167,34 +170,46 @@ public class TestAccessControlFilter {
         // force a new RS connection
         conf.set("testkey", UUID.randomUUID().toString());
         HTable t = new HTable(conf, TABLE);
-        ResultScanner rs = t.getScanner(new Scan());
-        int rowcnt = 0;
-        for (Result r : rs) {
-          rowcnt++;
-          int rownum = Bytes.toInt(r.getRow());
-          assertFalse(r.containsColumn(FAMILY, PRIVATE_COL));
-          assertTrue(r.containsColumn(FAMILY, PUBLIC_COL));
-          assertEquals("info " + rownum, Bytes.toString(r.getValue(FAMILY, PUBLIC_COL)));
+        try {
+          ResultScanner rs = t.getScanner(new Scan());
+          int rowcnt = 0;
+          for (Result r : rs) {
+            rowcnt++;
+            int rownum = Bytes.toInt(r.getRow());
+            assertFalse(r.containsColumn(FAMILY, PRIVATE_COL));
+            assertTrue(r.containsColumn(FAMILY, PUBLIC_COL));
+            assertEquals("info " + rownum, Bytes.toString(r.getValue(FAMILY, PUBLIC_COL)));
+          }
+          assertEquals("Expected 100 rows returned", 100, rowcnt);
+          return null;
+        } finally {
+          t.close();
         }
-        assertEquals("Expected 100 rows returned", 100, rowcnt);
-        return null;
       }
     });
 
     // test as user with no permission
-    DENIED.runAs(new PrivilegedExceptionAction(){
+    DENIED.runAs(new PrivilegedExceptionAction<Object>(){
       public Object run() throws Exception {
+        Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+        // force a new RS connection
+        conf.set("testkey", UUID.randomUUID().toString());
+        HTable t = new HTable(conf, TABLE);
         try {
-          Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
-          // force a new RS connection
-          conf.set("testkey", UUID.randomUUID().toString());
-          HTable t = new HTable(conf, TABLE);
           ResultScanner rs = t.getScanner(new Scan());
-          fail("Attempt to open scanner should have been denied");
-        } catch (AccessDeniedException ade) {
-          // expected
+          int rowcnt = 0;
+          for (Result r : rs) {
+            rowcnt++;
+            int rownum = Bytes.toInt(r.getRow());
+            assertFalse(r.containsColumn(FAMILY, PRIVATE_COL));
+            assertTrue(r.containsColumn(FAMILY, PUBLIC_COL));
+            assertEquals("info " + rownum, Bytes.toString(r.getValue(FAMILY, PUBLIC_COL)));
+          }
+          assertEquals("Expected 0 rows returned", 0, rowcnt);
+          return null;
+        } finally {
+          t.close();
         }
-        return null;
       }
     });
   }



Mime
View raw message