hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raw...@apache.org
Subject svn commit: r945902 - in /hbase/trunk: ./ core/src/main/java/org/apache/hadoop/hbase/client/ core/src/main/java/org/apache/hadoop/hbase/filter/ core/src/main/java/org/apache/hadoop/hbase/io/ core/src/main/java/org/apache/hadoop/hbase/regionserver/ core...
Date Tue, 18 May 2010 21:37:46 GMT
Author: rawson
Date: Tue May 18 21:37:46 2010
New Revision: 945902

URL: http://svn.apache.org/viewvc?rev=945902&view=rev
Log:
HBASE-2466  Improving filter API to allow for modification of keyvalue list by filter

Added:
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java
    hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue May 18 21:37:46 2010
@@ -602,6 +602,8 @@ Release 0.21.0 - Unreleased
    HBASE-2520  Cleanup arrays vs Lists of scanners (Todd Lipcon via Stack)
    HBASE-2551  Forward port fixes that are in branch but not in trunk (part
                of the merge of old 0.20 into TRUNK task)
+   HBASE-2466  Improving filter API to allow for modification of keyvalue list 
+   	       by filter (Juhani Connolly via Ryan)
 
 
   NEW FEATURES

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java Tue May 18 21:37:46 2010
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -280,6 +281,11 @@ public class Scan implements Writable {
    * @param batch the maximum number of values
    */
   public void setBatch(int batch) {
+	if(this.hasFilter() && this.filter.hasFilterRow()) {
+	  throw new IncompatibleFilterException(
+        "Cannot set batch on a scan using a filter" +
+        " that returns true for filter.hasFilterRow");
+	}
     this.batch = batch;
   }
 

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java Tue May 18 21:37:46 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -32,7 +32,7 @@ import java.io.IOException;
  * its quota of columns, {@link #filterAllRemaining()} returns true.  This
  * makes this filter unsuitable as a Scan filter.
  */
-public class ColumnCountGetFilter implements Filter {
+public class ColumnCountGetFilter extends FilterBase {
   private int limit = 0;
   private int count = 0;
 
@@ -52,31 +52,28 @@ public class ColumnCountGetFilter implem
     return limit;
   }
 
+  @Override
   public boolean filterAllRemaining() {
     return this.count > this.limit;
   }
 
+  @Override
   public ReturnCode filterKeyValue(KeyValue v) {
     this.count++;
-    return filterAllRemaining()? ReturnCode.SKIP: ReturnCode.INCLUDE;
-  }
-
-  public boolean filterRow() {
-    return false;
-  }
-
-  public boolean filterRowKey(byte[] buffer, int offset, int length) {
-    return false;
+    return filterAllRemaining() ? ReturnCode.SKIP: ReturnCode.INCLUDE;
   }
 
+  @Override
   public void reset() {
     this.count = 0;
   }
 
+  @Override
   public void readFields(DataInput in) throws IOException {
     this.limit = in.readInt();
   }
 
+  @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(this.limit);
   }

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java Tue May 18 21:37:46 2010
@@ -1,5 +1,5 @@
-/**
- * Copyright 2007 The Apache Software Foundation
+/*
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.filter;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
 
@@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.KeyValue;
  * This filter can be used for row-based indexing, where references to other tables are stored across many columns,
  * in order to efficient lookups and paginated results for end users.
  */
-public class ColumnPaginationFilter implements Filter
+public class ColumnPaginationFilter extends FilterBase
 {
   private int limit = 0;
   private int offset = 0;
@@ -50,11 +51,7 @@ public class ColumnPaginationFilter impl
     this.offset = offset;
   }
 
-  public boolean filterAllRemaining()
-  {
-    return false;
-  }
-
+  @Override
   public ReturnCode filterKeyValue(KeyValue v)
   {
     if(count >= offset + limit)
@@ -67,17 +64,7 @@ public class ColumnPaginationFilter impl
     return code;
   }
 
-  public boolean filterRow()
-  {
-    this.count = 0;
-    return false;
-  }
-
-  public boolean filterRowKey(byte[] buffer, int offset, int length)
-  {
-    return false;
-  }
-
+  @Override
   public void reset()
   {
     this.count = 0;

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java Tue May 18 21:37:46 2010
@@ -43,7 +43,7 @@ import java.util.Arrays;
  * <p>
  * Multiple filters can be combined using {@link FilterList}.
  */
-public abstract class CompareFilter implements Filter {
+public abstract class CompareFilter extends FilterBase {
 
   /** Comparison operators. */
   public enum CompareOp {
@@ -59,6 +59,8 @@ public abstract class CompareFilter impl
     GREATER_OR_EQUAL,
     /** greater than */
     GREATER,
+    /** no operation */
+    NO_OP,
   }
 
   protected CompareOp compareOp;
@@ -95,28 +97,12 @@ public abstract class CompareFilter impl
     return comparator;
   }
 
-  public void reset() {
-  }
-
-  public ReturnCode filterKeyValue(KeyValue v) {
-    return ReturnCode.INCLUDE;
-  }
-
-  public boolean filterRowKey(byte[] data, int offset, int length) {
-    return false;
-  }
-
-  public boolean filterRow() {
-    return false;
-  }
-
-  public boolean filterAllRemaining() {
-    return false;
-  }
-
   protected boolean doCompare(final CompareOp compareOp,
       final WritableByteArrayComparable comparator, final byte [] data,
       final int offset, final int length) {
+      if (compareOp == CompareOp.NO_OP) {
+	  return true;
+      }
     int compareResult =
       comparator.compareTo(Arrays.copyOfRange(data, offset,
         offset + length));

Added: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java?rev=945902&view=auto
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java (added)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java Tue May 18 21:37:46 2010
@@ -0,0 +1,163 @@
+package org.apache.hadoop.hbase.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A filter for adding inter-column timestamp matching
+ * Only cells with a correspondingly timestamped entry in
+ * the target column will be retained
+ * Not compatible with Scan.setBatch as operations need 
+ * full rows for correct filtering 
+ */
+public class DependentColumnFilter extends CompareFilter {
+
+  protected byte[] columnFamily;
+  protected byte[] columnQualifier;
+  protected boolean dropDependentColumn;
+
+  protected Set<Long> stampSet = new HashSet<Long>();
+  
+  /**
+   * Should only be used for writable
+   */
+  public DependentColumnFilter() {
+  }
+  
+  /**
+   * Build a dependent column filter with value checking
+   * dependent column values will be compared using the supplied
+   * compareOp and comparator, for usage of which
+   * refer to {@link CompareFilter}
+   * 
+   * @param family dependent column family
+   * @param qualifier dependent column qualifier
+   * @param dropDependentColumn whether the column should be discarded after
+   * @param valueCompareOp comparison op 
+   * @param valueComparator comparator
+   */
+  public DependentColumnFilter(final byte [] family, final byte[] qualifier,
+		  final boolean dropDependentColumn, final CompareOp valueCompareOp,
+	      final WritableByteArrayComparable valueComparator) {
+    // set up the comparator   
+    super(valueCompareOp, valueComparator);
+    this.columnFamily = family;
+    this.columnQualifier = qualifier;
+    this.dropDependentColumn = dropDependentColumn;
+  }
+  
+  /**
+   * Constructor for DependentColumn filter.
+   * KeyValues for which no KeyValue with the same timestamp
+   * exists in the target column will be dropped.
+   * 
+   * @param family name of target column family
+   * @param qualifier name of column qualifier
+   */
+  public DependentColumnFilter(final byte [] family, final byte [] qualifier) {
+    this(family, qualifier, false);
+  }
+  
+  /**
+   * Constructor for DependentColumn filter.
+   * KeyValues for which no KeyValue with the same timestamp
+   * exists in the target column will be dropped.
+   * 
+   * @param family name of dependent column family
+   * @param qualifier name of dependent qualifier
+   * @param dropDependentColumn whether the dependent columns keyvalues should be discarded
+   */
+  public DependentColumnFilter(final byte [] family, final byte [] qualifier,
+      final boolean dropDependentColumn) {
+    this(family, qualifier, dropDependentColumn, CompareOp.NO_OP, null);
+  }
+  
+  
+  @Override
+  public boolean filterAllRemaining() {
+    return false;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    // Check if the column and qualifier match
+  	if (!v.matchingColumn(this.columnFamily, this.columnQualifier)) {
+        // include non-matches for the time being, they'll be discarded afterwards
+        return ReturnCode.INCLUDE;
+  	}
+  	// If it doesn't pass the op, skip it
+  	if(comparator != null && doCompare(compareOp, comparator, v.getValue(), 0, v.getValueLength()))
+  	  return ReturnCode.SKIP;  	  
+	
+    stampSet.add(v.getTimestamp());
+    if(dropDependentColumn) {
+    	return ReturnCode.SKIP;
+    }
+    return ReturnCode.INCLUDE;
+  }
+
+  @Override
+  public void filterRow(List<KeyValue> kvs) {
+    Iterator<KeyValue> it = kvs.iterator();
+    KeyValue kv;
+    while(it.hasNext()) {
+      kv = it.next();
+      if(!stampSet.contains(kv.getTimestamp())) {
+        it.remove();
+      }
+    }
+  }
+
+  @Override
+  public boolean hasFilterRow() {
+    return true;
+  }
+  
+  @Override
+  public boolean filterRow() {
+    return false;
+  }
+
+  @Override
+  public boolean filterRowKey(byte[] buffer, int offset, int length) {
+    return false;
+  }
+
+  @Override
+  public void reset() {
+    stampSet.clear();    
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+	super.readFields(in);
+    this.columnFamily = Bytes.readByteArray(in);
+	if(this.columnFamily.length == 0) {
+	  this.columnFamily = null;
+	}
+    
+    this.columnQualifier = Bytes.readByteArray(in);
+    if(this.columnQualifier.length == 0) {
+      this.columnQualifier = null;
+    }	
+    
+    this.dropDependentColumn = in.readBoolean();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Bytes.writeByteArray(out, this.columnFamily);
+    Bytes.writeByteArray(out, this.columnQualifier);
+    out.writeBoolean(this.dropDependentColumn);    
+  }
+
+}

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/Filter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/Filter.java Tue May 18 21:37:46 2010
@@ -23,6 +23,8 @@ package org.apache.hadoop.hbase.filter;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.io.Writable;
 
+import java.util.List;
+
 /**
  * Interface for row and column filters directly applied within the regionserver.
  * A filter can expect the following call sequence:
@@ -32,6 +34,7 @@ import org.apache.hadoop.io.Writable;
  * <li>{@link #filterRowKey(byte[],int,int)} -> true to drop this row,
  * if false, we will also call</li>
  * <li>{@link #filterKeyValue(KeyValue)} -> true to drop this key/value</li>
+ * <li>{@link #filterRow(List)} -> allows direct modification of the final list to be submitted</li>
  * <li>{@link #filterRow()} -> last chance to drop entire row based on the sequence of
  * filterValue() calls. Eg: filter a row if it doesn't contain a specified column.
  * </li>
@@ -39,6 +42,11 @@ import org.apache.hadoop.io.Writable;
  *
  * Filter instances are created one per region/scan.  This interface replaces
  * the old RowFilterInterface.
+ *
+ * When implementing your own filters, consider inheriting {@link FilterBase} to help
+ * you reduce boilerplate.
+ * 
+ * @see FilterBase
  */
 public interface Filter extends Writable {
   /**
@@ -101,6 +109,21 @@ public interface Filter extends Writable
   }
 
   /**
+   * Chance to alter the list of keyvalues to be submitted.
+   * Modifications to the list will carry on
+   * @param kvs the list of keyvalues to be filtered
+   */
+  public void filterRow(List<KeyValue> kvs);
+
+  /**
+   * Return whether or not this filter actively uses filterRow(List)
+   * Primarily used to check for conflicts with scans(such as scans
+   * that do not read a full row at a time)
+   * @return true if this filter actively uses filterRow(List), false otherwise
+   */
+  public boolean hasFilterRow();
+
+  /**
    * Last chance to veto row based on previous {@link #filterKeyValue(KeyValue)}
    * calls. The filter needs to retain state then return a particular value for
    * this call if they wish to exclude a row if a certain column is missing
@@ -108,4 +131,5 @@ public interface Filter extends Writable
    * @return true to exclude row, false to include row.
    */
   public boolean filterRow();
+
 }
\ No newline at end of file

Added: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java?rev=945902&view=auto
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (added)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java Tue May 18 21:37:46 2010
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+import java.util.List;
+
+/**
+ * Abstract base class to help you implement new Filters.  Common "ignore" or NOOP type
+ * methods can go here, helping to reduce boiler plate in an ever-expanding filter
+ * library.
+ *
+ * If you could instantiate FilterBase, it would end up being a "null" filter -
+ * that is one that never filters anything.
+ *
+ * @inheritDoc
+ */
+public abstract class FilterBase implements Filter {
+
+  /**
+   * Filters that are purely stateless and do nothing in their reset() methods can inherit
+   * this null/empty implementation.
+   *
+   * @inheritDoc
+   */
+  @Override
+  public void reset() {
+  }
+
+  /**
+   * Filters that do not filter by row key can inherit this implementation that
+   * never filters anything. (ie: returns false).
+   *
+   * @inheritDoc
+   */
+  @Override
+  public boolean filterRowKey(byte [] buffer, int offset, int length) {
+    return false;
+  }
+
+  /**
+   * Filters that never filter all remaining can inherit this implementation that
+   * never stops the filter early.
+   *
+   * @inheritDoc
+   */
+  @Override
+  public boolean filterAllRemaining() {
+    return false;
+  }
+
+  /**
+   * Filters that don't filter by key value can inherit this implementation that
+   * includes all KeyValues.
+   *
+   * @inheritDoc
+   */
+  @Override
+  public ReturnCode filterKeyValue(KeyValue ignored) {
+    return ReturnCode.INCLUDE;
+  }
+
+  /**
+   * Filters that never filter by modifying the returned List of KeyValues can
+   * inherit this implementation that does nothing.
+   *
+   * @inheritDoc
+   */
+  @Override
+  public void filterRow(List<KeyValue> ignored) {
+  }
+
+  /**
+   * Filters that never filter by modifying the returned List of KeyValues can
+   * inherit this implementation that does nothing.
+   *
+   * @inheritDoc
+   */
+  @Override
+  public boolean hasFilterRow() {
+    return false;
+  }
+
+  /**
+   * Filters that never filter by rows based on previously gathered state from
+   * {@link #filterKeyValue(KeyValue)} can inherit this implementation that
+   * never filters a row.
+   *
+   * @inheritDoc
+   */
+  @Override
+  public boolean filterRow() {
+    return false;
+  }
+}

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java Tue May 18 21:37:46 2010
@@ -118,12 +118,14 @@ public class FilterList implements Filte
     this.filters.add(filter);
   }
 
+  @Override
   public void reset() {
     for (Filter filter : filters) {
       filter.reset();
     }
   }
 
+  @Override
   public boolean filterRowKey(byte[] rowKey, int offset, int length) {
     for (Filter filter : filters) {
       if (this.operator == Operator.MUST_PASS_ALL) {
@@ -141,6 +143,7 @@ public class FilterList implements Filte
     return this.operator == Operator.MUST_PASS_ONE;
   }
 
+  @Override
   public boolean filterAllRemaining() {
     for (Filter filter : filters) {
       if (filter.filterAllRemaining()) {
@@ -156,6 +159,7 @@ public class FilterList implements Filte
     return operator == Operator.MUST_PASS_ONE;
   }
 
+  @Override
   public ReturnCode filterKeyValue(KeyValue v) {
     for (Filter filter : filters) {
       if (operator == Operator.MUST_PASS_ALL) {
@@ -187,6 +191,24 @@ public class FilterList implements Filte
       ReturnCode.SKIP: ReturnCode.INCLUDE;
   }
 
+  @Override
+  public void filterRow(List<KeyValue> kvs) {
+    for (Filter filter : filters) {
+      filter.filterRow(kvs);
+    }
+  }
+
+  @Override
+  public boolean hasFilterRow() {
+    for (Filter filter : filters) {
+      if(filter.hasFilterRow()) {
+    	return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
   public boolean filterRow() {
     for (Filter filter : filters) {
       if (operator == Operator.MUST_PASS_ALL) {

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java Tue May 18 21:37:46 2010
@@ -24,13 +24,14 @@ import org.apache.hadoop.hbase.KeyValue;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.DataInput;
+import java.util.List;
 
 /**
  * A filter that will only return the first KV from each row.
  * <p>
  * This filter can be used to more efficiently perform row count operations.
  */
-public class FirstKeyOnlyFilter implements Filter {
+public class FirstKeyOnlyFilter extends FilterBase {
   private boolean foundKV = false;
 
   public FirstKeyOnlyFilter() {
@@ -40,24 +41,12 @@ public class FirstKeyOnlyFilter implemen
     foundKV = false;
   }
 
-  public boolean filterRowKey(byte[] buffer, int offset, int length) {
-    return false;
-  }
-
-  public boolean filterAllRemaining() {
-    return false;
-  }
-
   public ReturnCode filterKeyValue(KeyValue v) {
     if(foundKV) return ReturnCode.NEXT_ROW;
     foundKV = true;
     return ReturnCode.INCLUDE;
   }
 
-  public boolean filterRow() {
-    return false;
-  }
-
   public void write(DataOutput out) throws IOException {
   }
 

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java Tue May 18 21:37:46 2010
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.util.Byte
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * A Filter that stops after the given row.  There is no "RowStopFilter" because
@@ -33,7 +34,7 @@ import java.io.IOException;
  *
  * Use this filter to include the stop row, eg: [A,Z].
  */
-public class InclusiveStopFilter implements Filter {
+public class InclusiveStopFilter extends FilterBase {
   private byte [] stopRowKey;
   private boolean done = false;
 
@@ -49,10 +50,6 @@ public class InclusiveStopFilter impleme
     return this.stopRowKey;
   }
 
-  public void reset() {
-    // noop, no state
-  }
-
   public boolean filterRowKey(byte[] buffer, int offset, int length) {
     if (buffer == null) {
       //noinspection RedundantIfStatement
@@ -75,15 +72,6 @@ public class InclusiveStopFilter impleme
     return done;
   }
 
-  public ReturnCode filterKeyValue(KeyValue v) {
-    // include everything.
-    return ReturnCode.INCLUDE;
-  }
-
-  public boolean filterRow() {
-    return false;
-  }
-
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.stopRowKey);
   }

Added: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java?rev=945902&view=auto
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java (added)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java Tue May 18 21:37:46 2010
@@ -0,0 +1,21 @@
+package org.apache.hadoop.hbase.filter;
+
+/**
+ * Used to indicate a filter incompatibility
+ */
+public class IncompatibleFilterException extends RuntimeException {
+  private static final long serialVersionUID = 3236763276623198231L;
+
+/** constructor */
+  public IncompatibleFilterException() {
+    super();
+  }
+
+  /**
+   * constructor
+   * @param s message
+   */
+  public IncompatibleFilterException(String s) {
+    super(s);
+  }
+}

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java Tue May 18 21:37:46 2010
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Implementation of Filter interface that limits results to a specific page
@@ -36,7 +37,7 @@ import java.io.IOException;
  * individual HRegions by making sure that the page size is never exceeded
  * locally.
  */
-public class PageFilter implements Filter {
+public class PageFilter extends FilterBase {
   private long pageSize = Long.MAX_VALUE;
   private int rowsAccepted = 0;
 
@@ -61,16 +62,13 @@ public class PageFilter implements Filte
     return pageSize;
   }
 
-  public void reset() {
-    // noop
-  }
-
   public boolean filterAllRemaining() {
     return this.rowsAccepted >= this.pageSize;
   }
 
-  public boolean filterRowKey(byte[] rowKey, int offset, int length) {
-    return false;
+  public boolean filterRow() {
+    this.rowsAccepted++;
+    return this.rowsAccepted > this.pageSize;
   }
 
   public void readFields(final DataInput in) throws IOException {
@@ -80,13 +78,4 @@ public class PageFilter implements Filte
   public void write(final DataOutput out) throws IOException {
     out.writeLong(pageSize);
   }
-
-  public ReturnCode filterKeyValue(KeyValue v) {
-    return ReturnCode.INCLUDE;
-  }
-
-  public boolean filterRow() {
-    this.rowsAccepted++;
-    return this.rowsAccepted > this.pageSize;
-  }
 }
\ No newline at end of file

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java Tue May 18 21:37:46 2010
@@ -26,11 +26,12 @@ import org.apache.hadoop.hbase.util.Byte
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.DataInput;
+import java.util.List;
 
 /**
  * Pass results that have same row prefix.
  */
-public class PrefixFilter implements Filter {
+public class PrefixFilter extends FilterBase {
   protected byte [] prefix = null;
   protected boolean passedPrefix = false;
 
@@ -46,10 +47,6 @@ public class PrefixFilter implements Fil
     return prefix;
   }
 
-  public void reset() {
-    // Noop
-  }
-
   public boolean filterRowKey(byte[] buffer, int offset, int length) {
     if (buffer == null || this.prefix == null)
       return true;
@@ -70,14 +67,6 @@ public class PrefixFilter implements Fil
     return passedPrefix;
   }
 
-  public ReturnCode filterKeyValue(KeyValue v) {
-    return ReturnCode.INCLUDE;
-  }
-
-  public boolean filterRow() {
-    return false;
-  }
-
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.prefix);
   }

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java Tue May 18 21:37:46 2010
@@ -23,6 +23,8 @@ package org.apache.hadoop.hbase.filter;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 
+import java.util.List;
+
 /**
  * This filter is used to filter based on the key. It takes an operator
  * (equal, greater, not equal, etc) and a byte [] comparator for the row,

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Tue May 18 21:37:46 2010
@@ -32,6 +32,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 
 /**
  * This filter is used to filter cells based on value. It takes a {@link CompareFilter.CompareOp}
@@ -60,7 +61,7 @@ import java.util.Arrays;
  * <p>
  * To filter based on the value of all scanned columns, use {@link ValueFilter}.
  */
-public class SingleColumnValueFilter implements Filter {
+public class SingleColumnValueFilter extends FilterBase {
   static final Log LOG = LogFactory.getLog(SingleColumnValueFilter.class);
 
   protected byte [] columnFamily;
@@ -144,12 +145,6 @@ public class SingleColumnValueFilter imp
     return columnQualifier;
   }
 
-  public boolean filterRowKey(byte[] rowKey, int offset, int length) {
-    // We don't filter on the row key... we filter later on column value so
-    // always return false.
-    return false;
-  }
-
   public ReturnCode filterKeyValue(KeyValue keyValue) {
     // System.out.println("REMOVE KEY=" + keyValue.toString() + ", value=" + Bytes.toString(keyValue.getValue()));
     if (this.matchedColumn) {
@@ -195,10 +190,6 @@ public class SingleColumnValueFilter imp
     }
   }
 
-  public boolean filterAllRemaining() {
-    return false;
-  }
-
   public boolean filterRow() {
     // If column was found, return false if it was matched, true if it was not
     // If column not found, return true if we filter if missing, false if not

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java Tue May 18 21:37:46 2010
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * A wrapper filter that filters an entire row if any of the KeyValue checks do
@@ -44,7 +45,7 @@ import java.io.IOException;
  * Without this filter, the other non-zero valued columns in the row would still
  * be emitted.
  */
-public class SkipFilter implements Filter {
+public class SkipFilter extends FilterBase {
   private boolean filterRow = false;
   private Filter filter;
 
@@ -69,14 +70,6 @@ public class SkipFilter implements Filte
     filterRow = filterRow || value;
   }
 
-  public boolean filterRowKey(byte[] buffer, int offset, int length) {
-    return false;
-  }
-
-  public boolean filterAllRemaining() {
-    return false;
-  }
-
   public ReturnCode filterKeyValue(KeyValue v) {
     ReturnCode c = filter.filterKeyValue(v);
     changeFR(c != ReturnCode.INCLUDE);

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java Tue May 18 21:37:46 2010
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * A wrapper filter that returns true from {@link #filterAllRemaining()} as soon
@@ -34,7 +35,7 @@ import java.io.IOException;
  * {@link org.apache.hadoop.hbase.filter.Filter#filterAllRemaining()} methods
  * returns true.
  */
-public class WhileMatchFilter implements Filter {
+public class WhileMatchFilter extends FilterBase {
   private boolean filterAllRemaining = false;
   private Filter filter;
 

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue May 18 21:37:46 2010
@@ -158,6 +158,7 @@ public class HbaseObjectWritable impleme
     addToMap(SkipFilter.class, code++);
     addToMap(WritableByteArrayComparable.class, code++);
     addToMap(FirstKeyOnlyFilter.class, code++);
+    addToMap(DependentColumnFilter.class, code++);
 
     addToMap(Delete [].class, code++);
 

Modified: hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=945902&r1=945901&r2=945902&view=diff
==============================================================================
--- hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue May 18 21:37:46 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -304,7 +305,7 @@ public class HRegion implements HConstan
   /**
    * Initialize this region and get it ready to roll.
    * Called after construction.
-   * 
+   *
    * @param initialFiles path
    * @param reporter progressable
    * @throws IOException e
@@ -448,7 +449,7 @@ public class HRegion implements HConstan
    public ReadWriteConsistencyControl getRWCC() {
      return rwcc;
    }
-   
+
   /**
    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
    * service any more calls.
@@ -459,7 +460,7 @@ public class HRegion implements HConstan
    * @return Vector of all the storage files that the HRegion's component
    * HStores make use of.  It's a list of all HStoreFile objects. Returns empty
    * vector if already closed and null if judged that it should not close.
-   * 
+   *
    * @throws IOException e
    */
   public List<StoreFile> close() throws IOException {
@@ -477,7 +478,7 @@ public class HRegion implements HConstan
    * @return Vector of all the storage files that the HRegion's component
    * HStores make use of.  It's a list of HStoreFile objects.  Can be null if
    * we are not to close at this time or we are already closed.
-   * 
+   *
    * @throws IOException e
    */
   public List<StoreFile> close(final boolean abort) throws IOException {
@@ -875,7 +876,7 @@ public class HRegion implements HConstan
    * time-sensitive thread.
    *
    * @return true if cache was flushed
-   * 
+   *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
@@ -941,7 +942,7 @@ public class HRegion implements HConstan
    * <p> This method may block for some time.
    *
    * @return true if the region needs compacting
-   * 
+   *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
@@ -1051,7 +1052,7 @@ public class HRegion implements HConstan
       }
 
       storeFlushers.clear();
-      
+
       // Set down the memstore size by amount of flush.
       this.memstoreSize.addAndGet(-currentMemStoreSize);
     } catch (Throwable t) {
@@ -1660,7 +1661,7 @@ public class HRegion implements HConstan
         this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
            walEdit, now);
       }
-      
+
       long size = 0;
 
       w = rwcc.beginMemstoreInsert();
@@ -1996,9 +1997,7 @@ public class HRegion implements HConstan
 
       results.clear();
       boolean returnResult = nextInternal(limit);
-      if (!returnResult && filter != null && filter.filterRow()) {
-        results.clear();
-      }
+
       outResults.addAll(results);
       resetFilters();
       if (isFilterDone()) {
@@ -2024,6 +2023,13 @@ public class HRegion implements HConstan
       while (true) {
         byte [] currentRow = peekRow();
         if (isStopRow(currentRow)) {
+          if (filter != null && filter.hasFilterRow()) {
+            filter.filterRow(results);
+          }
+          if (filter != null && filter.filterRow()) {
+            results.clear();
+          }
+
           return false;
         } else if (filterRowKey(currentRow)) {
           nextRow(currentRow);
@@ -2032,19 +2038,33 @@ public class HRegion implements HConstan
           do {
             this.storeHeap.next(results, limit);
             if (limit > 0 && results.size() == limit) {
-              return true;
+              if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException(
+                  "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
+              return true; // we are expecting more yes, but also limited to how many we can return.
             }
           } while (Bytes.equals(currentRow, nextRow = peekRow()));
 
           final boolean stopRow = isStopRow(nextRow);
-          if (!stopRow && (results.isEmpty() || filterRow())) {
+
+          // now that we have an entire row, let's process it with the filters:
+
+          // first filter with the filterRow(List)
+          if (filter != null && filter.hasFilterRow()) {
+            filter.filterRow(results);
+          }
+
+          if (results.isEmpty() || filterRow()) {
             // this seems like a redundant step - we already consumed the row
             // there're no left overs.
             // the reasons for calling this method are:
             // 1. reset the filters.
             // 2. provide a hook to fast forward the row (used by subclasses)
             nextRow(currentRow);
-            continue;
+
+            // This row was totally filtered out, if this is NOT the last row,
+            // we should continue on.
+
+            if (!stopRow) continue;
           }
           return !stopRow;
         }
@@ -2694,7 +2714,7 @@ public class HRegion implements HConstan
   public static final long FIXED_OVERHEAD = ClassSize.align(
       (5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
       (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
-  
+
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
       ClassSize.ATOMIC_LONG + ClassSize.ATOMIC_INTEGER +

Added: hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java?rev=945902&view=auto
==============================================================================
--- hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java (added)
+++ hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java Tue May 18 21:37:46 2010
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import junit.framework.TestCase;
+
+public class TestDependentColumnFilter extends TestCase {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+  private static final byte[][] ROWS = {
+	  Bytes.toBytes("test1"),Bytes.toBytes("test2")
+  };
+  private static final byte[][] FAMILIES = {
+	  Bytes.toBytes("familyOne"),Bytes.toBytes("familyTwo")
+  };
+  private static final long STAMP_BASE = System.currentTimeMillis();
+  private static final long[] STAMPS = {
+	  STAMP_BASE-100, STAMP_BASE-200, STAMP_BASE-300
+  };
+  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
+  private static final byte[][] BAD_VALS = {
+	  Bytes.toBytes("bad1"), Bytes.toBytes("bad2"), Bytes.toBytes("bad3")
+  };
+  private static final byte[] MATCH_VAL = Bytes.toBytes("match");
+  private HBaseTestingUtility testUtil;
+
+  List<KeyValue> testVals;
+  private HRegion region;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    testUtil = new HBaseTestingUtility();
+
+    testVals = makeTestVals();
+
+    HTableDescriptor htd = new HTableDescriptor(getName());
+    htd.addFamily(new HColumnDescriptor(FAMILIES[0]));
+    htd.addFamily(new HColumnDescriptor(FAMILIES[1]));
+    HRegionInfo info = new HRegionInfo(htd, null, null, false);
+    this.region = HRegion.createHRegion(info, testUtil.getTestDir(), testUtil.getConfiguration());
+    addData();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    this.region.close();
+  }
+
+  private void addData() throws IOException {
+    Put put = new Put(ROWS[0]);
+    // add in an entry for each stamp, with 2 as a "good" value
+    put.add(FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]);
+    put.add(FAMILIES[0], QUALIFIER, STAMPS[1], BAD_VALS[1]);
+    put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
+    // add in entries for stamps 0 and 2.
+    // without a value check both will be "accepted"
+    // with one, 2 will be accepted (since the corresponding ts entry
+    // has a matching value)
+    put.add(FAMILIES[1], QUALIFIER, STAMPS[0], BAD_VALS[0]);
+    put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]);
+
+    this.region.put(put);
+
+    put = new Put(ROWS[1]);
+    put.add(FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]);
+    // there is no corresponding timestamp for this so it should never pass
+    put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
+    // if we reverse the qualifiers this one should pass
+    put.add(FAMILIES[1], QUALIFIER, STAMPS[0], MATCH_VAL);
+    // should pass
+    put.add(FAMILIES[1], QUALIFIER, STAMPS[1], BAD_VALS[2]);
+
+    this.region.put(put);
+  }
+
+  private List<KeyValue> makeTestVals() {
+	List<KeyValue> testVals = new ArrayList<KeyValue>();
+	testVals.add(new KeyValue(ROWS[0], FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]));
+	testVals.add(new KeyValue(ROWS[0], FAMILIES[0], QUALIFIER, STAMPS[1], BAD_VALS[1]));
+	testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[1], BAD_VALS[2]));
+	testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[0], MATCH_VAL));
+	testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]));
+
+	return testVals;
+  }
+
+  /**
+   * This shouldn't be confused with TestFilter#verifyScan,
+   * as expectedCells is not the per-row total but the scan total.
+   *
+   * @param s
+   * @param expectedRows
+   * @param expectedCells
+   * @throws IOException
+   */
+  private void verifyScan(Scan s, long expectedRows, long expectedCells)
+  throws IOException {
+    InternalScanner scanner = this.region.getScanner(s);
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    int i = 0;
+    int cells = 0;
+    for (boolean done = true; done; i++) {
+      done = scanner.next(results);
+      Arrays.sort(results.toArray(new KeyValue[results.size()]),
+          KeyValue.COMPARATOR);
+      LOG.info("counter=" + i + ", " + results);
+      if (results.isEmpty()) break;
+      cells += results.size();
+      assertTrue("Scanned too many rows! Only expected " + expectedRows +
+          " total but already scanned " + (i+1), expectedRows > i);
+      assertTrue("Expected " + expectedCells + " cells total but " +
+          "already scanned " + cells, expectedCells >= cells);
+      results.clear();
+    }
+    assertEquals("Expected " + expectedRows + " rows but scanned " + i +
+        " rows", expectedRows, i);
+    assertEquals("Expected " + expectedCells + " cells but scanned " + cells +
+            " cells", expectedCells, cells);
+  }
+
+  /**
+   * Test scans using a DependentColumnFilter
+   */
+  public void testScans() throws Exception {
+    Filter filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER);
+
+    Scan scan = new Scan();
+    scan.setFilter(filter);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+
+    verifyScan(scan, 2, 8);
+
+    // drop the filtering cells
+    filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, true);
+    scan = new Scan();
+    scan.setFilter(filter);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+
+    verifyScan(scan, 2, 3);
+
+    // include a comparator operation
+    filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, false,
+        CompareOp.EQUAL, new BinaryComparator(MATCH_VAL));
+    scan = new Scan();
+    scan.setFilter(filter);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+
+    /*
+     * expecting to get the following 3 cells
+     * row 0
+     *   put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
+     *   put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]);
+     * row 1
+     *   put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
+     */
+    verifyScan(scan, 2, 3);
+
+    // include a comparator operation and drop comparator
+    filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, true,
+        CompareOp.EQUAL, new BinaryComparator(MATCH_VAL));
+    scan = new Scan();
+    scan.setFilter(filter);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+
+    /*
+     * expecting to get the following 1 cell
+     * row 0
+     *   put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]);
+     */
+    verifyScan(scan, 1, 1);
+
+  }
+
+  /**
+   * Test that the filter correctly drops rows without a corresponding timestamp
+   *
+   * @throws Exception
+   */
+  public void testFilterDropping() throws Exception {
+    Filter filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER);
+    List<KeyValue> accepted = new ArrayList<KeyValue>();
+    for(KeyValue val : testVals) {
+      if(filter.filterKeyValue(val) == ReturnCode.INCLUDE) {
+        accepted.add(val);
+      }
+    }
+    assertEquals("check all values accepted from filterKeyValue", 5, accepted.size());
+
+    filter.filterRow(accepted);
+    assertEquals("check filterRow(List<KeyValue>) dropped cell without corresponding column entry", 4, accepted.size());
+
+    // now do it again with dependent column dropping on
+    filter = new DependentColumnFilter(FAMILIES[1], QUALIFIER, true);
+    accepted.clear();
+    for(KeyValue val : testVals) {
+        if(filter.filterKeyValue(val) == ReturnCode.INCLUDE) {
+          accepted.add(val);
+        }
+      }
+      assertEquals("check the filtering column cells got dropped", 2, accepted.size());
+
+      filter.filterRow(accepted);
+      assertEquals("check cell retention", 2, accepted.size());
+  }
+}



Mime
View raw message