cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [1/7] cassandra git commit: 9975: Flatten Iterator Transformation Hierarchy
Date Tue, 27 Oct 2015 09:57:36 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 a4f32c5af -> 609497471
  refs/heads/trunk 73c48260d -> 928e4c28c


http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index f24c29f..2de02f6 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
@@ -30,6 +31,8 @@ import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -67,7 +70,7 @@ public class DataResolver extends ResponseResolver
         // Even though every responses should honor the limit, we might have more than requested post reconciliation,
         // so ensure we're respecting the limit.
         DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true);
-        return new CountingPartitionIterator(mergeWithShortReadProtection(iters, sources, counter), counter);
+        return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
     }
 
     private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
@@ -80,11 +83,11 @@ public class DataResolver extends ResponseResolver
 
         // So-called "short reads" stems from nodes returning only a subset of the results they have for a partition due to the limit,
         // but that subset not being enough post-reconciliation. So if we don't have limit, don't bother.
-        if (command.limits().isUnlimited())
-            return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
-
-        for (int i = 0; i < results.size(); i++)
-            results.set(i, new ShortReadProtectedIterator(sources[i], results.get(i), resultCounter));
+        if (!command.limits().isUnlimited())
+        {
+            for (int i = 0; i < results.size(); i++)
+                results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
+        }
 
         return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
     }
@@ -281,78 +284,53 @@ public class DataResolver extends ResponseResolver
         }
     }
 
-    private class ShortReadProtectedIterator extends CountingUnfilteredPartitionIterator
+    private class ShortReadProtection extends Transformation<UnfilteredRowIterator>
     {
         private final InetAddress source;
+        private final DataLimits.Counter counter;
         private final DataLimits.Counter postReconciliationCounter;
 
-        private ShortReadProtectedIterator(InetAddress source, UnfilteredPartitionIterator iterator, DataLimits.Counter postReconciliationCounter)
+        private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter)
         {
-            super(iterator, command.limits().newCounter(command.nowInSec(), false));
             this.source = source;
+            this.counter = command.limits().newCounter(command.nowInSec(), false).onlyCount();
             this.postReconciliationCounter = postReconciliationCounter;
         }
 
         @Override
-        public UnfilteredRowIterator next()
+        public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
         {
-            return new ShortReadProtectedRowIterator(super.next());
+            partition = Transformation.apply(partition, counter);
+            // must apply and extend with same protection instance
+            ShortReadRowProtection protection = new ShortReadRowProtection(partition.metadata(), partition.partitionKey());
+            partition = MoreRows.extend(partition, protection);
+            partition = Transformation.apply(partition, protection); // apply after, so it is retained when we extend (in case we need to reextend)
+            return partition;
         }
 
-        private class ShortReadProtectedRowIterator extends WrappingUnfilteredRowIterator
+        private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
         {
-            private boolean initialReadIsDone;
-            private UnfilteredRowIterator shortReadContinuation;
-            private Clustering lastClustering;
+            final CFMetaData metadata;
+            final DecoratedKey partitionKey;
+            Clustering lastClustering;
+            int lastCount = 0;
 
-            ShortReadProtectedRowIterator(UnfilteredRowIterator iter)
+            private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey)
             {
-                super(iter);
-            }
-
-            @Override
-            public boolean hasNext()
-            {
-                if (super.hasNext())
-                    return true;
-
-                initialReadIsDone = true;
-
-                if (shortReadContinuation != null && shortReadContinuation.hasNext())
-                    return true;
-
-                return checkForShortRead();
+                this.metadata = metadata;
+                this.partitionKey = partitionKey;
             }
 
             @Override
-            public Unfiltered next()
+            public Row applyToRow(Row row)
             {
-                Unfiltered next = initialReadIsDone ? shortReadContinuation.next() : super.next();
-
-                if (next.kind() == Unfiltered.Kind.ROW)
-                    lastClustering = ((Row)next).clustering();
-
-                return next;
+                lastClustering = row.clustering();
+                return row;
             }
 
             @Override
