Return-Path: Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org Received: (qmail 87128 invoked from network); 10 Jul 2009 17:49:12 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 10 Jul 2009 17:49:12 -0000 Received: (qmail 774 invoked by uid 500); 10 Jul 2009 17:49:21 -0000 Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org Received: (qmail 755 invoked by uid 500); 10 Jul 2009 17:49:21 -0000 Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cassandra-dev@incubator.apache.org Delivered-To: mailing list cassandra-commits@incubator.apache.org Received: (qmail 745 invoked by uid 99); 10 Jul 2009 17:49:21 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jul 2009 17:49:21 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jul 2009 17:49:10 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 7364623888D9; Fri, 10 Jul 2009 17:48:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r793052 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java QueryFilter.java SliceQueryFilter.java Table.java Date: Fri, 10 Jul 2009 17:48:50 -0000 To: cassandra-commits@incubator.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090710174850.7364623888D9@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jbellis Date: Fri Jul 10 17:48:49 2009 New Revision: 793052 URL: http://svn.apache.org/viewvc?rev=793052&view=rev Log: refactor out QueryFilter, SliceQueryFilter. patch by jbellis; reviewed by Jun Rao for CASSANDRA-287 Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=793052&r1=793051&r2=793052&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Jul 10 17:48:49 2009 @@ -1555,7 +1555,7 @@ * get a list of columns starting from a given column, in a specified order * only the latest version of a column is returned */ - public ColumnFamily getSliceFrom(String key, String cfName, String startColumn, String finishColumn, boolean isAscending, int offset, int count) + public ColumnFamily getColumnFamily(QueryFilter filter) throws IOException, ExecutionException, InterruptedException { sstableLock_.readLock().lock(); @@ -1569,7 +1569,7 @@ memtableLock_.readLock().lock(); try { - iter = memtable_.getColumnIterator(key, cfName, isAscending, startColumn); + iter = filter.getMemColumnIterator(memtable_); returnCF = iter.getColumnFamily(); } finally @@ -1579,10 +1579,10 @@ iterators.add(iter); /* add the memtables being flushed */ - List memtables = getUnflushedMemtables(cfName); + List memtables = getUnflushedMemtables(filter.getColumnFamilyName()); for (Memtable memtable:memtables) { - iter = memtable.getColumnIterator(key, cfName, isAscending, startColumn); + iter = filter.getMemColumnIterator(memtable); returnCF.delete(iter.getColumnFamily()); iterators.add(iter); } @@ -1591,67 +1591,24 @@ List sstables = new ArrayList(ssTables_.values()); for (SSTableReader sstable : sstables) { - iter = new SSTableColumnIterator(sstable.getFilename(), key, cfName, startColumn, isAscending); + iter = filter.getSSTableColumnIterator(sstable); if (iter.hasNext()) { returnCF.delete(iter.getColumnFamily()); iterators.add(iter); } - } - - // define a 'reduced' iterator that merges columns w/ the same name, which - // greatly simplifies computing liveColumns in the presence of tombstones. - Comparator comparator = new Comparator() - { - public int compare(IColumn c1, IColumn c2) + else { - return c1.name().compareTo(c2.name()); + iter.close(); } - }; - if (!isAscending) - comparator = new ReverseComparator(comparator); + } + + Comparator comparator = filter.getColumnComparator(); Iterator collated = IteratorUtils.collatedIterator(comparator, iterators); if (!collated.hasNext()) - return ColumnFamily.create(table_, cfName); - ReducingIterator reduced = new ReducingIterator(collated) - { - ColumnFamily curCF = returnCF.cloneMeShallow(); - - protected Object getKey(IColumn o) - { - return o == null ? null : o.name(); - } + return ColumnFamily.create(table_, filter.getColumnFamilyName()); - public void reduce(IColumn current) - { - curCF.addColumn(current); - } - - protected IColumn getReduced() - { - IColumn c = curCF.getAllColumns().first(); - curCF.clear(); - return c; - } - }; - - // add unique columns to the CF container - int liveColumns = 0; - int limit = offset + count; - for (IColumn column : reduced) - { - if (liveColumns >= limit) - break; - if (!finishColumn.isEmpty() - && ((isAscending && column.name().compareTo(finishColumn) > 0)) - || (!isAscending && column.name().compareTo(finishColumn) < 0)) - break; - if (!column.isMarkedForDelete()) - liveColumns++; - - if (liveColumns > offset) - returnCF.addColumn(column); - } + filter.collectColumns(returnCF, collated); return removeDeleted(returnCF); } Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java?rev=793052&view=auto ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java (added) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java Fri Jul 10 17:48:49 2009 @@ -0,0 +1,89 @@ +package org.apache.cassandra.db; + +import java.io.IOException; +import java.util.List; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Collection; + +import org.apache.commons.collections.comparators.ReverseComparator; +import org.apache.commons.collections.IteratorUtils; + +import org.apache.cassandra.io.SSTableReader; +import org.apache.cassandra.utils.ReducingIterator; + +public abstract class QueryFilter +{ + public final String key; + public final String columnFamilyColumn; + + protected QueryFilter(String key, String columnFamilyColumn) + { + this.key = key; + this.columnFamilyColumn = columnFamilyColumn; + } + + /** + * returns an iterator that returns columns from the given memtable + * matching the Filter criteria in sorted order. + */ + public abstract ColumnIterator getMemColumnIterator(Memtable memtable); + + /** + * returns an iterator that returns columns from the given SSTable + * matching the Filter criteria in sorted order. + */ + public abstract ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException; + + /** + * collects columns from reducedColumns into returnCF. Termination is determined + * by the filter code, which should have some limit on the number of columns + * to avoid running out of memory on large rows. + */ + public abstract void collectColumns(ColumnFamily returnCF, ReducingIterator reducedColumns); + + protected Comparator getColumnComparator() + { + return new Comparator() + { + public int compare(IColumn c1, IColumn c2) + { + return c1.name().compareTo(c2.name()); + } + }; + } + + public void collectColumns(final ColumnFamily returnCF, Iterator collatedColumns) + { + // define a 'reduced' iterator that merges columns w/ the same name, which + // greatly simplifies computing liveColumns in the presence of tombstones. + ReducingIterator reduced = new ReducingIterator(collatedColumns) + { + ColumnFamily curCF = returnCF.cloneMeShallow(); + + protected Object getKey(IColumn o) + { + return o == null ? null : o.name(); + } + + public void reduce(IColumn current) + { + curCF.addColumn(current); + } + + protected IColumn getReduced() + { + IColumn c = curCF.getAllColumns().first(); + curCF.clear(); + return c; + } + }; + + collectColumns(returnCF, reduced); + } + + public String getColumnFamilyName() + { + return RowMutation.getColumnAndColumnFamily(columnFamilyColumn)[0]); + } +} Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java?rev=793052&view=auto ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java (added) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java Fri Jul 10 17:48:49 2009 @@ -0,0 +1,64 @@ +package org.apache.cassandra.db; + +import java.io.IOException; +import java.util.Comparator; + +import org.apache.commons.collections.comparators.ReverseComparator; + +import org.apache.cassandra.io.SSTableReader; +import org.apache.cassandra.utils.ReducingIterator; + +public class SliceQueryFilter extends QueryFilter +{ + public final String start, finish; + public final boolean isAscending; + public final int offset, count; + + public SliceQueryFilter(String key, String columnFamilyColumn, String start, String finish, boolean ascending, int offset, int count) + { + super(key, columnFamilyColumn); + this.start = start; + this.finish = finish; + isAscending = ascending; + this.offset = offset; + this.count = count; + } + + public ColumnIterator getMemColumnIterator(Memtable memtable) + { + return memtable.getColumnIterator(key, columnFamilyColumn, isAscending, start); + } + + public ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException + { + return new SSTableColumnIterator(sstable.getFilename(), key, columnFamilyColumn, start, isAscending); + } + + @Override + protected Comparator getColumnComparator() + { + Comparator comparator = super.getColumnComparator(); + return isAscending ? comparator : new ReverseComparator(comparator); + } + + public void collectColumns(ColumnFamily returnCF, ReducingIterator reducedColumns) + { + int liveColumns = 0; + int limit = offset + count; + + for (IColumn column : reducedColumns) + { + if (liveColumns >= limit) + break; + if (!finish.isEmpty() + && ((isAscending && column.name().compareTo(finish) > 0)) + || (!isAscending && column.name().compareTo(finish) < 0)) + break; + if (!column.isMarkedForDelete()) + liveColumns++; + + if (liveColumns > offset) + returnCF.addColumn(column); + } + } +} Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=793052&r1=793051&r2=793052&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Jul 10 17:48:49 2009 @@ -592,7 +592,8 @@ long start1 = System.currentTimeMillis(); try { - ColumnFamily columnFamily = cfStore.getSliceFrom(key, cfName, start, finish, isAscending, offset, count); + QueryFilter filter = new SliceQueryFilter(key, cfName, start, finish, isAscending, offset, count); + ColumnFamily columnFamily = cfStore.getColumnFamily(filter); if (columnFamily != null) row.addColumnFamily(columnFamily); long timeTaken = System.currentTimeMillis() - start1;