Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4E444D70D for ; Fri, 29 Jun 2012 13:12:20 +0000 (UTC) Received: (qmail 41643 invoked by uid 500); 29 Jun 2012 13:12:20 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 41597 invoked by uid 500); 29 Jun 2012 13:12:19 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 41589 invoked by uid 99); 29 Jun 2012 13:12:19 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Jun 2012 13:12:19 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Jun 2012 13:12:14 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6309C23888CD for ; Fri, 29 Jun 2012 13:11:52 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1355353 - in /accumulo/branches/ACCUMULO-652/core/src: main/java/org/apache/accumulo/core/file/rfile/ main/java/org/apache/accumulo/core/iterators/ main/java/org/apache/accumulo/core/iterators/predicates/ main/java/org/apache/accumulo/core... Date: Fri, 29 Jun 2012 13:11:50 -0000 To: commits@accumulo.apache.org From: afuchs@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120629131152.6309C23888CD@eris.apache.org> Author: afuchs Date: Fri Jun 29 13:11:48 2012 New Revision: 1355353 URL: http://svn.apache.org/viewvc?rev=1355353&view=rev Log: ACCUMULO-652 added column visibility block indexing and unit test, added Filterer interface to the system iterators, made VisibilityFilter use the new filtering capability, added notion of optional filtering to the Filterer interface Added: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java Added: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java?rev=1355353&view=auto ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java (added) +++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java Fri Jun 29 13:11:48 2012 @@ -0,0 +1,117 @@ +package org.apache.accumulo.core.file.rfile; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Writable; + +public class BlockStats implements Writable { + + private static ColumnVisibility emptyVisibility = new ColumnVisibility(); + private static int maxVisibilityLength = 100; + + public BlockStats(long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int entries) { + this.minTimestamp = minTimestamp; + this.maxTimestamp = maxTimestamp; + this.minimumVisibility = minimumVisibility; + this.entries = entries; + this.version = RFile.RINDEX_VER_7; + } + + long minTimestamp = Long.MAX_VALUE; + long maxTimestamp = Long.MIN_VALUE; + ColumnVisibility minimumVisibility = null; + int entries = 0; + final int version; + + public void updateBlockStats(Key key, Value value) { + if (minTimestamp > key.getTimestamp()) + minTimestamp = key.getTimestamp(); + if (maxTimestamp < key.getTimestamp()) + maxTimestamp = key.getTimestamp(); + entries++; + if (key.getColumnVisibilityData().length() > 0) + combineVisibilities(new ColumnVisibility(key.getColumnVisibility())); + else + combineVisibilities(emptyVisibility); + } + + private void combineVisibilities(ColumnVisibility other) { + if (minimumVisibility == null) + minimumVisibility = other; + else + minimumVisibility = minimumVisibility.or(other); + } + + public void updateBlockStats(BlockStats other) { + this.entries += other.entries; + if (this.minTimestamp > other.minTimestamp) + this.minTimestamp = other.minTimestamp; + if (this.maxTimestamp < other.maxTimestamp) + this.maxTimestamp = other.maxTimestamp; + combineVisibilities(other.minimumVisibility); + } + + public BlockStats() { + minTimestamp = Long.MAX_VALUE; + maxTimestamp = Long.MIN_VALUE; + minimumVisibility = null; + entries = 0; + version = RFile.RINDEX_VER_7; + } + + public BlockStats(int version) { + this.version = version; + } + + @Override + public void readFields(DataInput in) throws IOException { + if (version == RFile.RINDEX_VER_7) { + minTimestamp = in.readLong(); + maxTimestamp = in.readLong(); + int visibilityLength = in.readInt(); + if (visibilityLength >= 0) { + byte[] visibility = new byte[visibilityLength]; + in.readFully(visibility); + minimumVisibility = new ColumnVisibility(visibility); + } else { + minimumVisibility = null; + } + } else { + minTimestamp = Long.MIN_VALUE; + maxTimestamp = Long.MAX_VALUE; + minimumVisibility = null; + } + entries = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + if (version == RFile.RINDEX_VER_7) { + out.writeLong(minTimestamp); + out.writeLong(maxTimestamp); + if (minimumVisibility == null) + out.writeInt(-1); + else { + byte[] visibility = minimumVisibility.getExpression(); + if (visibility.length > maxVisibilityLength) { + System.out.println("expression too large: "+toString()); + out.writeInt(0); + } else { + out.writeInt(visibility.length); + out.write(visibility); + } + } + } + out.writeInt(entries); + } + + @Override + public String toString() { + return "{"+entries+";"+minTimestamp+";"+maxTimestamp+";"+minimumVisibility+"}"; + } +} Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java?rev=1355353&r1=1355352&r2=1355353&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java (original) +++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java Fri Jun 29 13:11:48 2012 @@ -784,7 +784,9 @@ public class RFile { * @see org.apache.accumulo.core.iterators.Filterer#applyFilter(org.apache.accumulo.core.iterators.Predicate) */ @Override - public void applyFilter(Predicate filter) { + public void applyFilter(Predicate filter, boolean required) { + if(required) + throw new UnsupportedOperationException("Cannot guarantee filtration"); // TODO support general filters if(filter instanceof TimestampRangePredicate) { @@ -797,16 +799,12 @@ public class RFile { timestampRange = p; index.setTimestampRange(timestampRange); } - else if(filter instanceof ColumnVisibilityPredicate) + if(filter instanceof ColumnVisibilityPredicate) { filterChanged = true; columnVisibilityPredicate = (ColumnVisibilityPredicate)filter; index.setColumnVisibilityPredicate(columnVisibilityPredicate); } - else - { - throw new RuntimeException("yikes, not yet implemented"); - } } } @@ -1042,7 +1040,9 @@ public class RFile { if (include) { if(timestampFilter != null) - lgr.applyFilter(timestampFilter); + lgr.applyFilter(timestampFilter,false); + if(columnVisibilityPredicate != null) + lgr.applyFilter(columnVisibilityPredicate,false); lgr.seek(range, EMPTY_CF_SET, false); addSource(lgr); numLGSeeked++; @@ -1093,6 +1093,7 @@ public class RFile { ArrayList> filters = new ArrayList>(); TimestampRangePredicate timestampFilter = null; + ColumnVisibilityPredicate columnVisibilityPredicate = null; Key topKey; Value topValue; @@ -1171,11 +1172,14 @@ public class RFile { * @see org.apache.accumulo.core.iterators.Filterer#applyFilter(org.apache.accumulo.core.iterators.Predicate) */ @Override - public void applyFilter(Predicate filter) { - filters.add(filter); + public void applyFilter(Predicate filter, boolean required) { + if(required) + filters.add(filter); // the HeapIterator will pass this filter on to its children, a collection of LocalityGroupReaders if(filter instanceof TimestampRangePredicate) this.timestampFilter = (TimestampRangePredicate)filter; + if(filter instanceof ColumnVisibilityPredicate) + this.columnVisibilityPredicate = (ColumnVisibilityPredicate)filter; } } Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java?rev=1355353&r1=1355352&r2=1355353&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java (original) +++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java Fri Jun 29 13:11:48 2012 @@ -17,8 +17,15 @@ package org.apache.accumulo.core.iterators; /** - * + * An interface designed to be added to containers to specify what + * can be left out when iterating over the contents of that container. */ public interface Filterer { - public void applyFilter(Predicate filter); + /** + * Either optionally or always leave out entries for which the given Predicate evaluates to false + * @param filter The predicate that specifies whether an entry can be left out + * @param required If true, entries that don't pass the filter must be left out. If false, then treat + * purely as a potential optimization. + */ + public void applyFilter(Predicate filter, boolean required); } Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java?rev=1355353&r1=1355352&r2=1355353&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java (original) +++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java Fri Jun 29 13:11:48 2012 @@ -25,7 +25,7 @@ import org.apache.accumulo.core.data.Key import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -public abstract class WrappingIterator implements SortedKeyValueIterator { +public abstract class WrappingIterator implements SortedKeyValueIterator, Filterer { private SortedKeyValueIterator source = null; boolean seenSeek = false; @@ -93,4 +93,13 @@ public abstract class WrappingIterator i seenSeek = true; } + @SuppressWarnings("unchecked") + @Override + public void applyFilter(Predicate filter, boolean required) { + if(source instanceof Filterer) + ((Filterer)source).applyFilter(filter, required); + else if(required) + throw new IllegalArgumentException("Cannot require filter of underlying iterator"); + } + } Added: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java?rev=1355353&view=auto ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java (added) +++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java Fri Jun 29 13:11:48 2012 @@ -0,0 +1,26 @@ +package org.apache.accumulo.core.iterators.predicates; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Predicate; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; + +public final class ColumnVisibilityPredicate implements Predicate { + public final Authorizations auths; + + public ColumnVisibilityPredicate(Authorizations auths) + { + this.auths = auths; + } + + @Override + public boolean evaluate(Key k, Value v) { + return new ColumnVisibility(k.getColumnVisibility()).evaluate(auths); + } + + @Override + public String toString() { + return "{"+auths+"}"; + } +} Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java?rev=1355353&r1=1355352&r2=1355353&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java (original) +++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java Fri Jun 29 13:11:48 2012 @@ -27,7 +27,9 @@ import org.apache.accumulo.core.data.Key import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filterer; import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.Predicate; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; /** @@ -37,7 +39,7 @@ import org.apache.accumulo.core.iterator * */ -public class MultiIterator extends HeapIterator { +public class MultiIterator extends HeapIterator implements Filterer { private List> iters; private Range fence; @@ -111,4 +113,15 @@ public class MultiIterator extends HeapI public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { throw new UnsupportedOperationException(); } + + @SuppressWarnings("unchecked") + @Override + public void applyFilter(Predicate filter, boolean required) { + for(SortedKeyValueIterator skvi: iters) { + if(skvi instanceof Filterer) + ((Filterer)skvi).applyFilter(filter, required); + else if(required) + throw new IllegalArgumentException("Cannot require filter of underlying iterator"); + } + } } Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java?rev=1355353&r1=1355352&r2=1355353&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java (original) +++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java Fri Jun 29 13:11:48 2012 @@ -20,8 +20,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.data.ByteSequence; @@ -29,10 +31,12 @@ import org.apache.accumulo.core.data.Key import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filterer; import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.Predicate; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -public class SourceSwitchingIterator implements SortedKeyValueIterator, InterruptibleIterator { +public class SourceSwitchingIterator implements SortedKeyValueIterator, InterruptibleIterator, Filterer { public interface DataSource { boolean isCurrent(); @@ -144,6 +148,7 @@ public class SourceSwitchingIterator imp while (!source.isCurrent()) { source = source.getNewDataSource(); iter = source.iterator(); + applyExistingFilters(); if (iflag != null) ((InterruptibleIterator) iter).setInterruptFlag(iflag); @@ -161,6 +166,7 @@ public class SourceSwitchingIterator imp if (iter == null) { iter = source.iterator(); + applyExistingFilters(); if (iflag != null) ((InterruptibleIterator) iter).setInterruptFlag(iflag); } @@ -197,4 +203,31 @@ public class SourceSwitchingIterator imp } + private Map,Boolean> filters = new HashMap,Boolean>(); + + private void applyExistingFilters() + { + for(Entry,Boolean> filter:filters.entrySet()) + { + _applyFilter(filter.getKey(), filter.getValue()); + } + } + + @SuppressWarnings("unchecked") + private void _applyFilter(Predicate filter, boolean required) + { + if(iter != null && iter instanceof Filterer) + ((Filterer)iter).applyFilter(filter, required); + else if(iter != null && required) + throw new IllegalArgumentException("Cannot require filter of underlying iterator"); + } + + @Override + public void applyFilter(Predicate filter, boolean required) { + // apply filter to the current data source + _applyFilter(filter,required); + // save filter for application to future data sources + filters.put(filter, required); + } + } Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java?rev=1355353&r1=1355352&r2=1355353&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java (original) +++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java Fri Jun 29 13:11:48 2012 @@ -22,7 +22,9 @@ import java.util.Map; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.iterators.Filterer; import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.Predicate; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -30,7 +32,7 @@ import org.apache.hadoop.io.WritableComp /*** * SynchronizedIterator: wrap a SortedKeyValueIterator so that all of its methods are synchronized */ -public class SynchronizedIterator,V extends Writable> implements SortedKeyValueIterator { +public class SynchronizedIterator,V extends Writable> implements SortedKeyValueIterator, Filterer { private SortedKeyValueIterator source = null; @@ -75,4 +77,13 @@ public class SynchronizedIterator source) { this.source = source; } + + @SuppressWarnings("unchecked") + @Override + public void applyFilter(Predicate filter, boolean required) { + if(source instanceof Filterer) + ((Filterer)source).applyFilter(filter, required); + else if(required) + throw new IllegalArgumentException("cannot guarantee filter with non filterer source"); + } } Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java?rev=1355353&r1=1355352&r2=1355353&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java (original) +++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java Fri Jun 29 13:11:48 2012 @@ -25,11 +25,13 @@ import org.apache.accumulo.core.data.Byt import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filterer; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.Predicate; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -public class TimeSettingIterator implements InterruptibleIterator { +public class TimeSettingIterator implements InterruptibleIterator, Filterer { private SortedKeyValueIterator source; private long time; @@ -88,5 +90,14 @@ public class TimeSettingIterator impleme public Value getTopValue() { return source.getTopValue(); } + + @SuppressWarnings("unchecked") + @Override + public void applyFilter(Predicate filter, boolean required) { + if(source instanceof Filterer) + ((Filterer)source).applyFilter(filter, required); + else if(required) + throw new IllegalArgumentException("cannot guarantee filter with non filterer source"); + } } Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java?rev=1355353&r1=1355352&r2=1355353&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java (original) +++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java Fri Jun 29 13:11:48 2012 @@ -16,11 +16,16 @@ */ package org.apache.accumulo.core.iterators.system; +import java.io.IOException; +import java.util.Map; + import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.iterators.Filterer; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.predicates.ColumnVisibilityPredicate; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.util.TextUtil; @@ -43,7 +48,14 @@ public class VisibilityFilter extends Fi this.cache = new LRUMap(1000); this.tmpVis = new Text(); } - + + @Override + public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); + if(source instanceof Filterer) + ((Filterer)source).applyFilter(new ColumnVisibilityPredicate(auths), false); + } + @Override public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { return new VisibilityFilter(getSource().deepCopy(env), auths, TextUtil.getBytes(defaultVisibility)); Added: accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java?rev=1355353&view=auto ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java (added) +++ accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java Fri Jun 29 13:11:48 2012 @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.rfile; + +import static org.junit.Assert.*; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Collections; +import java.util.Map.Entry; +import java.util.Random; +import java.util.TreeMap; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; +import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream; +import org.apache.accumulo.core.iterators.Predicate; +import org.apache.accumulo.core.iterators.predicates.ColumnVisibilityPredicate; +import org.apache.accumulo.core.iterators.predicates.TimestampRangePredicate; +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.junit.Test; + +public class AuthorizationFilterTest { + + @Test + public void testRFileAuthorizationFiltering() throws Exception { + Authorizations auths = new Authorizations("a", "b", "c"); + Predicate columnVisibilityPredicate = new ColumnVisibilityPredicate(auths); + int expected = 0; + Random r = new Random(); + Configuration conf = new Configuration(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a")); + CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf); + RFile.Writer writer = new RFile.Writer(_cbw, 1000, 1000); + writer.startDefaultLocalityGroup(); + byte[] row = new byte[10]; + byte[] colFam = new byte[10]; + byte[] colQual = new byte[10]; + Value value = new Value(new byte[0]); + TreeMap inputBuffer = new TreeMap(); + ColumnVisibility[] goodColVises = {new ColumnVisibility("a&b"), new ColumnVisibility("b&c"), new ColumnVisibility("a&c")}; + ColumnVisibility[] badColVises = {new ColumnVisibility("x"), new ColumnVisibility("y"), new ColumnVisibility("a&z")}; + for (ColumnVisibility colVis : goodColVises) + for (int i = 0; i < 10; i++) { + r.nextBytes(row); + r.nextBytes(colFam); + r.nextBytes(colQual); + Key k = new Key(row, colFam, colQual, colVis.getExpression(), (long) i); + if (columnVisibilityPredicate.evaluate(k, value)) + expected++; + inputBuffer.put(k, value); + } + for (ColumnVisibility colVis : badColVises) + for (int i = 0; i < 10000; i++) { + r.nextBytes(row); + r.nextBytes(colFam); + r.nextBytes(colQual); + Key k = new Key(row, colFam, colQual, colVis.getExpression(), (long) i); + if (columnVisibilityPredicate.evaluate(k, value)) + expected++; + inputBuffer.put(k, value); + } + for (Entry e : inputBuffer.entrySet()) { + writer.append(e.getKey(), e.getValue()); + } + writer.close(); + + // scan the RFile to bring back keys in a given timestamp range + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new SeekableByteArrayInputStream(data); + FSDataInputStream in = new FSDataInputStream(bais); + CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf); + RFile.Reader reader = new RFile.Reader(_cbr); + int count = 0; + reader.applyFilter(columnVisibilityPredicate,true); + reader.seek(new Range(), Collections.EMPTY_SET, false); + while (reader.hasTop()) { + count++; + assertTrue(columnVisibilityPredicate.evaluate(reader.getTopKey(), reader.getTopValue())); + reader.next(); + } + assertEquals(expected, count); + } +} Modified: accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java?rev=1355353&r1=1355352&r2=1355353&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java (original) +++ accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java Fri Jun 29 13:11:48 2012 @@ -45,7 +45,6 @@ public class TimestampFilterTest { @Test public void testRFileTimestampFiltering() throws Exception { - // TODO create an RFile with increasing timestamp and random key order Predicate timeRange = new TimestampRangePredicate(73, 117); int expected = 0; Random r = new Random(); @@ -84,7 +83,7 @@ public class TimestampFilterTest { CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf); RFile.Reader reader = new RFile.Reader(_cbr); int count = 0; - reader.applyFilter(timeRange); + reader.applyFilter(timeRange,true); reader.seek(new Range(), Collections.EMPTY_SET, false); while(reader.hasTop()) {