-            public void close()
+            public UnfilteredRowIterator moreContents()
             {
-                try
-                {
-                    super.close();
-                }
-                finally
-                {
-                    if (shortReadContinuation != null)
-                        shortReadContinuation.close();
-                }
-            }
-
-            private boolean checkForShortRead()
-            {
-                assert shortReadContinuation == null || !shortReadContinuation.hasNext();
-
                 // We have a short read if the node this is the result of has returned the requested number of
                 // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
                 // those results haven't made it in the final result post-reconciliation due to other nodes
@@ -363,8 +341,9 @@ public class DataResolver extends ResponseResolver
                 // Also note that we only get here once all the results for this node have been returned, and so
                 // if the node had returned the requested number but we still get there, it imply some results were
                 // skipped during reconciliation.
-                if (!counter.isDoneForPartition())
-                    return false;
+                if (lastCount == counter.counted() || !counter.isDoneForPartition())
+                    return null;
+                lastCount = counter.counted();
 
                 assert !postReconciliationCounter.isDoneForPartition();
 
@@ -378,23 +357,20 @@ public class DataResolver extends ResponseResolver
                 // counting iterator.
                 int n = postReconciliationCounter.countedInCurrentPartition();
                 int x = counter.countedInCurrentPartition();
-                int toQuery = x == 0
-                              ? n * 2     // We didn't got any answer, so (somewhat randomly) ask for twice as much
-                              : Math.max(((n * n) / x) - n, 1);
+                int toQuery = Math.max(((n * n) / x) - n, 1);
 
                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
-                ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey());
-                ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata().comparator, lastClustering, false);
+                ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
+                ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false);
                 SinglePartitionReadCommand<?> cmd = SinglePartitionReadCommand.create(command.metadata(),
                                                                                       command.nowInSec(),
                                                                                       command.columnFilter(),
                                                                                       command.rowFilter(),
                                                                                       retryLimits,
-                                                                                      partitionKey(),
+                                                                                      partitionKey,
                                                                                       retryFilter);
 
-                shortReadContinuation = doShortReadRetry(cmd);
-                return shortReadContinuation.hasNext();
+                return doShortReadRetry(cmd);
             }
 
             private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand<?> retryCommand)
@@ -402,7 +378,7 @@ public class DataResolver extends ResponseResolver
                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
                 if (StorageProxy.canDoLocalRequest(source))
-                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                      StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                 else
                     MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f210951..d7d6c63 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1898,7 +1898,8 @@ public class StorageProxy implements StorageProxyMBean
         private final ConsistencyLevel consistency;
 
         private final long startTime;
-        private CountingPartitionIterator sentQueryIterator;
+        private DataLimits.Counter counter;
+        private PartitionIterator sentQueryIterator;
 
         private int concurrencyFactor;
         // The two following "metric" are maintained to improve the concurrencyFactor
@@ -1928,7 +1929,7 @@ public class StorageProxy implements StorageProxyMBean
                 // else, sends the next batch of concurrent queries (after having close the previous iterator)
                 if (sentQueryIterator != null)
                 {
-                    liveReturned += sentQueryIterator.counter().counted();
+                    liveReturned += counter.counted();
                     sentQueryIterator.close();
 
                     // It's not the first batch of queries and we're not done, so we we can use what has been
@@ -1989,7 +1990,7 @@ public class StorageProxy implements StorageProxyMBean
             return new SingleRangeResponse(handler);
         }
 
-        private CountingPartitionIterator sendNextRequests()
+        private PartitionIterator sendNextRequests()
         {
             List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
             for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
@@ -2001,7 +2002,8 @@ public class StorageProxy implements StorageProxyMBean
             Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
             // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
             // enforce any particular limit at this point (this could break code than rely on postReconciliationProcessing), hence the DataLimits.NONE.
-            return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), DataLimits.NONE, command.nowInSec());
+            counter = DataLimits.NONE.newCounter(command.nowInSec(), true);
+            return counter.applyTo(PartitionIterators.concat(concurrentQueries));
         }
 
         public void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index bdebd43..2599b8d 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -17,12 +17,11 @@
  */
 package org.apache.cassandra.service.pager;
 
-import java.util.NoSuchElementException;
-
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
@@ -61,87 +60,65 @@ abstract class AbstractQueryPager implements QueryPager
     public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
     {
         if (isExhausted())
-            return PartitionIterators.EMPTY;
+            return EmptyIterators.partition();
 
         pageSize = Math.min(pageSize, remaining);
-        return new PagerIterator(nextPageReadCommand(pageSize).execute(consistency, clientState), limits.forPaging(pageSize), command.nowInSec());
+        Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+        return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager);
     }
 
     public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
     {
         if (isExhausted())
-            return PartitionIterators.EMPTY;
+            return EmptyIterators.partition();
 
         pageSize = Math.min(pageSize, remaining);
-        return new PagerIterator(nextPageReadCommand(pageSize).executeInternal(orderGroup), limits.forPaging(pageSize), command.nowInSec());
+        Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+        return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(orderGroup), pager);
     }
 
