cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [3/7] cassandra git commit: 9975: Flatten Iterator Transformation Hierarchy
Date Tue, 27 Oct 2015 09:57:38 GMT
9975: Flatten Iterator Transformation Hierarchy

To improve clarity of control flow, all iterator transformations
are applied via a single class that manages an explicit stack of
named transformation objects.

patch by benedict; reviewed by branimir for CASSANDRA-9975


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60949747
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60949747
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60949747

Branch: refs/heads/cassandra-3.0
Commit: 609497471441273367013c09a1e0e1c990726ec7
Parents: a4f32c5
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Thu Jul 30 13:31:42 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Mon Oct 26 20:59:06 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/EmptyIterators.java | 214 ++++++++++++
 .../cassandra/db/PartitionRangeReadCommand.java |  10 +-
 .../org/apache/cassandra/db/ReadCommand.java    | 119 +++----
 src/java/org/apache/cassandra/db/ReadQuery.java |   4 +-
 .../db/SinglePartitionNamesCommand.java         |   2 +-
 .../db/SinglePartitionSliceCommand.java         |   2 +-
 .../db/compaction/CompactionIterator.java       |  17 +-
 .../db/filter/ClusteringIndexNamesFilter.java   |  10 +-
 .../db/filter/ClusteringIndexSliceFilter.java   |  19 +-
 .../apache/cassandra/db/filter/DataLimits.java  | 185 ++++++++---
 .../apache/cassandra/db/filter/RowFilter.java   |  45 +--
 .../AlteringUnfilteredPartitionIterator.java    |  72 ----
 .../db/partitions/BasePartitionIterator.java    |  27 ++
 .../partitions/CountingPartitionIterator.java   |  58 ----
 .../db/partitions/CountingRowIterator.java      |  58 ----
 .../CountingUnfilteredPartitionIterator.java    |  52 ---
 .../CountingUnfilteredRowIterator.java          |  64 ----
 .../db/partitions/PartitionIterator.java        |   3 +-
 .../db/partitions/PartitionIterators.java       | 102 ++----
 .../cassandra/db/partitions/PurgeFunction.java  | 120 +++++++
 .../db/partitions/PurgingPartitionIterator.java | 156 ---------
 .../partitions/UnfilteredPartitionIterator.java |   6 +-
 .../UnfilteredPartitionIterators.java           | 125 ++-----
 .../partitions/WrappingPartitionIterator.java   |  50 ---
 .../WrappingUnfilteredPartitionIterator.java    | 126 -------
 .../db/rows/AlteringUnfilteredRowIterator.java  |  98 ------
 .../cassandra/db/rows/BaseRowIterator.java      |  64 ++++
 .../apache/cassandra/db/rows/RowIterator.java   |  32 +-
 .../apache/cassandra/db/rows/RowIterators.java  |  68 +---
 .../db/rows/UnfilteredRowIterator.java          |  32 +-
 .../rows/UnfilteredRowIteratorSerializer.java   |   2 +-
 .../db/rows/UnfilteredRowIterators.java         | 215 +++---------
 .../cassandra/db/rows/WrappingRowIterator.java  |  79 -----
 .../db/rows/WrappingUnfilteredRowIterator.java  |   2 +-
 .../cassandra/db/transform/BaseIterator.java    | 129 ++++++++
 .../cassandra/db/transform/BasePartitions.java  | 100 ++++++
 .../apache/cassandra/db/transform/BaseRows.java | 139 ++++++++
 .../apache/cassandra/db/transform/Filter.java   |  56 ++++
 .../db/transform/FilteredPartitions.java        |  40 +++
 .../cassandra/db/transform/FilteredRows.java    |  40 +++
 .../cassandra/db/transform/MoreContents.java    |   8 +
 .../cassandra/db/transform/MorePartitions.java  |  35 ++
 .../apache/cassandra/db/transform/MoreRows.java |  36 ++
 .../apache/cassandra/db/transform/Stack.java    |  81 +++++
 .../db/transform/StoppingTransformation.java    |  60 ++++
 .../cassandra/db/transform/Transformation.java  | 145 +++++++++
 .../db/transform/UnfilteredPartitions.java      |  27 ++
 .../cassandra/db/transform/UnfilteredRows.java  |  40 +++
 .../cassandra/index/SecondaryIndexBuilder.java  |   9 +-
 .../internal/composites/CompositesSearcher.java |  24 +-
 .../io/sstable/ReducingKeyIterator.java         |   5 +-
 .../io/sstable/format/big/BigTableWriter.java   |  29 +-
 .../apache/cassandra/service/DataResolver.java  | 106 +++---
 .../apache/cassandra/service/StorageProxy.java  |  10 +-
 .../service/pager/AbstractQueryPager.java       | 111 +++----
 .../service/pager/MultiPartitionPager.java      |  14 +-
 .../cassandra/service/pager/QueryPager.java     |   6 +-
 .../cassandra/service/pager/QueryPagers.java    |   7 +-
 .../cassandra/thrift/ThriftResultsMerger.java   |  26 +-
 .../cassandra/utils/CloseableIterator.java      |   3 +-
 .../org/apache/cassandra/utils/Throwables.java  |   9 +-
 .../Keyspace1-Standard1-jb-0-Summary.db         | Bin 202 -> 162 bytes
 test/unit/org/apache/cassandra/Util.java        |   4 +-
 .../apache/cassandra/db/TransformerTest.java    | 325 +++++++++++++++++++
 .../apache/cassandra/repair/ValidatorTest.java  |   7 +-
 .../cassandra/service/DataResolverTest.java     |   6 +-
 67 files changed, 2201 insertions(+), 1675 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0457917..12f62f7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
  * Remove token generator (CASSANDRA-5261)
  * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
  * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/EmptyIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EmptyIterators.java b/src/java/org/apache/cassandra/db/EmptyIterators.java
