cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ble...@apache.org
Subject [3/4] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
Date Tue, 20 Oct 2015 12:42:50 GMT
Merge branch cassandra-2.2 into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1b5e3a9b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1b5e3a9b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1b5e3a9b

Branch: refs/heads/trunk
Commit: 1b5e3a9b1be0c945782492e269acb4ea44730ad3
Parents: a880739 98be5de
Author: blerer <benjamin.lerer@datastax.com>
Authored: Tue Oct 20 14:38:51 2015 +0200
Committer: blerer <benjamin.lerer@datastax.com>
Committed: Tue Oct 20 14:39:39 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../service/pager/AbstractQueryPager.java       | 21 ++++++++++++++++++--
 .../service/pager/RangeSliceQueryPager.java     |  5 +++--
 .../service/pager/SinglePartitionPager.java     |  3 +--
 4 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 25ad1fb,458d0d5..616ff47
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,20 +1,36 @@@
 -2.2.4
 +3.0
 +Merged from 2.2:
   * Expose phi values from failure detector via JMX and tweak debug
     and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Fix paging issues with partitions containing only static columns data (CASSANDRA-10381)
   * Fix conditions on static columns (CASSANDRA-10264)
   * AssertionError: attempted to delete non-existing file CommitLog (CASSANDRA-10377)
 - * (cqlsh) Distinguish negative and positive infinity in output (CASSANDRA-10523)
 - * (cqlsh) allow custom time_format for COPY TO (CASSANDRA-8970)
 - * Don't allow startup if the node's rack has changed (CASSANDRA-10242)
 - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
  
  
 -2.2.3
 +3.0-rc2
 + * Fix SELECT DISTINCT queries between 2.2.2 nodes and 3.0 nodes (CASSANDRA-10473)
 + * Remove circular references in SegmentedFile (CASSANDRA-10543)
 + * Ensure validation of indexed values only occurs once per-partition (CASSANDRA-10536)
 + * Fix handling of static columns for range tombstones in thrift (CASSANDRA-10174)
 + * Support empty ColumnFilter for backward compatility on empty IN (CASSANDRA-10471)
 + * Remove Pig support (CASSANDRA-10542)
 + * Fix LogFile throws Exception when assertion is disabled (CASSANDRA-10522)
 + * Revert CASSANDRA-7486, make CMS default GC, move GC config to
 +   conf/jvm.options (CASSANDRA-10403)
 + * Fix TeeingAppender causing some logs to be truncated/empty (CASSANDRA-10447)
 + * Allow EACH_QUORUM for reads (CASSANDRA-9602)
 + * Fix potential ClassCastException while upgrading (CASSANDRA-10468)
 + * Fix NPE in MVs on update (CASSANDRA-10503)
 + * Only include modified cell data in indexing deltas (CASSANDRA-10438)
 + * Do not load keyspace when creating sstable writer (CASSANDRA-10443)
 + * If node is not yet gossiping write all MV updates to batchlog only (CASSANDRA-10413)
 + * Re-populate token metadata after commit log recovery (CASSANDRA-10293)
 + * Provide additional metrics for materialized views (CASSANDRA-10323)
 + * Flush system schema tables after local schema changes (CASSANDRA-10429)
 +Merged from 2.2:
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 + * Fix the regression when using LIMIT with aggregates (CASSANDRA-10487)
   * Avoid NoClassDefFoundError during DataDescriptor initialization on windows (CASSANDRA-10412)
   * Preserve case of quoted Role & User names (CASSANDRA-10394)
   * cqlsh pg-style-strings broken (CASSANDRA-10484)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index b92d1e1,2a35e4b..bdebd43
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@@ -17,179 -17,381 +17,196 @@@
   */
  package org.apache.cassandra.service.pager;
  
 -import java.util.*;
 +import java.util.NoSuchElementException;
  
 -import com.google.common.annotations.VisibleForTesting;
 -