-    private class PagerIterator extends CountingPartitionIterator
+    private class Pager extends Transformation<RowIterator>
     {
         private final DataLimits pageLimits;
-
+        private final DataLimits.Counter counter;
         private Row lastRow;
-
         private boolean isFirstPartition = true;
-        private RowIterator nextPartition;
 
-        private PagerIterator(PartitionIterator iter, DataLimits pageLimits, int nowInSec)
+        private Pager(DataLimits pageLimits, int nowInSec)
         {
-            super(iter, pageLimits, nowInSec);
+            this.counter = pageLimits.newCounter(nowInSec, true);
             this.pageLimits = pageLimits;
         }
 
         @Override
-        @SuppressWarnings("resource") // iter is closed by closing the result or in close()
-        public boolean hasNext()
+        public RowIterator applyToPartition(RowIterator partition)
         {
-            while (nextPartition == null && super.hasNext())
+            DecoratedKey key = partition.partitionKey();
+            if (lastKey == null || !lastKey.equals(key))
+                remainingInPartition = limits.perPartitionCount();
+            lastKey = key;
+
+            // If this is the first partition of this page, this could be the continuation of a partition we've started
+            // on the previous page. In which case, we could have the problem that the partition has no more "regular"
+            // rows (but the page size is such we didn't knew before) but it does has a static row. We should then skip
+            // the partition as returning it would means to the upper layer that the partition has "only" static columns,
+            // which is not the case (and we know the static results have been sent on the previous page).
+            if (isFirstPartition)
             {
-                if (nextPartition == null)
-                    nextPartition = super.next();
-
-                DecoratedKey key = nextPartition.partitionKey();
-                if (lastKey == null || !lastKey.equals(key))
-                    remainingInPartition = limits.perPartitionCount();
-
-                lastKey = key;
-
-                // If this is the first partition of this page, this could be the continuation of a partition we've started
-                // on the previous page. In which case, we could have the problem that the partition has no more "regular"
-                // rows (but the page size is such we didn't knew before) but it does has a static row. We should then skip
-                // the partition as returning it would means to the upper layer that the partition has "only" static columns,
-                // which is not the case (and we know the static results have been sent on the previous page).
-                if (isFirstPartition && isPreviouslyReturnedPartition(key) && !nextPartition.hasNext())
+                isFirstPartition = false;
+                if (isPreviouslyReturnedPartition(key) && !partition.hasNext())
                 {
-                    nextPartition.close();
-                    nextPartition = null;
+                    partition.close();
+                    return null;
                 }
-
-                isFirstPartition = false;
             }
-            return nextPartition != null;
-        }
-
-        @Override
-        @SuppressWarnings("resource") // iter is closed by closing the result
-        public RowIterator next()
-        {
-            if (!hasNext())
-                throw new NoSuchElementException();
 
-            RowIterator toReturn = nextPartition;
-            nextPartition = null;
-
-            return new RowPagerIterator(toReturn);
+            return Transformation.apply(counter.applyTo(partition), this);
         }
 
         @Override
-        public void close()
+        public void onClose()
         {
-            super.close();
-            if (nextPartition != null)
-                nextPartition.close();
-
             recordLast(lastKey, lastRow);
 
             int counted = counter.counted();
@@ -159,28 +136,18 @@ abstract class AbstractQueryPager implements QueryPager
             exhausted = counted < pageLimits.count();
         }
 
-        private class RowPagerIterator extends WrappingRowIterator
+        public Row applyToStatic(Row row)
         {
-            RowPagerIterator(RowIterator iter)
-            {
-                super(iter);
-            }
-
-            @Override
-            public Row staticRow()
-            {
-                Row staticRow = super.staticRow();
-                if (!staticRow.isEmpty())
-                    lastRow = staticRow;
-                return staticRow;
-            }
+            if (!row.isEmpty())
+                lastRow = row;
+            return row;
+        }
 
-            @Override
-            public Row next()
-            {
-                lastRow = super.next();
-                return lastRow;
-            }
+        @Override
+        public Row applyToRow(Row row)
+        {
+            lastRow = row;
+            return row;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 2a214a0..8caa14d 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.service.pager;
 
-import java.util.List;
-
 import org.apache.cassandra.utils.AbstractIterator;
 
 import org.apache.cassandra.db.*;
@@ -125,9 +123,9 @@ public class MultiPartitionPager implements QueryPager
     {
         int toQuery = Math.min(remaining, pageSize);
         PagersIterator iter = new PagersIterator(toQuery, consistency, clientState, null);
-        CountingPartitionIterator countingIter = new CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec);
-        iter.setCounter(countingIter.counter());
-        return countingIter;
+        DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
+        iter.setCounter(counter);
+        return counter.applyTo(iter);
     }
 
     @SuppressWarnings("resource") // iter closed via countingIter
@@ -135,9 +133,9 @@ public class MultiPartitionPager implements QueryPager
     {
         int toQuery = Math.min(remaining, pageSize);
         PagersIterator iter = new PagersIterator(toQuery, null, null, orderGroup);
-        CountingPartitionIterator countingIter = new CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec);
-        iter.setCounter(countingIter.counter());
-        return countingIter;
+        DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
+        iter.setCounter(counter);
+        return counter.applyTo(iter);
     }
 
     private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