new file mode 100644
index 0000000..6bf8fff
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/EmptyIterators.java
@@ -0,0 +1,214 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import java.util.NoSuchElementException;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.partitions.BasePartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+
+public class EmptyIterators
+{
+
+    private static class EmptyBasePartitionIterator<R extends BaseRowIterator<?>> implements BasePartitionIterator<R>
+    {
+        EmptyBasePartitionIterator()
+        {
+        }
+
+        public void close()
+        {
+        }
+
+        public boolean hasNext()
+        {
+            return false;
+        }
+
+        public R next()
+        {
+            throw new NoSuchElementException();
+        }
+    }
+
+    private static class EmptyUnfilteredPartitionIterator extends EmptyBasePartitionIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+    {
+        final CFMetaData metadata;
+        final boolean isForThrift;
+
+        public EmptyUnfilteredPartitionIterator(CFMetaData metadata, boolean isForThrift)
+        {
+            this.metadata = metadata;
+            this.isForThrift = isForThrift;
+        }
+
+        public boolean isForThrift()
+        {
+            return isForThrift;
+        }
+
+        public CFMetaData metadata()
+        {
+            return metadata;
+        }
+    }
+
+    private static class EmptyPartitionIterator extends EmptyBasePartitionIterator<RowIterator> implements PartitionIterator
+    {
+        public static final EmptyPartitionIterator instance = new EmptyPartitionIterator();
+        private EmptyPartitionIterator()
+        {
+            super();
+        }
+    }
+
+    private static class EmptyBaseRowIterator<U extends Unfiltered> implements BaseRowIterator<U>
+    {
+        final PartitionColumns columns;
+        final CFMetaData metadata;
+        final DecoratedKey partitionKey;
+        final boolean isReverseOrder;
+        final Row staticRow;
+
+        EmptyBaseRowIterator(PartitionColumns columns, CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow)
+        {
+            this.columns = columns;
+            this.metadata = metadata;
+            this.partitionKey = partitionKey;
+            this.isReverseOrder = isReverseOrder;
+            this.staticRow = staticRow;
+        }
+
+        public CFMetaData metadata()
+        {
+            return metadata;
+        }
+
+        public boolean isReverseOrder()
+        {
+            return isReverseOrder;
+        }
+
+        public PartitionColumns columns()
+        {
+            return columns;
+        }
+
+        public DecoratedKey partitionKey()
+        {
+            return partitionKey;
+        }
+
+        public Row staticRow()
+        {
+            return staticRow;
+        }
+
+        public void close()
+        {
+        }
+
+        public boolean isEmpty()
+        {
+            return staticRow == Rows.EMPTY_STATIC_ROW;
+        }
+
+        public boolean hasNext()
+        {
+            return false;
+        }
+
+        public U next()
+        {
+            throw new NoSuchElementException();
+        }
+    }
+
+    private static class EmptyUnfilteredRowIterator extends EmptyBaseRowIterator<Unfiltered> implements UnfilteredRowIterator
+    {
+        final DeletionTime partitionLevelDeletion;
+        public EmptyUnfilteredRowIterator(PartitionColumns columns, CFMetaData metadata, DecoratedKey partitionKey,
+                                          boolean isReverseOrder, Row staticRow, DeletionTime partitionLevelDeletion)
+        {
+            super(columns, metadata, partitionKey, isReverseOrder, staticRow);
+            this.partitionLevelDeletion = partitionLevelDeletion;
+        }
+
+        public boolean isEmpty()
+        {
+            return partitionLevelDeletion == DeletionTime.LIVE && super.isEmpty();
+        }
+
+        public DeletionTime partitionLevelDeletion()
+        {
+            return partitionLevelDeletion;
+        }
+
+        public EncodingStats stats()
+        {
+            return EncodingStats.NO_STATS;
+        }
+    }
+
+    private static class EmptyRowIterator extends EmptyBaseRowIterator<Row> implements RowIterator
+    {
+        public EmptyRowIterator(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow)
+        {
+            super(PartitionColumns.NONE, metadata, partitionKey, isReverseOrder, staticRow);
+        }
+    }
+
+    public static UnfilteredPartitionIterator unfilteredPartition(CFMetaData metadata, boolean isForThrift)
+    {
+        return new EmptyUnfilteredPartitionIterator(metadata, isForThrift);
+    }
+
+    public static PartitionIterator partition()
+    {
+        return EmptyPartitionIterator.instance;
+    }
+
+    // this method is the only one that can return a non-empty iterator, but it still has no rows, so it seems cleanest to keep it here
+    public static UnfilteredRowIterator unfilteredRow(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow, DeletionTime partitionDeletion)
+    {
+        PartitionColumns columns = PartitionColumns.NONE;
+        if (!staticRow.isEmpty())
+            columns = new PartitionColumns(Columns.from(staticRow.columns()), Columns.NONE);
+        else
+            staticRow = Rows.EMPTY_STATIC_ROW;
+
+        if (partitionDeletion.isLive())
+            partitionDeletion = DeletionTime.LIVE;
+
+        return new EmptyUnfilteredRowIterator(columns, metadata, partitionKey, isReverseOrder, staticRow, partitionDeletion);
+    }
+
+    public static UnfilteredRowIterator unfilteredRow(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder)
+    {
+        return new EmptyUnfilteredRowIterator(PartitionColumns.NONE, metadata, partitionKey, isReverseOrder, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE);
+    }
+
+    public static RowIterator row(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder)
+    {
+        return new EmptyRowIterator(metadata, partitionKey, isReverseOrder, Rows.EMPTY_STATIC_ROW);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index f17f3e3..9fce15e 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -30,7 +30,8 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.BaseRowIterator;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.index.Index;
@@ -226,10 +227,10 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
     {
-        return new WrappingUnfilteredPartitionIterator(iter)
+        class CacheFilter extends Transformation
         {
             @Override
-            public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+            public BaseRowIterator applyToPartition(BaseRowIterator iter)
             {
                 // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done
                 // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage.
@@ -249,7 +250,8 @@ public class PartitionRangeReadCommand extends ReadCommand
 
                 return iter;
             }
-        };
+        }
+        return Transformation.apply(iter, new CacheFilter());
     }
 
     public MessageOut<ReadCommand> createMessage(int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 3b1f8a8..4d9b65b 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -383,7 +384,7 @@ public abstract class ReadCommand implements ReadQuery
      */
     private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
     {
-        return new WrappingUnfilteredPartitionIterator(iter)
+        class MetricRecording extends Transformation<UnfilteredRowIterator>
         {
             private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
             private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
@@ -396,78 +397,71 @@ public abstract class ReadCommand implements ReadQuery
             private DecoratedKey currentKey;
 
             @Override
-            public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
             {
                 currentKey = iter.partitionKey();
+                return Transformation.apply(iter, this);
+            }
 
-                return new AlteringUnfilteredRowIterator(iter)
-                {
-                    @Override
-                    protected Row computeNextStatic(Row row)
-                    {
-                        return computeNext(row);
-                    }
-
-                    @Override
-                    protected Row computeNext(Row row)
-                    {
-                        if (row.hasLiveData(ReadCommand.this.nowInSec()))
-                            ++liveRows;
-
-                        for (Cell cell : row.cells())
-                        {
-                            if (!cell.isLive(ReadCommand.this.nowInSec()))
-                                countTombstone(row.clustering());
-                        }
-                        return row;
-                    }
-
-                    @Override
-                    protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
-                    {
-                        countTombstone(marker.clustering());
-                        return marker;
-                    }
-
-                    private void countTombstone(ClusteringPrefix clustering)
-                    {
-                        ++tombstones;
-                        if (tombstones > failureThreshold && respectTombstoneThresholds)
-                        {
-                            String query = ReadCommand.this.toCQLString();
-                            Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
-                            throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
-                        }
-                    }
-                };
+            @Override
+            public Row applyToStatic(Row row)
+            {
+                return applyToRow(row);
             }
 
             @Override
-            public void close()
+            public Row applyToRow(Row row)
             {
-                try
+                if (row.hasLiveData(ReadCommand.this.nowInSec()))
+                    ++liveRows;
+
+                for (Cell cell : row.cells())
                 {
-                    super.close();
+                    if (!cell.isLive(ReadCommand.this.nowInSec()))
+                        countTombstone(row.clustering());
                 }
-                finally
+                return row;
+            }
+
+            @Override
+            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+            {
+                countTombstone(marker.clustering());
+                return marker;
+            }
+
+            private void countTombstone(ClusteringPrefix clustering)
+            {
+                ++tombstones;
+                if (tombstones > failureThreshold && respectTombstoneThresholds)
                 {
-                    recordLatency(metric, System.nanoTime() - startTimeNanos);
+                    String query = ReadCommand.this.toCQLString();
+                    Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
+                    throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
+                }
+            }
 
-                    metric.tombstoneScannedHistogram.update(tombstones);
-                    metric.liveScannedHistogram.update(liveRows);
+            @Override
+            public void onClose()
+            {
+                recordLatency(metric, System.nanoTime() - startTimeNanos);
 
-                    boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
-                    if (warnTombstones)
-                    {
-                        String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
-                        ClientWarn.warn(msg);
-                        logger.warn(msg);
-                    }
+                metric.tombstoneScannedHistogram.update(tombstones);
+                metric.liveScannedHistogram.update(liveRows);
 
-                    Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
+                boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
+                if (warnTombstones)
+                {
+                    String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
+                    ClientWarn.warn(msg);
+                    logger.warn(msg);
                 }
+
+                Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
             }
         };
+
+        return Transformation.apply(iter, new MetricRecording());
     }
 
     /**
@@ -482,13 +476,20 @@ public abstract class ReadCommand implements ReadQuery
     // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
     protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
     {
-        return new PurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
+        final boolean isForThrift = iterator.isForThrift();
+        class WithoutPurgeableTombstones extends PurgeFunction
         {
+            public WithoutPurgeableTombstones()
+            {
+                super(isForThrift, cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
+            }
+
             protected long getMaxPurgeableTimestamp()
             {
                 return Long.MAX_VALUE;
             }
-        };
+        }
+        return Transformation.apply(iterator, new WithoutPurgeableTombstones());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index d1f5272..178ca7c 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -42,12 +42,12 @@ public interface ReadQuery
 
         public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
         {
-            return PartitionIterators.EMPTY;
+            return EmptyIterators.partition();
         }
 
         public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
         {
-            return PartitionIterators.EMPTY;
+            return EmptyIterators.partition();
         }
 
         public DataLimits limits()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index 430e4a1..763919e 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -153,7 +153,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
         cfs.metric.updateSSTableIterated(sstablesIterated);
 
         if (result == null || result.isEmpty())
-            return UnfilteredRowIterators.emptyIterator(metadata(), partitionKey(), false);
+            return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
 
         DecoratedKey key = result.partitionKey();
         cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
index 27aab62..f4e7af1 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
@@ -235,7 +235,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
             cfs.metric.updateSSTableIterated(sstablesIterated);
 
             if (iterators.isEmpty())
-                return UnfilteredRowIterators.emptyIterator(cfs.metadata, partitionKey(), filter.isReversed());
+                return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
 
             Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index fe18c04..8a3b24b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -25,10 +25,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.PurgingPartitionIterator;
+import org.apache.cassandra.db.partitions.PurgeFunction;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.index.transactions.CompactionTransaction;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.metrics.CompactionMetrics;
@@ -98,9 +99,11 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         if (metrics != null)
             metrics.beginCompaction(this);
 
-        this.compacted = scanners.isEmpty()
-                       ? UnfilteredPartitionIterators.empty(controller.cfs.metadata)
-                       : new PurgeIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller);
+        UnfilteredPartitionIterator merged = scanners.isEmpty()
+                                             ? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
+                                             : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
+        boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug
+        this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller));
     }
 
     public boolean isForThrift()
@@ -251,7 +254,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         return this.getCompactionInfo().toString();
     }
 
-    private class PurgeIterator extends PurgingPartitionIterator
+    private class Purger extends PurgeFunction
     {
         private final CompactionController controller;
 
@@ -261,9 +264,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 
         private long compactedUnfiltered;
 
-        private PurgeIterator(UnfilteredPartitionIterator toPurge, CompactionController controller)
+        private Purger(boolean isForThrift, CompactionController controller)
         {
-            super(toPurge, controller.gcBefore, controller.compactingRepaired() ? Integer.MIN_VALUE : Integer.MAX_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
+            super(isForThrift, controller.gcBefore, controller.compactingRepaired() ? Integer.MIN_VALUE : Integer.MAX_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
             this.controller = controller;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index d3a289a..388cd50 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -107,20 +108,21 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
     {
         // Note that we don't filter markers because that's a bit trickier (we don't know in advance until when
         // the range extend) and it's harmless to left them.
-        return new AlteringUnfilteredRowIterator(iterator)
+        class FilterNotIndexed extends Transformation
         {
             @Override
-            public Row computeNextStatic(Row row)
+            public Row applyToStatic(Row row)
             {
                 return columnFilter.fetchedColumns().statics.isEmpty() ? null : row.filter(columnFilter, iterator.metadata());
             }
 
             @Override
-            public Row computeNext(Row row)
+            public Row applyToRow(Row row)
             {
                 return clusterings.contains(row.clustering()) ? row.filter(columnFilter, iterator.metadata()) : null;
             }
-        };
+        }
+        return Transformation.apply(iterator, new FilterNotIndexed());
     }
 
     public UnfilteredRowIterator filter(final SliceableUnfilteredRowIterator iter)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
index b2d529c..7a174ee 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -91,26 +92,26 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
 
         // Note that we don't filter markers because that's a bit trickier (we don't know in advance until when
         // the range extend) and it's harmless to leave them.
-        return new AlteringUnfilteredRowIterator(iterator)
+        class FilterNotIndexed extends Transformation
         {
-            @Override
-            public boolean hasNext()
+            public boolean isDoneForPartition()
             {
-                return !tester.isDone() && super.hasNext();
+                return tester.isDone();
             }
 
             @Override
-            public Row computeNextStatic(Row row)
+            public Row applyToRow(Row row)
             {
-                return columnFilter.fetchedColumns().statics.isEmpty() ? null : row.filter(columnFilter, iterator.metadata());
+                return tester.includes(row.clustering()) ? row.filter(columnFilter, iterator.metadata()) : null;
             }
 
             @Override
-            public Row computeNext(Row row)
+            public Row applyToStatic(Row row)
             {
-                return tester.includes(row.clustering()) ? row.filter(columnFilter, iterator.metadata()) : null;
+                return columnFilter.fetchedColumns().statics.isEmpty() ? Rows.EMPTY_STATIC_ROW : row.filter(columnFilter, iterator.metadata());
             }
-        };
+        }
+        return Transformation.apply(iterator, new FilterNotIndexed());
     }
 
     public UnfilteredRowIterator filter(SliceableUnfilteredRowIterator iterator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index d5eefe3..130c6ba 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -23,6 +23,10 @@ import java.nio.ByteBuffer;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.transform.BasePartitions;
+import org.apache.cassandra.db.transform.BaseRows;
+import org.apache.cassandra.db.transform.StoppingTransformation;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -125,17 +129,17 @@ public abstract class DataLimits
 
     public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
     {
-        return new CountingUnfilteredPartitionIterator(iter, newCounter(nowInSec, false));
+        return this.newCounter(nowInSec, false).applyTo(iter);
     }
 
     public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec)
     {
-        return new CountingUnfilteredRowIterator(iter, newCounter(nowInSec, false));
+        return this.newCounter(nowInSec, false).applyTo(iter);
     }
 
     public PartitionIterator filter(PartitionIterator iter, int nowInSec)
     {
-        return new CountingPartitionIterator(iter, this, nowInSec);
+        return this.newCounter(nowInSec, true).applyTo(iter);
     }
 
     /**
@@ -144,11 +148,36 @@ public abstract class DataLimits
      */
     public abstract float estimateTotalResults(ColumnFamilyStore cfs);
 
-    public interface Counter
+    public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>>
     {
-        public void newPartition(DecoratedKey partitionKey, Row staticRow);
-        public void newRow(Row row);
-        public void endOfPartition();
+        // false means we do not propagate our stop signals onto the iterator, we only count
+        private boolean enforceLimits = true;
+
+        public Counter onlyCount()
+        {
+            this.enforceLimits = false;
+            return this;
+        }
+
+        public PartitionIterator applyTo(PartitionIterator partitions)
+        {
+            return Transformation.apply(partitions, this);
+        }
+
+        public UnfilteredPartitionIterator applyTo(UnfilteredPartitionIterator partitions)
+        {
+            return Transformation.apply(partitions, this);
+        }
+
+        public UnfilteredRowIterator applyTo(UnfilteredRowIterator partition)
+        {
+            return (UnfilteredRowIterator) applyToPartition(partition);
+        }
+
+        public RowIterator applyTo(RowIterator partition)
+        {
+            return (RowIterator) applyToPartition(partition);
+        }
 
         /**
          * The number of results counted.
@@ -157,12 +186,40 @@ public abstract class DataLimits
          *
          * @return the number of results counted.
          */
-        public int counted();
+        public abstract int counted();
+        public abstract int countedInCurrentPartition();
 
-        public int countedInCurrentPartition();
+        public abstract boolean isDone();
+        public abstract boolean isDoneForPartition();
 
-        public boolean isDone();
-        public boolean isDoneForPartition();
+        @Override
+        protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
+        {
+            return partition instanceof UnfilteredRowIterator ? Transformation.apply((UnfilteredRowIterator) partition, this)
+                                                              : Transformation.apply((RowIterator) partition, this);
+        }
+
+        // called before we process a given partition
+        protected abstract void applyToPartition(DecoratedKey partitionKey, Row staticRow);
+
+        @Override
+        protected void attachTo(BasePartitions partitions)
+        {
+            if (enforceLimits)
+                super.attachTo(partitions);
+            if (isDone())
+                stop();
+        }
+
+        @Override
+        protected void attachTo(BaseRows rows)
+        {
+            if (enforceLimits)
+                super.attachTo(rows);
+            applyToPartition(rows.partitionKey(), rows.staticRow());
+            if (isDoneForPartition())
+                stopInPartition();
+        }
     }
 
     /**
@@ -241,13 +298,15 @@ public abstract class DataLimits
                 return false;
 
             // Otherwise, we need to re-count
+
+            DataLimits.Counter counter = newCounter(nowInSec, false);
             try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
-                 CountingUnfilteredRowIterator iter = new CountingUnfilteredRowIterator(cacheIter, newCounter(nowInSec, false)))
+                 UnfilteredRowIterator iter = counter.applyTo(cacheIter))
             {
                 // Consume the iterator until we've counted enough
-                while (iter.hasNext() && !iter.counter().isDone())
+                while (iter.hasNext())
                     iter.next();
-                return iter.counter().isDone();
+                return counter.isDone();
             }
         }
 
@@ -274,7 +333,7 @@ public abstract class DataLimits
             return rowsPerPartition * (cfs.estimateKeys());
         }
 
-        protected class CQLCounter implements Counter
+        protected class CQLCounter extends Counter
         {
             protected final int nowInSec;
             protected final boolean assumeLiveData;
@@ -290,23 +349,39 @@ public abstract class DataLimits
                 this.assumeLiveData = assumeLiveData;
             }
 
-            public void newPartition(DecoratedKey partitionKey, Row staticRow)
+            @Override
+            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
             {
                 rowInCurrentPartition = 0;
                 if (!staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec)))
                     hasLiveStaticRow = true;
             }
 
-            public void endOfPartition()
+            @Override
+            public Row applyToRow(Row row)
+            {
+                if (assumeLiveData || row.hasLiveData(nowInSec))
+                    incrementRowCount();
+                return row;
+            }
+
+            @Override
+            public void onPartitionClose()
             {
                 // Normally, we don't count static rows as from a CQL point of view, it will be merged with other
                 // rows in the partition. However, if we only have the static row, it will be returned as one row
                 // so count it.
                 if (hasLiveStaticRow && rowInCurrentPartition == 0)
-                {
-                    ++rowCounted;
-                    ++rowInCurrentPartition;
-                }
+                    incrementRowCount();
+                super.onPartitionClose();
+            }
+
+            private void incrementRowCount()
+            {
+                if (++rowCounted >= rowLimit)
+                    stop();
+                if (++rowInCurrentPartition >= perPartitionLimit)
+                    stopInPartition();
             }
 
             public int counted()
@@ -328,15 +403,6 @@ public abstract class DataLimits
             {
                 return isDone() || rowInCurrentPartition >= perPartitionLimit;
             }
-
-            public void newRow(Row row)
-            {
-                if (assumeLiveData || row.hasLiveData(nowInSec))
-                {
-                    ++rowCounted;
-                    ++rowInCurrentPartition;
-                }
-            }
         }
 
         @Override
@@ -402,7 +468,7 @@ public abstract class DataLimits
             }
 
             @Override
-            public void newPartition(DecoratedKey partitionKey, Row staticRow)
+            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
             {
                 if (partitionKey.getKey().equals(lastReturnedKey))
                 {
@@ -415,7 +481,7 @@ public abstract class DataLimits
                 }
                 else
                 {
-                    super.newPartition(partitionKey, staticRow);
+                    super.applyToPartition(partitionKey, staticRow);
                 }
             }
         }
@@ -481,13 +547,14 @@ public abstract class DataLimits
                 return false;
 
             // Otherwise, we need to re-count
+            DataLimits.Counter counter = newCounter(nowInSec, false);
             try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
-                 CountingUnfilteredRowIterator iter = new CountingUnfilteredRowIterator(cacheIter, newCounter(nowInSec, false)))
+                 UnfilteredRowIterator iter = counter.applyTo(cacheIter))
             {
                 // Consume the iterator until we've counted enough
-                while (iter.hasNext() && !iter.counter().isDone())
+                while (iter.hasNext())
                     iter.next();
-                return iter.counter().isDone();
+                return counter.isDone();
             }
         }
 
@@ -513,7 +580,7 @@ public abstract class DataLimits
             return cellsPerPartition * cfs.estimateKeys();
         }
 
-        protected class ThriftCounter implements Counter
+        protected class ThriftCounter extends Counter
         {
             protected final int nowInSec;
             protected final boolean assumeLiveData;
@@ -528,16 +595,35 @@ public abstract class DataLimits
                 this.assumeLiveData = assumeLiveData;
             }
 
-            public void newPartition(DecoratedKey partitionKey, Row staticRow)
+            @Override
+            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
             {
                 cellsInCurrentPartition = 0;
                 if (!staticRow.isEmpty())
-                    newRow(staticRow);
+                    applyToRow(staticRow);
             }
 
-            public void endOfPartition()
+            @Override
+            public Row applyToRow(Row row)
             {
-                ++partitionsCounted;
+                for (Cell cell : row.cells())
+                {
+                    if (assumeLiveData || cell.isLive(nowInSec))
+                    {
+                        ++cellsCounted;
+                        if (++cellsInCurrentPartition >= cellPerPartitionLimit)
+                            stopInPartition();
+                    }
+                }
+                return row;
+            }
+
+            @Override
+            public void onPartitionClose()
+            {
+                if (++partitionsCounted >= partitionLimit)
+                    stop();
+                super.onPartitionClose();
             }
 
             public int counted()
@@ -559,18 +645,6 @@ public abstract class DataLimits
             {
                 return isDone() || cellsInCurrentPartition >= cellPerPartitionLimit;
             }
-
-            public void newRow(Row row)
-            {
-                for (Cell cell : row.cells())
-                {
-                    if (assumeLiveData || cell.isLive(nowInSec))
-                    {
-                        ++cellsCounted;
-                        ++cellsInCurrentPartition;
-                    }
-                }
-            }
         }
 
         @Override
@@ -625,14 +699,17 @@ public abstract class DataLimits
                 super(nowInSec, assumeLiveData);
             }
 
-            public void newRow(Row row)
+            @Override
+            public Row applyToRow(Row row)
             {
                 // In the internal format, a row == a super column, so that's what we want to count.
                 if (assumeLiveData || row.hasLiveData(nowInSec))
                 {
                     ++cellsCounted;
-                    ++cellsInCurrentPartition;
+                    if (++cellsInCurrentPartition >= cellPerPartitionLimit)
+                        stopInPartition();
                 }
+                return row;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 0ff30af..09dc342 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.base.Objects;
-import org.apache.commons.lang3.builder.ToStringBuilder;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -31,6 +30,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -222,28 +222,29 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
             if (expressions.isEmpty())
                 return iter;
 
-            return new AlteringUnfilteredPartitionIterator(iter)
+            class IsSatisfiedFilter extends Transformation<UnfilteredRowIterator>
             {
-                protected Row computeNext(DecoratedKey partitionKey, Row row)
+                DecoratedKey pk;
+                public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
                 {
-                    // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them.
-                    Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
-                    return purged != null && CQLFilter.this.isSatisfiedBy(partitionKey, purged) ? row : null;
+                    pk = partition.partitionKey();
+                    return Transformation.apply(partition, this);
                 }
-            };
-        }
 
-        /**
-         * Returns whether the provided row (with it's partition key) satisfies
-         * this row filter or not (that is, if it satisfies all of its expressions).
-         */
-        private boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
-        {
-            for (Expression e : expressions)
-                if (!e.isSatisfiedBy(partitionKey, row))
-                    return false;
+                public Row applyToRow(Row row)
+                {
+                    Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
+                    if (purged == null)
+                        return null;
 
-            return true;
+                    for (Expression e : expressions)
+                        if (!e.isSatisfiedBy(pk, purged))
+                            return null;
+                    return row;
+                }
+            }
+
+            return Transformation.apply(iter, new IsSatisfiedFilter());
         }
 
         protected RowFilter withNewExpressions(List<Expression> expressions)
@@ -264,16 +265,17 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
             if (expressions.isEmpty())
                 return iter;
 
-            return new WrappingUnfilteredPartitionIterator(iter)
+            class IsSatisfiedThriftFilter extends Transformation<UnfilteredRowIterator>
             {
                 @Override
-                public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter)
+                public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
                 {
                     // Thrift does not filter rows, it filters entire partition if any of the expression is not
                     // satisfied, which forces us to materialize the result (in theory we could materialize only
                     // what we need which might or might not be everything, but we keep it simple since in practice
                     // it's not worth that it has ever been).
                     ImmutableBTreePartition result = ImmutableBTreePartition.create(iter);
+                    iter.close();
 
                     // The partition needs to have a row for every expression, and the expression needs to be valid.
                     for (Expression expr : expressions)
@@ -286,7 +288,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                     // If we get there, it means all expressions were satisfied, so return the original result
                     return result.unfilteredIterator();
                 }
-            };
+            }
+            return Transformation.apply(iter, new IsSatisfiedThriftFilter());
         }
 
         protected RowFilter withNewExpressions(List<Expression> expressions)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java