--import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
 -import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.*;
 -import org.apache.cassandra.db.filter.ColumnCounter;
 -import org.apache.cassandra.db.filter.IDiskAtomFilter;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.filter.DataLimits;
  import org.apache.cassandra.exceptions.RequestExecutionException;
  import org.apache.cassandra.exceptions.RequestValidationException;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import org.apache.cassandra.service.ClientState;
  
  abstract class AbstractQueryPager implements QueryPager
  {
 -    private static final Logger logger = LoggerFactory.getLogger(AbstractQueryPager.class);
 -
 -    private final ConsistencyLevel consistencyLevel;
 -    private final boolean localQuery;
 -
 -    protected final CFMetaData cfm;
 -    protected final IDiskAtomFilter columnFilter;
 -    private final long timestamp;
 +    protected final ReadCommand command;
 +    protected final DataLimits limits;
 +    protected final int protocolVersion;
  
      private int remaining;
 -    private boolean exhausted;
 -    private boolean shouldFetchExtraRow;
 -
 -    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
 -                                 int toFetch,
 -                                 boolean localQuery,
 -                                 String keyspace,
 -                                 String columnFamily,
 -                                 IDiskAtomFilter columnFilter,
 -                                 long timestamp)
 -    {
 -        this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace, columnFamily), columnFilter, timestamp);
 -    }
 -
 -    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
 -                                 int toFetch,
 -                                 boolean localQuery,
 -                                 CFMetaData cfm,
 -                                 IDiskAtomFilter columnFilter,
 -                                 long timestamp)
 -    {
 -        this.consistencyLevel = consistencyLevel;
 -        this.localQuery = localQuery;
 -
 -        this.cfm = cfm;
 -        this.columnFilter = columnFilter;
 -        this.timestamp = timestamp;
 -
 -        this.remaining = toFetch;
 -    }
 -
 -
 -    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
 -    {
 -        if (isExhausted())
 -            return Collections.emptyList();
 -
 -        int currentPageSize = nextPageSize(pageSize);
 -        List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery));
 -
 -        if (rows.isEmpty())
 -        {
 -            logger.debug("Got empty set of rows, considering pager exhausted");
 -            exhausted = true;
 -            return Collections.emptyList();
 -        }
 -
 -        int liveCount = getPageLiveCount(rows);
 -        logger.debug("Fetched {} live rows", liveCount);
 -
 -        // Because SP.getRangeSlice doesn't trim the result (see SP.trim()), liveCount may be greater than what asked
 -        // (currentPageSize). This would throw off the paging logic so we trim the excess. It's not extremely efficient
 -        // but most of the time there should be nothing or very little to trim.
 -        if (liveCount > currentPageSize)
 -        {
 -            rows = discardLast(rows, liveCount - currentPageSize);
 -            liveCount = currentPageSize;
 -        }
 -
 -        remaining -= liveCount;
 -
 -        // If we've got less than requested, there is no more query to do (but
 -        // we still need to return the current page)
 -        if (liveCount < currentPageSize)
 -        {
 -            logger.debug("Got result ({}) smaller than page size ({}), considering pager exhausted", liveCount, currentPageSize);
 -            exhausted = true;
 -        }
 -
 -        // If it's not the first query and the first column is the last one returned (likely
 -        // but not certain since paging can race with deletes/expiration), then remove the
 -        // first column.
 -        if (containsPreviousLast(rows.get(0)))
 -        {
 -            rows = discardFirst(rows);
 -            remaining++;
 -        }
 -        // Otherwise, if 'shouldFetchExtraRow' was set, we queried for one more than the page size,
 -        // so if the page is full, trim the last entry
 -        else if (shouldFetchExtraRow && !exhausted)
 -        {
 -            // We've asked for one more than necessary
 -            rows = discardLast(rows);
 -            remaining++;
 -        }
 -
 -        logger.debug("Remaining rows to page: {}", remaining);
  
 -        if (!isExhausted())
 -            shouldFetchExtraRow = recordLast(rows.get(rows.size() - 1));
 +    // This is the last key we've been reading from (or can still be reading within). This the key for
 +    // which remainingInPartition makes sense: if we're starting another key, we should reset remainingInPartition
 +    // (and this is done in PagerIterator). This can be null (when we start).
 +    private DecoratedKey lastKey;
 +    private int remainingInPartition;
  
 -        return rows;
 -    }
 +    private boolean exhausted;
  
 -    private List<Row> filterEmpty(List<Row> result)
 +    protected AbstractQueryPager(ReadCommand command, int protocolVersion)
      {
 -        for (Row row : result)
 -        {
 -            if (row.cf == null || !row.cf.hasColumns())
 -            {
 -                List<Row> newResult = new ArrayList<Row>(result.size() - 1);
 -                for (Row row2 : result)
 -                {
 -                    if (row2.cf == null || !row2.cf.hasColumns())
 -                        continue;
 +        this.command = command;
 +        this.protocolVersion = protocolVersion;
 +        this.limits = command.limits();
  
 -                    newResult.add(row2);
 -                }
 -                return newResult;
 -            }
 -        }
 -        return result;
 +        this.remaining = limits.count();
 +        this.remainingInPartition = limits.perPartitionCount();
      }
  
 -    protected void restoreState(int remaining, boolean shouldFetchExtraRow)
 +    public ReadOrderGroup startOrderGroup()
      {
 -        this.remaining = remaining;
 -        this.shouldFetchExtraRow = shouldFetchExtraRow;
 +        return command.startOrderGroup();
      }
  
 -    public boolean isExhausted()
 +    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
      {
 -        return exhausted || remaining == 0;
 -    }
 +        if (isExhausted())
 +            return PartitionIterators.EMPTY;
  
 -    public int maxRemaining()
 -    {
 -        return remaining;
 +        pageSize = Math.min(pageSize, remaining);
 +        return new PagerIterator(nextPageReadCommand(pageSize).execute(consistency, clientState), limits.forPaging(pageSize), command.nowInSec());
      }
  
 -    public long timestamp()
 +    public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
      {
 -        return timestamp;
 -    }
 +        if (isExhausted())
 +            return PartitionIterators.EMPTY;
  
 -    private int nextPageSize(int pageSize)
 -    {
 -        return Math.min(remaining, pageSize) + (shouldFetchExtraRow ? 1 : 0);
 +        pageSize = Math.min(pageSize, remaining);
 +        return new PagerIterator(nextPageReadCommand(pageSize).executeInternal(orderGroup), limits.forPaging(pageSize), command.nowInSec());
      }
  
 -    public ColumnCounter columnCounter()
 +    private class PagerIterator extends CountingPartitionIterator
      {
 -        return columnFilter.columnCounter(cfm.comparator, timestamp);
 -    }
 +        private final DataLimits pageLimits;
  
 -    protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException;
 +        private Row lastRow;
  
 -    /**
 -     * Checks to see if the first row of a new page contains the last row from the previous page.
 -     * @param first the first row of the new page
 -     * @return true if <code>first</code> contains the last from from the previous page and it is live, false otherwise
 -     */
 -    protected abstract boolean containsPreviousLast(Row first);
 +        private boolean isFirstPartition = true;
 +        private RowIterator nextPartition;
  
 -    /**
 -     * Saves the paging state by recording the last seen partition key and cell name (where applicable).
 -     * @param last the last row in the current page
 -     * @return true if an extra row should be fetched in the next page,false otherwise
 -     */
 -    protected abstract boolean recordLast(Row last);
 -
 -    protected abstract boolean isReversed();
 -
 -    private List<Row> discardFirst(List<Row> rows)
 -    {
 -        return discardFirst(rows, 1);
 -    }
 -
 -    @VisibleForTesting
 -    List<Row> discardFirst(List<Row> rows, int toDiscard)
 -    {
 -        if (toDiscard == 0 || rows.isEmpty())
 -            return rows;
 -
 -        int i = 0;
 -        DecoratedKey firstKey = null;
 -        ColumnFamily firstCf = null;
 -        while (toDiscard > 0 && i < rows.size())
 +        private PagerIterator(PartitionIterator iter, DataLimits pageLimits, int nowInSec)
          {
 -            Row first = rows.get(i++);
 -            firstKey = first.key;
 -            firstCf = first.cf.cloneMeShallow(isReversed());
 -            toDiscard -= isReversed()
 -                       ? discardLast(first.cf, toDiscard, firstCf)
 -                       : discardFirst(first.cf, toDiscard, firstCf);
 +            super(iter, pageLimits, nowInSec);
 +            this.pageLimits = pageLimits;
          }
  
 -        // If there is less live data than to discard, all is discarded
 -        if (toDiscard > 0)
 -            return Collections.<Row>emptyList();
 -
 -        // i is the index of the first row that we are sure to keep. On top of that,
 -        // we also keep firstCf is it hasn't been fully emptied by the last iteration above.
 -        int count = firstCf.getColumnCount();
 -        int newSize = rows.size() - (count == 0 ? i : i - 1);
 -        List<Row> newRows = new ArrayList<Row>(newSize);
 -        if (count != 0)
 -            newRows.add(new Row(firstKey, firstCf));
 -        newRows.addAll(rows.subList(i, rows.size()));
 +        @Override
 +        @SuppressWarnings("resource") // iter is closed by closing the result or in close()
 +        public boolean hasNext()
 +        {
 +            while (nextPartition == null && super.hasNext())
 +            {
 +                if (nextPartition == null)
 +                    nextPartition = super.next();
  
 -        return newRows;
 -    }
 +                DecoratedKey key = nextPartition.partitionKey();
 +                if (lastKey == null || !lastKey.equals(key))
 +                    remainingInPartition = limits.perPartitionCount();
  
 -    private List<Row> discardLast(List<Row> rows)
 -    {
 -        return discardLast(rows, 1);
 -    }
 +                lastKey = key;
  
 -    @VisibleForTesting
 -    List<Row> discardLast(List<Row> rows, int toDiscard)
 -    {
 -        if (toDiscard == 0 || rows.isEmpty())
 -            return rows;
 +                // If this is the first partition of this page, this could be the continuation of a partition we've started
 +                // on the previous page. In which case, we could have the problem that the partition has no more "regular"
 +                // rows (but the page size is such we didn't knew before) but it does has a static row. We should then skip
 +                // the partition as returning it would means to the upper layer that the partition has "only" static columns,
 +                // which is not the case (and we know the static results have been sent on the previous page).
 +                if (isFirstPartition && isPreviouslyReturnedPartition(key) && !nextPartition.hasNext())
 +                {
 +                    nextPartition.close();
 +                    nextPartition = null;
 +                }
  
 -        int i = rows.size()-1;
 -        DecoratedKey lastKey = null;
 -        ColumnFamily lastCf = null;
 -        while (toDiscard > 0 && i >= 0)
 -        {
 -            Row last = rows.get(i--);
 -            lastKey = last.key;
 -            lastCf = last.cf.cloneMeShallow(isReversed());
 -            toDiscard -= isReversed()
 -                       ? discardFirst(last.cf, toDiscard, lastCf)
 -                       : discardLast(last.cf, toDiscard, lastCf);
 +                isFirstPartition = false;
 +            }
 +            return nextPartition != null;
          }
  
 -        // If there is less live data than to discard, all is discarded
 -        if (toDiscard > 0)
 -            return Collections.<Row>emptyList();
 -
 -        // i is the index of the last row that we are sure to keep. On top of that,
 -        // we also keep lastCf is it hasn't been fully emptied by the last iteration above.
 -        int count = lastCf.getColumnCount();
 -        int newSize = count == 0 ? i+1 : i+2;
 -        List<Row> newRows = new ArrayList<Row>(newSize);
 -        newRows.addAll(rows.subList(0, i+1));
 -        if (count != 0)
 -            newRows.add(new Row(lastKey, lastCf));
 -
 -        return newRows;
 -    }
 +        @Override
 +        @SuppressWarnings("resource") // iter is closed by closing the result
 +        public RowIterator next()
 +        {
 +            if (!hasNext())
 +                throw new NoSuchElementException();
  
 -    private int getPageLiveCount(List<Row> page)
 -    {
 -        int count = 0;
 -        for (Row row : page)
 -            count += columnCounter().countAll(row.cf).live();
 -        return count;
 -    }
 +            RowIterator toReturn = nextPartition;
 +            nextPartition = null;
  
 -    private int discardFirst(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
 -    {
 -        boolean isReversed = isReversed();
 -        DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
 -        return isReversed
 -             ? discardTail(cf, toDiscard, newCf, cf.reverseIterator(), tester)
 -             : discardHead(toDiscard, newCf, cf.iterator(), tester);
 -    }
 +            return new RowPagerIterator(toReturn);
 +        }
  
 -    private int discardLast(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
 -    {
 -        boolean isReversed = isReversed();
 -        DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
 -        return isReversed
 -             ? discardHead(toDiscard, newCf, cf.reverseIterator(), tester)
 -             : discardTail(cf, toDiscard, newCf, cf.iterator(), tester);
 -    }
 +        @Override
 +        public void close()
 +        {
 +            super.close();
 +            if (nextPartition != null)
 +                nextPartition.close();
  
 -    private int discardHead(int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester)
 -    {
 -        ColumnCounter counter = columnCounter();
 +            recordLast(lastKey, lastRow);
  
 -        List<Cell> staticCells = new ArrayList<>(cfm.staticColumns().size());
 +            int counted = counter.counted();
 +            remaining -= counted;
-             remainingInPartition -= counter.countedInCurrentPartition();
++            // If the clustering of the last row returned is a static one, it means that the partition was only
++            // containing data within the static columns. Therefore, there are not data remaining within the partition.
++            if (lastRow != null && lastRow.clustering() == Clustering.STATIC_CLUSTERING)
++            {
++                remainingInPartition = 0;
++            }
++            else
++            {
++                remainingInPartition -= counter.countedInCurrentPartition();
++            }
 +            exhausted = counted < pageLimits.count();
 +        }
  
 -        // Discard the first 'toDiscard' live, non-static cells
 -        while (iter.hasNext())
 +        private class RowPagerIterator extends WrappingRowIterator
          {
 -            Cell c = iter.next();
 -
 -            // if it's a static column, don't count it and save it to add to the trimmed results
 -            ColumnDefinition columnDef = cfm.getColumnDefinition(c.name());
 -            if (columnDef != null && columnDef.kind == ColumnDefinition.Kind.STATIC)
 +            RowPagerIterator(RowIterator iter)
              {
 -                staticCells.add(c);
 -                continue;
 +                super(iter);
              }
  
 -            counter.count(c, tester);
 -
 -            // once we've discarded the required amount, add the rest
 -            if (counter.live() > toDiscard)
 +            @Override
++            public Row staticRow()
+             {
 -                for (Cell staticCell : staticCells)
 -                    copy.addColumn(staticCell);
++                Row staticRow = super.staticRow();
++                if (!staticRow.isEmpty())
++                    lastRow = staticRow;
++                return staticRow;
++            }
+ 
 -                copy.addColumn(c);
 -                while (iter.hasNext())
 -                    copy.addColumn(iter.next());
++            @Override
 +            public Row next()
 +            {
 +                lastRow = super.next();
 +                return lastRow;
              }
          }
 -        int live = counter.live();
 -        // We want to take into account the row even if it was containing only static columns
 -        if (live == 0 && !staticCells.isEmpty())
 -            live = 1;
 -        return Math.min(live, toDiscard);
      }
  
 -    private int discardTail(ColumnFamily cf, int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester)
 +    protected void restoreState(DecoratedKey lastKey, int remaining, int remainingInPartition)
      {
 -        // Redoing the counting like that is not extremely efficient.
 -        // This is called only for reversed slices or in the case of a race between
 -        // paging and a deletion (pretty unlikely), so this is probably acceptable.
 -        int liveCount = columnCounter().countAll(cf).live();
 -
 -        ColumnCounter counter = columnCounter();
 -        // Discard the last 'toDiscard' live (so stop adding as sound as we're past 'liveCount - toDiscard')
 -        while (iter.hasNext())
 -        {
 -            Cell c = iter.next();
 -            counter.count(c, tester);
 -            if (counter.live() > liveCount - toDiscard)
 -                break;
 +        this.lastKey = lastKey;
 +        this.remaining = remaining;
 +        this.remainingInPartition = remainingInPartition;
 +    }
  
 -            copy.addColumn(c);
 -        }
 -        return Math.min(liveCount, toDiscard);
 +    public boolean isExhausted()
 +    {
 +        return exhausted || remaining == 0 || ((this instanceof SinglePartitionPager) && remainingInPartition == 0);
      }
  
 -    /**
 -     * Returns the first non-static cell in the ColumnFamily.  This is necessary to avoid recording a static column
 -     * as the "last" cell seen in a reversed query.  Because we will always query static columns alongside the normal
 -     * data for a page, they are not a good indicator of where paging should resume.  When we begin the next page, we
 -     * need to start from the last non-static cell.
 -     */
 -    protected Cell firstNonStaticCell(ColumnFamily cf)
 +    public int maxRemaining()
      {
 -        for (Cell cell : cf)
 -        {
 -            ColumnDefinition def = cfm.getColumnDefinition(cell.name());
 -            if (def == null || def.kind != ColumnDefinition.Kind.STATIC)
 -                return cell;
 -        }
 -        return null;
 +        return remaining;
      }
  
 -    protected static Cell lastCell(ColumnFamily cf)
 +    protected int remainingInPartition()
      {
 -        return cf.getReverseSortedColumns().iterator().next();
 +        return remainingInPartition;
      }
 +
 +    protected abstract ReadCommand nextPageReadCommand(int pageSize);
 +    protected abstract void recordLast(DecoratedKey key, Row row);
 +    protected abstract boolean isPreviouslyReturnedPartition(DecoratedKey key);
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 770875a,fd14c82..fd35b29
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@@ -58,65 -66,68 +58,66 @@@ public class RangeSliceQueryPager exten
      {
          return lastReturnedKey == null
               ? null
 -             : new PagingState(lastReturnedKey.getKey(), lastReturnedName.toByteBuffer(), maxRemaining());
 +             : new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(), remainingInPartition());
      }
  
 -    protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
 +    protected ReadCommand nextPageReadCommand(int pageSize)
      throws RequestExecutionException
      {
 -        SliceQueryFilter sf = (SliceQueryFilter)columnFilter;
 -        AbstractBounds<RowPosition> keyRange = lastReturnedKey == null ? command.keyRange : makeIncludingKeyBounds(lastReturnedKey);
 -        Composite start = lastReturnedName == null ? sf.start() : lastReturnedName;
 -        PagedRangeCommand pageCmd = new PagedRangeCommand(command.keyspace,
 -                                                          command.columnFamily,
 -                                                          command.timestamp,
 -                                                          keyRange,
 -                                                          sf,
 -                                                          start,
 -                                                          sf.finish(),
 -                                                          command.rowFilter,
 -                                                          pageSize,
 -                                                          command.countCQL3Rows);
 -
 -        return localQuery
 -             ? pageCmd.executeLocally()
 -             : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
 -    }
 -
 -    protected boolean containsPreviousLast(Row first)
 -    {
 -        if (lastReturnedKey == null || !lastReturnedKey.equals(first.key))
 -            return false;
 -
 -        // Same as SliceQueryPager, we ignore a deleted column
 -        Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf);
 -        // If the row was containing only static columns it has already been returned and we can skip it.
 -        if (firstCell == null)
 -            return true;
 +        DataLimits limits;
 +        DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange();
 +        DataRange pageRange;
 +        if (lastReturnedKey == null)
 +        {
 +            pageRange = fullRange;
 +            limits = command.limits().forPaging(pageSize);
 +        }
 +        else
 +        {
 +            // We want to include the last returned key only if we haven't achieved our per-partition limit, otherwise, don't bother.
-             boolean includeLastKey = remainingInPartition() > 0;
++            boolean includeLastKey = remainingInPartition() > 0 && lastReturnedRow != null;
 +            AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey);
 +            if (includeLastKey)
 +            {
 +                pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow.clustering(command.metadata()), false);
 +                limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
 +            }
 +            else
 +            {
 +                pageRange = fullRange.forSubRange(bounds);
 +                limits = command.limits().forPaging(pageSize);
 +            }
 +        }
  
 -        CFMetaData metadata = Schema.instance.getCFMetaData(command.keyspace, command.columnFamily);
 -        return !first.cf.deletionInfo().isDeleted(firstCell)
 -                && firstCell.isLive(timestamp())
 -                && firstCell.name().isSameCQL3RowAs(metadata.comparator, lastReturnedName);
 +        // it won't hurt for the next page command to query the index manager
 +        // again to check for an applicable index, so don't supply one here
 +        return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, Optional.empty());
      }
  
 -    protected boolean recordLast(Row last)
 +    protected void recordLast(DecoratedKey key, Row last)
      {
 -        lastReturnedKey = last.key;
 -        lastReturnedName = (isReversed() ? firstNonStaticCell(last.cf) : lastCell(last.cf)).name();
 -        return true;
 +        if (last != null)
 +        {
 +            lastReturnedKey = key;
-             lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
++            if (last.clustering() != Clustering.STATIC_CLUSTERING)
++                lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
 +        }
      }
  
 -    protected boolean isReversed()
 +    protected boolean isPreviouslyReturnedPartition(DecoratedKey key)
      {
 -        return ((SliceQueryFilter)command.predicate).reversed;
 +        // Note that lastReturnedKey can be null, but key cannot.
 +        return key.equals(lastReturnedKey);
      }
  
 -    private AbstractBounds<RowPosition> makeIncludingKeyBounds(RowPosition lastReturnedKey)
 +    private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey)
      {
 -        // We always include lastReturnedKey since we may still be paging within a row,
 -        // and PagedRangeCommand will move over if we're not anyway
 -        AbstractBounds<RowPosition> bounds = command.keyRange;
 +        AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange();
          if (bounds instanceof Range || bounds instanceof Bounds)
          {
 -            return new Bounds<RowPosition>(lastReturnedKey, bounds.right);
 +            return includeLastKey
 +                 ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right)
 +                 : new Range<PartitionPosition>(lastReturnedKey, bounds.right);
          }
          else
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 7057e79,51bbf90..70d4559
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@@ -31,57 -26,8 +31,56 @@@ import org.apache.cassandra.db.filter.*
   *
   * For use by MultiPartitionPager.
   */
 -public interface SinglePartitionPager extends QueryPager
 +public class SinglePartitionPager extends AbstractQueryPager
  {
 -    public ByteBuffer key();
 -    public ColumnCounter columnCounter();
 +    private static final Logger logger = LoggerFactory.getLogger(SinglePartitionPager.class);
 +
 +    private final SinglePartitionReadCommand<?> command;
 +
 +    private volatile PagingState.RowMark lastReturned;
 +
 +    public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState state, int protocolVersion)
 +    {
 +        super(command, protocolVersion);
 +        this.command = command;
 +
 +        if (state != null)
 +        {
 +            lastReturned = state.rowMark;
 +            restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
 +        }
 +    }
 +
 +    public ByteBuffer key()
 +    {
 +        return command.partitionKey().getKey();
 +    }
 +
 +    public DataLimits limits()
 +    {
 +        return command.limits();
 +    }
 +
 +    public PagingState state()
 +    {
 +        return lastReturned == null
 +             ? null
 +             : new PagingState(null, lastReturned, maxRemaining(), remainingInPartition());
 +    }
 +
 +    protected ReadCommand nextPageReadCommand(int pageSize)
 +    {
 +        return command.forPaging(lastReturned == null ? null : lastReturned.clustering(command.metadata()), pageSize);
 +    }
 +
 +    protected void recordLast(DecoratedKey key, Row last)
 +    {
-         if (last != null)
++        if (last != null && last.clustering() != Clustering.STATIC_CLUSTERING)
 +            lastReturned = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
 +    }
 +
 +    protected boolean isPreviouslyReturnedPartition(DecoratedKey key)
 +    {
-         // We're querying a single partition, so if it's not the first page, it is the previously returned one.
 +        return lastReturned != null;
 +    }
  }


Mime
View raw message