index a69335d..cdf2b97 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -18,9 +18,9 @@
 package org.apache.cassandra.service.pager;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.EmptyIterators;
 import org.apache.cassandra.db.ReadOrderGroup;
 import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterators;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
@@ -55,12 +55,12 @@ public interface QueryPager
 
         public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
         {
-            return PartitionIterators.EMPTY;
+            return EmptyIterators.partition();
         }
 
         public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
         {
-            return PartitionIterators.EMPTY;
+            return EmptyIterators.partition();
         }
 
         public boolean isExhausted()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index eee94e6..02b5de2 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -53,10 +53,11 @@ public class QueryPagers
         int count = 0;
         while (!pager.isExhausted())
         {
-            try (CountingPartitionIterator iter = new CountingPartitionIterator(pager.fetchPage(pageSize, consistencyLevel, state), limits, nowInSec))
+            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state))
             {
-                PartitionIterators.consume(iter);
-                count += iter.counter().counted();
+                DataLimits.Counter counter = limits.newCounter(nowInSec, true);
+                PartitionIterators.consume(counter.applyTo(iter));
+                count += counter.counted();
             }
         }
         return count;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
index 72e4399..14c0dca 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.utils.AbstractIterator;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
@@ -59,13 +60,12 @@ import org.apache.cassandra.db.partitions.*;
  *                 "c5": { value : 4 }
  *                 "c7": { value : 1 }
  */
-public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
+public class ThriftResultsMerger extends Transformation<UnfilteredRowIterator>
 {
     private final int nowInSec;
 
-    private ThriftResultsMerger(UnfilteredPartitionIterator wrapped, int nowInSec)
+    private ThriftResultsMerger(int nowInSec)
     {
-        super(wrapped);
         this.nowInSec = nowInSec;
     }
 
@@ -74,7 +74,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
         if (!metadata.isStaticCompactTable() && !metadata.isSuper())
             return iterator;
 
-        return new ThriftResultsMerger(iterator, nowInSec);
+        return Transformation.apply(iterator, new ThriftResultsMerger(nowInSec));
     }
 
     public static UnfilteredRowIterator maybeWrap(UnfilteredRowIterator iterator, int nowInSec)
@@ -83,14 +83,15 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
             return iterator;
 
         return iterator.metadata().isSuper()
-             ? new SuperColumnsPartitionMerger(iterator, nowInSec)
+             ? Transformation.apply(iterator, new SuperColumnsPartitionMerger(iterator, nowInSec))
              : new PartitionMerger(iterator, nowInSec);
     }
 
-    protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+    @Override
+    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
     {
         return iter.metadata().isSuper()
-             ? new SuperColumnsPartitionMerger(iter, nowInSec)
+             ? Transformation.apply(iter, new SuperColumnsPartitionMerger(iter, nowInSec))
              : new PartitionMerger(iter, nowInSec);
     }
 
@@ -204,20 +205,19 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
         }
     }
 