deleted file mode 100644
index f7d7222..0000000
--- a/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.rows.*;
-
-/**
- * A partition iterator that allows to filter/modify the unfiltered from the
- * underlying iterators.
- */
-public abstract class AlteringUnfilteredPartitionIterator extends WrappingUnfilteredPartitionIterator
-{
-    protected AlteringUnfilteredPartitionIterator(UnfilteredPartitionIterator wrapped)
-    {
-        super(wrapped);
-    }
-
-    protected Row computeNextStatic(DecoratedKey partitionKey, Row row)
-    {
-        return row;
-    }
-
-    protected Row computeNext(DecoratedKey partitionKey, Row row)
-    {
-        return row;
-    }
-
-    protected RangeTombstoneMarker computeNext(DecoratedKey partitionKey, RangeTombstoneMarker marker)
-    {
-        return marker;
-    }
-
-    @Override
-    protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
-    {
-        final DecoratedKey partitionKey = iter.partitionKey();
-        return new AlteringUnfilteredRowIterator(iter)
-        {
-            protected Row computeNextStatic(Row row)
-            {
-                return AlteringUnfilteredPartitionIterator.this.computeNextStatic(partitionKey, row);
-            }
-
-            protected Row computeNext(Row row)
-            {
-                return AlteringUnfilteredPartitionIterator.this.computeNext(partitionKey, row);
-            }
-
-            protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
-            {
-                return AlteringUnfilteredPartitionIterator.this.computeNext(partitionKey, marker);
-            }
-        };
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/BasePartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/BasePartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/BasePartitionIterator.java
new file mode 100644
index 0000000..214f416
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/BasePartitionIterator.java
@@ -0,0 +1,27 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+
+public interface BasePartitionIterator<I extends BaseRowIterator<?>> extends CloseableIterator<I>
+{
+    public void close();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java
deleted file mode 100644
index 16445e7..0000000
--- a/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.DataLimits;
-
-public class CountingPartitionIterator extends WrappingPartitionIterator
-{
-    protected final DataLimits.Counter counter;
-
-    public CountingPartitionIterator(PartitionIterator result, DataLimits.Counter counter)
-    {
-        super(result);
-        this.counter = counter;
-    }
-
-    public CountingPartitionIterator(PartitionIterator result, DataLimits limits, int nowInSec)
-    {
-        this(result, limits.newCounter(nowInSec, true));
-    }
-
-    public DataLimits.Counter counter()
-    {
-        return counter;
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-        if (counter.isDone())
-            return false;
-
-        return super.hasNext();
-    }
-
-    @Override
-    @SuppressWarnings("resource") // Close through the closing of the returned 'CountingRowIterator' (and CountingRowIterator shouldn't throw)
-    public RowIterator next()
-    {
-        return new CountingRowIterator(super.next(), counter);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java
deleted file mode 100644
index 4ad321e..0000000
--- a/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.DataLimits;
-
-public class CountingRowIterator extends WrappingRowIterator
-{
-    protected final DataLimits.Counter counter;
-
-    public CountingRowIterator(RowIterator iter, DataLimits.Counter counter)
-    {
-        super(iter);
-        this.counter = counter;
-
-        counter.newPartition(iter.partitionKey(), iter.staticRow());
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-        if (counter.isDoneForPartition())
-            return false;
-
-        return super.hasNext();
-    }
-
-    @Override
-    public Row next()
-    {
-        Row row = super.next();
-        counter.newRow(row);
-        return row;
-    }
-
-    @Override
-    public void close()
-    {
-        super.close();
-        counter.endOfPartition();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java
deleted file mode 100644
index 52eedd4..0000000
--- a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.DataLimits;
-
-public class CountingUnfilteredPartitionIterator extends WrappingUnfilteredPartitionIterator
-{
-    protected final DataLimits.Counter counter;
-
-    public CountingUnfilteredPartitionIterator(UnfilteredPartitionIterator result, DataLimits.Counter counter)
-    {
-        super(result);
-        this.counter = counter;
-    }
-
-    public DataLimits.Counter counter()
-    {
-        return counter;
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-        if (counter.isDone())
-            return false;
-
-        return super.hasNext();
-    }
-
-    @Override
-    public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
-    {
-        return new CountingUnfilteredRowIterator(iter, counter);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
deleted file mode 100644
index e5d1e75..0000000
--- a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.DataLimits;
-
-public class CountingUnfilteredRowIterator extends WrappingUnfilteredRowIterator
-{
-    private final DataLimits.Counter counter;
-
-    public CountingUnfilteredRowIterator(UnfilteredRowIterator iter, DataLimits.Counter counter)
-    {
-        super(iter);
-        this.counter = counter;
-
-        counter.newPartition(iter.partitionKey(), iter.staticRow());
-    }
-
-    public DataLimits.Counter counter()
-    {
-        return counter;
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-        if (counter.isDoneForPartition())
-            return false;
-
-        return super.hasNext();
-    }
-
-    @Override
-    public Unfiltered next()
-    {
-        Unfiltered next = super.next();
-        if (next.isRow())
-            counter.newRow((Row)next);
-        return next;
-    }
-
-    @Override
-    public void close()
-    {
-        super.close();
-        counter.endOfPartition();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java
index 36358fc..529a9e2 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.db.rows.*;
  * reference on the returned objects for longer than the iteration, it must
  * make a copy of it explicitely.
  */
-public interface PartitionIterator extends Iterator<RowIterator>, AutoCloseable
+public interface PartitionIterator extends BasePartitionIterator<RowIterator>
 {
-    public void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index eeb6a4b..0b43c19 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -20,37 +20,18 @@ package org.apache.cassandra.db.partitions;
 import java.util.*;
 import java.security.MessageDigest;
 
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.transform.MorePartitions;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.utils.AbstractIterator;
 
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.util.FileUtils;
 
 public abstract class PartitionIterators
 {
     private PartitionIterators() {}
 
-    public static final PartitionIterator EMPTY = new PartitionIterator()
-    {
-        public boolean hasNext()
-        {
-            return false;
-        }
-
-        public RowIterator next()
-        {
-            throw new NoSuchElementException();
-        }
-
-        public void remove()
-        {
-        }
-
-        public void close()
-        {
-        }
-    };
-
     @SuppressWarnings("resource") // The created resources are returned right away
     public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand<?> command)
     {
@@ -58,30 +39,24 @@ public abstract class PartitionIterators
         // want a RowIterator out of this method, so we return an empty one.
         RowIterator toReturn = iter.hasNext()
                              ? iter.next()
-                             : RowIterators.emptyIterator(command.metadata(),
-                                                          command.partitionKey(),
-                                                          command.clusteringIndexFilter().isReversed());
+                             : EmptyIterators.row(command.metadata(),
+                                                  command.partitionKey(),
+                                                  command.clusteringIndexFilter().isReversed());
 
         // Note that in general, we should wrap the result so that it's close method actually
         // close the whole PartitionIterator.
-        return new WrappingRowIterator(toReturn)
+        class Close extends Transformation
         {
-            public void close()
+            public void onPartitionClose()
             {
-                try
-                {
-                    super.close();
-                }
-                finally
-                {
-                    // asserting this only now because it bothers UnfilteredPartitionIterators.Serializer (which might be used
-                    // under the provided DataIter) if hasNext() is called before the previously returned iterator hasn't been fully consumed.
-                    assert !iter.hasNext();
-
-                    iter.close();
-                }
+                // asserting this only now because it bothers UnfilteredPartitionIterators.Serializer (which might be used
+                // under the provided DataIter) if hasNext() is called before the previously returned iterator has been fully consumed.
+                boolean hadNext = iter.hasNext();
+                iter.close();
+                assert !hadNext;
             }
-        };
+        }
+        return Transformation.apply(toReturn, new Close());
     }
 
     @SuppressWarnings("resource") // The created resources are returned right away
@@ -90,39 +65,17 @@ public abstract class PartitionIterators
         if (iterators.size() == 1)
             return iterators.get(0);
 
-        return new PartitionIterator()
+        class Extend implements MorePartitions<PartitionIterator>
         {
-            private int idx = 0;
-
-            public boolean hasNext()
+            int i = 1;
+            public PartitionIterator moreContents()
             {
-                while (idx < iterators.size())
-                {
-                    if (iterators.get(idx).hasNext())
-                        return true;
-
-                    ++idx;
-                }
-                return false;
+                if (i >= iterators.size())
+                    return null;
+                return iterators.get(i++);
             }
-
-            public RowIterator next()
-            {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-                return iterators.get(idx).next();
-            }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            public void close()
-            {
-                FileUtils.closeQuietly(iterators);
-            }
-        };
+        }
+        return MorePartitions.extend(iterators.get(0), new Extend());
     }
 
     public static void digest(PartitionIterator iterator, MessageDigest digest)
@@ -162,13 +115,14 @@ public abstract class PartitionIterators
     @SuppressWarnings("resource") // The created resources are returned right away
     public static PartitionIterator loggingIterator(PartitionIterator iterator, final String id)
     {
-        return new WrappingPartitionIterator(iterator)
+        class Logger extends Transformation<RowIterator>
         {
-            public RowIterator next()
+            public RowIterator applyToPartition(RowIterator partition)
             {
-                return RowIterators.loggingIterator(super.next(), id);
+                return RowIterators.loggingIterator(partition, id);
             }
-        };
+        }
+        return Transformation.apply(iterator, new Logger());
     }
 
     private static class SingletonPartitionIterator extends AbstractIterator<RowIterator> implements PartitionIterator

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
new file mode 100644
index 0000000..b7b01d6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
+
+public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator>
+{
+    private final boolean isForThrift;
+    private final DeletionPurger purger;
+    private final int gcBefore;
+    private boolean isReverseOrder;
+
+    public PurgeFunction(boolean isForThrift, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones)
+    {
+        this.isForThrift = isForThrift;
+        this.gcBefore = gcBefore;
+        this.purger = (timestamp, localDeletionTime) ->
+                      !(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
+                      && localDeletionTime < gcBefore
+                      && timestamp < getMaxPurgeableTimestamp();
+    }
+
+    protected abstract long getMaxPurgeableTimestamp();
+
+    // Called at the beginning of each new partition
+    protected void onNewPartition(DecoratedKey partitionKey)
+    {
+    }
+
+    // Called for each partition that had only purged infos and is empty post-purge.
+    protected void onEmptyPartitionPostPurge(DecoratedKey partitionKey)
+    {
+    }
+
+    // Called for every unfiltered. Meant for CompactionIterator to update progress
+    protected void updateProgress()
+    {
+    }
+
+    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+    {
+        onNewPartition(partition.partitionKey());
+
+        isReverseOrder = partition.isReverseOrder();
+        UnfilteredRowIterator purged = Transformation.apply(partition, this);
+        if (!isForThrift && purged.isEmpty())
+        {
+            onEmptyPartitionPostPurge(purged.partitionKey());
+            purged.close();
+            return null;
+        }
+
+        return purged;
+    }
+
+    public DeletionTime applyToDeletion(DeletionTime deletionTime)
+    {
+        return purger.shouldPurge(deletionTime) ? DeletionTime.LIVE : deletionTime;
+    }
+
+    public Row applyToStatic(Row row)
+    {
+        updateProgress();
+        return row.purge(purger, gcBefore);
+    }
+
+    public Row applyToRow(Row row)
+    {
+        updateProgress();
+        return row.purge(purger, gcBefore);
+    }
+
+    public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+    {
+        updateProgress();
+        boolean reversed = isReverseOrder;
+        if (marker.isBoundary())
+        {
+            // We can only skip the whole marker if both deletion times are purgeable.
+            // If only one of them is, filterTombstoneMarker will deal with it.
+            RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker;
+            boolean shouldPurgeClose = purger.shouldPurge(boundary.closeDeletionTime(reversed));
+            boolean shouldPurgeOpen = purger.shouldPurge(boundary.openDeletionTime(reversed));
+
+            if (shouldPurgeClose)
+            {
+                if (shouldPurgeOpen)
+                    return null;
+
+                return boundary.createCorrespondingOpenMarker(reversed);
+            }
+
+            return shouldPurgeOpen
+                   ? boundary.createCorrespondingCloseMarker(reversed)
+                   : marker;
+        }
+        else
+        {
+            return purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? null : marker;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
deleted file mode 100644
index 2093f53..0000000
--- a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-
-public abstract class PurgingPartitionIterator extends WrappingUnfilteredPartitionIterator
-{
-    private final DeletionPurger purger;
-    private final int gcBefore;
-
-    private UnfilteredRowIterator next;
-
-    public PurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones)
-    {
-        super(iterator);
-        this.gcBefore = gcBefore;
-        this.purger = new DeletionPurger()
-        {
-            public boolean shouldPurge(long timestamp, int localDeletionTime)
-            {
-                if (onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
-                    return false;
-
-                return timestamp < getMaxPurgeableTimestamp() && localDeletionTime < gcBefore;
-            }
-        };
-    }
-
-    protected abstract long getMaxPurgeableTimestamp();
-
-    // Called at the beginning of each new partition
-    protected void onNewPartition(DecoratedKey partitionKey)
-    {
-    }
-
-    // Called for each partition that had only purged infos and are empty post-purge.
-    protected void onEmptyPartitionPostPurge(DecoratedKey partitionKey)
-    {
-    }
-
-    // Called for every unfiltered. Meant for CompactionIterator to update progress
-    protected void updateProgress()
-    {
-    }
-
-    @Override
-    @SuppressWarnings("resource") // 'purged' closes wrapped 'iterator'
-    public boolean hasNext()
-    {
-        while (next == null && super.hasNext())
-        {
-            UnfilteredRowIterator iterator = super.next();
-            onNewPartition(iterator.partitionKey());
-
-            UnfilteredRowIterator purged = purge(iterator);
-            if (isForThrift() || !purged.isEmpty())
-            {
-                next = purged;
-                return true;
-            }
-
-            onEmptyPartitionPostPurge(purged.partitionKey());
-            purged.close();
-        }
-        return next != null;
-    }
-
-    @Override
-    public UnfilteredRowIterator next()
-    {
-        UnfilteredRowIterator toReturn = next;
-        next = null;
-        updateProgress();
-        return toReturn;
-    }
-
-    private UnfilteredRowIterator purge(final UnfilteredRowIterator iter)
-    {
-        return new AlteringUnfilteredRowIterator(iter)
-        {
-            @Override
-            public DeletionTime partitionLevelDeletion()
-            {
-                DeletionTime dt = iter.partitionLevelDeletion();
-                return purger.shouldPurge(dt) ? DeletionTime.LIVE : dt;
-            }
-
-            @Override
-            public Row computeNextStatic(Row row)
-            {
-                return row.purge(purger, gcBefore);
-            }
-
-            @Override
-            public Row computeNext(Row row)
-            {
-                return row.purge(purger, gcBefore);
-            }
-
-            @Override
-            public RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
-            {
-                boolean reversed = isReverseOrder();
-                if (marker.isBoundary())
-                {
-                    // We can only skip the whole marker if both deletion time are purgeable.
-                    // If only one of them is, filterTombstoneMarker will deal with it.
-                    RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker;
-                    boolean shouldPurgeClose = purger.shouldPurge(boundary.closeDeletionTime(reversed));
-                    boolean shouldPurgeOpen = purger.shouldPurge(boundary.openDeletionTime(reversed));
-
-                    if (shouldPurgeClose)
-                    {
-                        if (shouldPurgeOpen)
-                            return null;
-
-                        return boundary.createCorrespondingOpenMarker(reversed);
-                    }
-
-                    return shouldPurgeOpen
-                         ? boundary.createCorrespondingCloseMarker(reversed)
-                         : marker;
-                }
-                else
-                {
-                    return purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? null : marker;
-                }
-            }
-
-            @Override
-            public Unfiltered next()
-            {
-                Unfiltered next = super.next();
-                updateProgress();
-                return next;
-            }
-        };
-    }
-};

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
index 10989df..201c934 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.db.partitions;
 
-import java.util.Iterator;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
@@ -30,7 +28,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  * reference on the returned objects for longer than the iteration, it must
  * make a copy of it explicitely.
  */
-public interface UnfilteredPartitionIterator extends Iterator<UnfilteredRowIterator>, AutoCloseable
+public interface UnfilteredPartitionIterator extends BasePartitionIterator<UnfilteredRowIterator>
 {
     /**
      * Whether that partition iterator is for a thrift queries.
@@ -44,6 +42,4 @@ public interface UnfilteredPartitionIterator extends Iterator<UnfilteredRowItera
     public boolean isForThrift();
 
     public CFMetaData metadata();
-
-    public void close();
 }


Mime
View raw message