-    private static class SuperColumnsPartitionMerger extends AlteringUnfilteredRowIterator
+    private static class SuperColumnsPartitionMerger extends Transformation
     {
         private final int nowInSec;
         private final Row.Builder builder;
         private final ColumnDefinition superColumnMapColumn;
         private final AbstractType<?> columnComparator;
 
-        private SuperColumnsPartitionMerger(UnfilteredRowIterator results, int nowInSec)
+        private SuperColumnsPartitionMerger(UnfilteredRowIterator applyTo, int nowInSec)
         {
-            super(results);
-            assert results.metadata().isSuper();
+            assert applyTo.metadata().isSuper();
             this.nowInSec = nowInSec;
 
-            this.superColumnMapColumn = results.metadata().compactValueColumn();
+            this.superColumnMapColumn = applyTo.metadata().compactValueColumn();
             assert superColumnMapColumn != null && superColumnMapColumn.type instanceof MapType;
 
             this.builder = BTreeRow.sortedBuilder();
@@ -225,7 +225,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
         }
 
         @Override
-        protected Row computeNext(Row row)
+        public Row applyToRow(Row row)
         {
             PeekingIterator<Cell> staticCells = Iterators.peekingIterator(simpleCellsIterator(row));
             if (!staticCells.hasNext())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java
index 7474f3d..a7c4300 100644
--- a/src/java/org/apache/cassandra/utils/CloseableIterator.java
+++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.util.Iterator;
 
 // so we can instantiate anonymous classes implementing both interfaces
-public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable
 {
+    public void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 923b723..8ef6a63 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -76,13 +76,18 @@ public final class Throwables
     @SafeVarargs
     public static <E extends Exception> void perform(DiscreteAction<? extends E> ... actions) throws E
     {
-        perform(Arrays.stream(actions));
+        perform(Stream.of(actions));
+    }
+
+    public static <E extends Exception> void perform(Stream<? extends DiscreteAction<? extends E>> stream, DiscreteAction<? extends E> ... extra) throws E
+    {
+        perform(Stream.concat(stream, Stream.of(extra)));
     }
 
     @SuppressWarnings("unchecked")
     public static <E extends Exception> void perform(Stream<DiscreteAction<? extends E>> actions) throws E
     {
-        Throwable fail = perform(null, actions);
+        Throwable fail = perform((Throwable) null, actions);
         if (failIfCanCast(fail, null))
             throw (E) fail;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/test/data/legacy-sstables/jb/Keyspace1/Keyspace1-Standard1-jb-0-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/jb/Keyspace1/Keyspace1-Standard1-jb-0-Summary.db b/test/data/legacy-sstables/jb/Keyspace1/Keyspace1-Standard1-jb-0-Summary.db
index 83c68ce..1fbe040 100644
Binary files a/test/data/legacy-sstables/jb/Keyspace1/Keyspace1-Standard1-jb-0-Summary.db and b/test/data/legacy-sstables/jb/Keyspace1/Keyspace1-Standard1-jb-0-Summary.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 5e19d5e..ea0bd9b 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -103,9 +103,9 @@ public class Util
         return row.getCell(column);
     }
 
-    public static ClusteringPrefix clustering(ClusteringComparator comparator, Object... o)
+    public static Clustering clustering(ClusteringComparator comparator, Object... o)
     {
-        return comparator.make(o).clustering();
+        return comparator.make(o);
     }
 
     public static Token token(String key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/test/unit/org/apache/cassandra/db/TransformerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TransformerTest.java b/test/unit/org/apache/cassandra/db/TransformerTest.java
new file mode 100644
index 0000000..d56d8cd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/TransformerTest.java
@@ -0,0 +1,325 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.FilteredRows;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class TransformerTest
+{
+
+    static final CFMetaData metadata = metadata();
+    static final DecoratedKey partitionKey = new BufferDecoratedKey(new Murmur3Partitioner.LongToken(0L), ByteBufferUtil.EMPTY_BYTE_BUFFER);
+    static final Row staticRow = BTreeRow.singleCellRow(Clustering.STATIC_CLUSTERING, new BufferCell(metadata.partitionColumns().columns(true).getSimple(0), 0L, 0, 0, ByteBufferUtil.bytes(-1), null));
+
+    static CFMetaData metadata()
+    {
+        CFMetaData.Builder builder = CFMetaData.Builder.create("", "");
+        builder.addPartitionKey("pk", BytesType.instance);
+        builder.addClusteringColumn("c", Int32Type.instance);
+        builder.addStaticColumn("s", Int32Type.instance);
+        builder.addRegularColumn("v", Int32Type.instance);
+        return builder.build();
+    }
+
+    // Mock Data
+
+    static abstract class AbstractBaseRowIterator<U extends Unfiltered> extends AbstractIterator<U> implements BaseRowIterator<U>
+    {
+        private final int i;
+        private boolean returned;
+
+        protected AbstractBaseRowIterator(int i)
+        {
+            this.i = i;
+        }
+
+        protected U computeNext()
+        {
+            if (returned)
+                return endOfData();
+            returned = true;
+            return (U) row(i);
+        }
+
+        public CFMetaData metadata()
+        {
+            return metadata;
+        }
+
+        public boolean isReverseOrder()
+        {
+            return false;
+        }
+
+        public PartitionColumns columns()
+        {
+            return metadata.partitionColumns();
+        }
+
+        public DecoratedKey partitionKey()
+        {
+            return partitionKey;
+        }
+
+        public Row staticRow()
+        {
+            return staticRow;
+        }
+
+        public boolean isEmpty()
+        {
+            return false;
+        }
+
+        public void close()
+        {
+        }
+    }
+
+    private static UnfilteredRowIterator unfiltered(int i)
+    {
+        class Iter extends AbstractBaseRowIterator<Unfiltered> implements UnfilteredRowIterator
+        {
+            protected Iter(int i)
+            {
+                super(i);
+            }
+
+            public DeletionTime partitionLevelDeletion()
+            {
+                return DeletionTime.LIVE;
+            }
+
+            public EncodingStats stats()
+            {
+                return EncodingStats.NO_STATS;
+            }
+        }
+        return new Iter(i);
+    }
+
+    private static RowIterator filtered(int i)
+    {
+        class Iter extends AbstractBaseRowIterator<Row> implements RowIterator
+        {
+            protected Iter(int i)
+            {
+                super(i);
+            }
+        }
+        return new Iter(i);
+    }
+
+    private static Row row(int i)
+    {
+        return BTreeRow.singleCellRow(Util.clustering(metadata.comparator, i),
+                                      new BufferCell(metadata.partitionColumns().columns(false).getSimple(0), 1L, BufferCell.NO_TTL, BufferCell.NO_DELETION_TIME, ByteBufferUtil.bytes(i), null));
+    }
+
+    // Transformations that check mock data ranges
+
+    private static Transformation expect(int from, int to, List<Check> checks)
+    {
+        Expect expect = new Expect(from, to);
+        checks.add(expect);
+        return expect;
+    }
+
+    abstract static class Check extends Transformation
+    {
+        public abstract void check();
+    }
+
+    static class Expect extends Check
+    {
+        final int from, to;
+        int cur;
+        boolean closed;
+
+        Expect(int from, int to)
+        {
+            this.from = from;
+            this.to = to;
+            this.cur = from;
+        }
+
+        public Row applyToRow(Row row)
+        {
+            Assert.assertEquals(cur++, ByteBufferUtil.toInt(row.clustering().get(0)));
+            return row;
+        }
+
+        public void onPartitionClose()
+        {
+            Assert.assertEquals(to, cur);
+            closed = true;
+        }
+
+        public void check()
+        {
+            Assert.assertTrue(closed);
+        }
+    }
+
+    // Combinations of mock data and checks for an empty, singleton, and extending (sequential) range
+
+    private static enum Filter
+    {
+        INIT, APPLY_INNER, APPLY_OUTER, NONE
+    }
+
+    private static BaseRowIterator<?> empty(Filter filter, List<Check> checks)
+    {
+        switch (filter)
+        {
+            case INIT:
+                return Transformation.apply(EmptyIterators.row(metadata, partitionKey, false), expect(0, 0, checks));
+            case APPLY_INNER:
+                return Transformation.apply(FilteredRows.filter(Transformation.apply(EmptyIterators.unfilteredRow(metadata, partitionKey, false), expect(0, 0, checks)), Integer.MAX_VALUE), expect(0, 0, checks));
+            case APPLY_OUTER:
+            case NONE:
+                return Transformation.apply(EmptyIterators.unfilteredRow(metadata, partitionKey, false), expect(0, 0, checks));
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
+    private static BaseRowIterator<?> singleton(Filter filter, int i, List<Check> checks)
+    {
+        switch (filter)
+        {
+            case INIT:
+                return Transformation.apply(filtered(i), expect(i, i + 1, checks));
+            case APPLY_INNER:
+                return FilteredRows.filter(Transformation.apply(unfiltered(i), expect(i, i + 1, checks)), Integer.MAX_VALUE);
+            case APPLY_OUTER:
+            case NONE:
+                return Transformation.apply(unfiltered(i), expect(i, i + 1, checks));
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
+    private static BaseRowIterator<?> extendingIterator(int count, Filter filter, List<Check> checks)
+    {
+        class RefillNested extends Expect implements MoreRows<BaseRowIterator<?>>
+        {
+            boolean returnedEmpty, returnedSingleton, returnedNested;
+            RefillNested(int from)
+            {
+                super(from, count);
+            }
+
+            public BaseRowIterator<?> moreContents()
+            {
+                // first call return an empty iterator,
+                // second call return a singleton iterator (with a function that expects to be around to receive just that item)
+                // third call return a nested version of ourselves, with a function that expects to receive all future values
+                // fourth call, return null, indicating no more iterators to return
+
+                if (!returnedEmpty)
+                {
+                    returnedEmpty = true;
+                    return empty(filter, checks);
+                }
+
+                if (!returnedSingleton)
+                {
+                    returnedSingleton = true;
+                    return singleton(filter, from, checks);
+                }
+
+                if (from + 1 >= to)
+                    return null;
+
+                if (!returnedNested)
+                {
+                    returnedNested = true;
+
+                    RefillNested refill = new RefillNested(from + 1);
+                    checks.add(refill);
+                    return refill.applyTo(empty(filter, checks));
+                }
+
+                return null;
+            }
+
+            BaseRowIterator<?> applyTo(BaseRowIterator<?> iter)
+            {
+                if (iter instanceof UnfilteredRowIterator)
+                    return Transformation.apply(MoreRows.extend((UnfilteredRowIterator) iter, this), this);
+                else
+                    return Transformation.apply(MoreRows.extend((RowIterator) iter, this), this);
+            }
+        }
+
+        RefillNested refill = new RefillNested(0);
+        checks.add(refill);
+
+        BaseRowIterator<?> iter = empty(filter, checks);
+        switch (filter)
+        {
+            case APPLY_OUTER:
+                return FilteredRows.filter((UnfilteredRowIterator) refill.applyTo(iter), Integer.MAX_VALUE);
+            case APPLY_INNER:
+            case INIT:
+            case NONE:
+                return refill.applyTo(iter);
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
+    @Test
+    public void testRowExtension()
+    {
+        for (Filter filter : Filter.values())
+        {
+            List<Check> checks = new ArrayList<>();
+
+            BaseRowIterator<?> iter = extendingIterator(5, filter, checks);
+            for (int i = 0 ; i < 5 ; i++)
+            {
+                Unfiltered u = iter.next();
+                assert u instanceof Row;
+                Assert.assertEquals(i, ByteBufferUtil.toInt(u.clustering().get(0)));
+            }
+            iter.close();
+
+            for (Check check : checks)
+                check.check();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 8fe76c3..14f5707 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 
 import org.junit.After;
@@ -32,8 +30,8 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.EmptyIterators;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -45,7 +43,6 @@ import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
@@ -126,7 +123,7 @@ public class ValidatorTest
 
         // add a row
         Token mid = partitioner.midpoint(range.left, range.right);
-        validator.add(UnfilteredRowIterators.emptyIterator(cfs.metadata, new BufferDecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")), false));
+        validator.add(EmptyIterators.unfilteredRow(cfs.metadata, new BufferDecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")), false));
         validator.complete();
 
         // confirm that the tree was validated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index b60a039..ecffbbd 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -327,7 +327,7 @@ public class DataResolverTest
                                                                                         
              .add("c2", "v2")
                                                                                         
              .buildUpdate())));
         InetAddress peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, UnfilteredPartitionIterators.empty(cfm)));
+        resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false)));
 
         try(PartitionIterator data = resolver.resolve();
             RowIterator rows = Iterators.getOnlyElement(data))
@@ -349,8 +349,8 @@ public class DataResolverTest
     public void testResolveWithBothEmpty()
     {
         DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
-        resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.empty(cfm)));
-        resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.empty(cfm)));
+        resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm, false)));
+        resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm, false)));
 
         try(PartitionIterator data = resolver.resolve())
         {


Mime
View raw message