cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [4/4] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Date Wed, 14 Jan 2015 10:55:41 GMT
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
	src/java/org/apache/cassandra/db/AtomDeserializer.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b723ce7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b723ce7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b723ce7

Branch: refs/heads/trunk
Commit: 0b723ce74d97fb2c75c8b685de9996b26ddfe683
Parents: f01b319 1c9c47d
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed Jan 14 11:55:31 2015 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Wed Jan 14 11:55:31 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../SingleColumnPrimaryKeyRestrictions.java     | 13 +++---
 .../apache/cassandra/db/AtomDeserializer.java   | 34 +++++++++++----
 .../cassandra/db/composites/CellNameType.java   |  2 +-
 .../sstable/format/big/IndexedSliceReader.java  | 46 ++++++++++++++++++--
 .../format/big/SSTableNamesIterator.java        | 25 ++++++++---
 .../cassandra/cql3/RangeDeletionTest.java       | 35 +++++++++++++++
 7 files changed, 131 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
index d2a3885,0000000..e109036
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
@@@ -1,308 -1,0 +1,305 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.composites.CBuilder;
 +import org.apache.cassandra.db.composites.CType;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.composites.Composite.EOC;
 +import org.apache.cassandra.db.composites.Composites;
 +import org.apache.cassandra.db.composites.CompositesBuilder;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +
 +/**
 + * A set of single column restrictions on a primary key part (partition key or clustering
key).
 + */
 +final class SingleColumnPrimaryKeyRestrictions extends AbstractPrimaryKeyRestrictions
 +{
 +    /**
 +     * The restrictions.
 +     */
 +    private final SingleColumnRestrictions restrictions;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to an EQ, <code>false</code>
otherwise.
 +     */
 +    private boolean eq;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to an IN, <code>false</code>
otherwise.
 +     */
 +    private boolean in;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to a Slice, <code>false</code>
otherwise.
 +     */
 +    private boolean slice;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to a Contains,
<code>false</code> otherwise.
 +     */
 +    private boolean contains;
 +
 +    public SingleColumnPrimaryKeyRestrictions(CType ctype)
 +    {
 +        super(ctype);
 +        this.restrictions = new SingleColumnRestrictions();
 +        this.eq = true;
 +    }
 +
 +    private SingleColumnPrimaryKeyRestrictions(SingleColumnPrimaryKeyRestrictions primaryKeyRestrictions,
 +                                               SingleColumnRestriction restriction) throws
InvalidRequestException
 +    {
 +        super(primaryKeyRestrictions.ctype);
 +        this.restrictions = primaryKeyRestrictions.restrictions.addRestriction(restriction);
 +
 +        if (!primaryKeyRestrictions.isEmpty())
 +        {
 +            ColumnDefinition lastColumn = primaryKeyRestrictions.restrictions.lastColumn();
 +            ColumnDefinition newColumn = restriction.getColumnDef();
 +
 +            checkFalse(primaryKeyRestrictions.isSlice() && newColumn.position()
> lastColumn.position(),
 +                       "Clustering column \"%s\" cannot be restricted (preceding column
\"%s\" is restricted by a non-EQ relation)",
 +                       newColumn.name,
 +                       lastColumn.name);
 +
 +            if (newColumn.position() < lastColumn.position())
 +                checkFalse(restriction.isSlice(),
 +                           "PRIMARY KEY column \"%s\" cannot be restricted (preceding column
\"%s\" is restricted by a non-EQ relation)",
 +                           restrictions.nextColumn(newColumn).name,
 +                           newColumn.name);
 +        }
 +
 +        if (restriction.isSlice() || primaryKeyRestrictions.isSlice())
 +            this.slice = true;
 +        else if (restriction.isContains() || primaryKeyRestrictions.isContains())
 +            this.contains = true;
 +        else if (restriction.isIN())
 +            this.in = true;
 +        else
 +            this.eq = true;
 +    }
 +
 +    @Override
 +    public boolean isSlice()
 +    {
 +        return slice;
 +    }
 +
 +    @Override
 +    public boolean isEQ()
 +    {
 +        return eq;
 +    }
 +
 +    @Override
 +    public boolean isIN()
 +    {
 +        return in;
 +    }
 +
 +    @Override
 +    public boolean isOnToken()
 +    {
 +        return false;
 +    }
 +
 +    @Override
 +    public boolean isContains()
 +    {
 +        return contains;
 +    }
 +
 +    @Override
 +    public boolean isMultiColumn()
 +    {
 +        return false;
 +    }
 +
 +    @Override
 +    public boolean usesFunction(String ksName, String functionName)
 +    {
 +        return restrictions.usesFunction(ksName, functionName);
 +    }
 +
 +    @Override
 +    public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
 +    {
 +        if (restriction.isMultiColumn())
 +        {
 +            checkTrue(isEmpty(),
 +                      "Mixing single column relations and multi column relations on clustering
columns is not allowed");
 +            return (PrimaryKeyRestrictions) restriction;
 +        }
 +
 +        if (restriction.isOnToken())
 +        {
 +            if (isEmpty())
 +                return (PrimaryKeyRestrictions) restriction;
 +
 +            return new TokenFilter(this, (TokenRestriction) restriction);
 +        }
 +
 +        return new SingleColumnPrimaryKeyRestrictions(this, (SingleColumnRestriction) restriction);
 +    }
 +
 +    @Override
 +    public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
 +    {
 +        CompositesBuilder builder = new CompositesBuilder(ctype.builder(), ctype);
 +        for (ColumnDefinition def : restrictions.getColumnDefs())
 +        {
 +            Restriction r = restrictions.getRestriction(def);
 +            assert !r.isSlice();
 +
 +            List<ByteBuffer> values = r.values(options);
 +
 +            if (values.isEmpty())
 +                return Collections.emptyList();
 +
 +            builder.addEachElementToAll(values);
 +            checkFalse(builder.containsNull(), "Invalid null value for column %s", def.name);
 +        }
 +
 +        return builder.build();
 +    }
 +
 +    @Override
 +    public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws
InvalidRequestException
 +    {
 +        CBuilder builder = ctype.builder();
 +        List<ColumnDefinition> defs = new ArrayList<>(restrictions.getColumnDefs());
 +
 +        CompositesBuilder compositeBuilder = new CompositesBuilder(builder, ctype);
 +        // The end-of-component of composite doesn't depend on whether the
 +        // component type is reversed or not (i.e. the ReversedType is applied
 +        // to the component comparator but not to the end-of-component itself),
 +        // it only depends on whether the slice is reversed
 +        int keyPosition = 0;
 +        for (ColumnDefinition def : defs)
 +        {
 +            // In a restriction, we always have Bound.START < Bound.END for the "base"
comparator.
 +            // So if we're doing a reverse slice, we must inverse the bounds when giving
them as start and end of the slice filter.
 +            // But if the actual comparator itself is reversed, we must inversed the bounds
too.
 +            Bound b = !def.isReversedType() ? bound : bound.reverse();
 +            Restriction r = restrictions.getRestriction(def);
 +            if (keyPosition != def.position() || r.isContains())
-             {
-                 EOC eoc = !compositeBuilder.isEmpty() && bound.isEnd() ? EOC.END
: EOC.NONE;
-                 return compositeBuilder.buildWithEOC(eoc);
-             }
++                return compositeBuilder.buildWithEOC(bound.isEnd() ? EOC.END : EOC.START);
++
 +            if (r.isSlice())
 +            {
 +                if (!r.hasBound(b))
 +                {
 +                    // There wasn't any non EQ relation on that key, we select all records
having the preceding component as prefix.
 +                    // For composites, if there was preceding component and we're computing
the end, we must change the last component
 +                    // End-Of-Component, otherwise we would be selecting only one record.
-                     EOC eoc = !compositeBuilder.isEmpty() && bound.isEnd() ? EOC.END
: EOC.NONE;
-                     return compositeBuilder.buildWithEOC(eoc);
++                    return compositeBuilder.buildWithEOC(bound.isEnd() ? EOC.END : EOC.START);
 +                }
 +
 +                ByteBuffer value = checkNotNull(r.bounds(b, options).get(0), "Invalid null
clustering key part %s", r);
 +                compositeBuilder.addElementToAll(value);
 +                Composite.EOC eoc = eocFor(r, bound, b);
 +                return compositeBuilder.buildWithEOC(eoc);
 +            }
 +
 +            List<ByteBuffer> values = r.values(options);
 +
 +            if (values.isEmpty())
 +                return Collections.emptyList();
 +
 +            compositeBuilder.addEachElementToAll(values);
 +
 +            checkFalse(compositeBuilder.containsNull(), "Invalid null clustering key part
%s", def.name);
 +            keyPosition++;
 +        }
 +        // Means no relation at all or everything was an equal
 +        // Note: if the builder is "full", there is no need to use the end-of-component
bit. For columns selection,
 +        // it would be harmless to do it. However, we use this method got the partition
key too. And when a query
 +        // with 2ndary index is done, and with the the partition provided with an EQ, we'll
end up here, and in that
 +        // case using the eoc would be bad, since for the random partitioner we have no
guarantee that
 +        // prefix.end() will sort after prefix (see #5240).
-         EOC eoc = bound.isEnd() && compositeBuilder.hasRemaining() ? EOC.END : EOC.NONE;
++        EOC eoc = !compositeBuilder.hasRemaining() ? EOC.NONE : (bound.isEnd() ? EOC.END
: EOC.START);
 +        return compositeBuilder.buildWithEOC(eoc);
 +    }
 +
 +    @Override
 +    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
 +    {
 +        return Composites.toByteBuffers(valuesAsComposites(options));
 +    }
 +
 +    @Override
 +    public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
 +    {
 +        return Composites.toByteBuffers(boundsAsComposites(b, options));
 +    }
 +
 +    private static Composite.EOC eocFor(Restriction r, Bound eocBound, Bound inclusiveBound)
 +    {
 +        if (eocBound.isStart())
 +            return r.isInclusive(inclusiveBound) ? Composite.EOC.NONE : Composite.EOC.END;
 +
 +        return r.isInclusive(inclusiveBound) ? Composite.EOC.END : Composite.EOC.START;
 +    }
 +
 +    @Override
 +    public boolean hasBound(Bound b)
 +    {
 +        if (isEmpty())
 +            return false;
 +        return restrictions.lastRestriction().hasBound(b);
 +    }
 +
 +    @Override
 +    public boolean isInclusive(Bound b)
 +    {
 +        if (isEmpty())
 +            return false;
 +        return restrictions.lastRestriction().isInclusive(b);
 +    }
 +
 +    @Override
 +    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
 +    {
 +        return restrictions.hasSupportingIndex(indexManager);
 +    }
 +
 +    @Override
 +    public void addIndexExpressionTo(List<IndexExpression> expressions, QueryOptions
options) throws InvalidRequestException
 +    {
 +        restrictions.addIndexExpressionTo(expressions, options);
 +    }
 +
 +    @Override
 +    public Collection<ColumnDefinition> getColumnDefs()
 +    {
 +        return restrictions.getColumnDefs();
 +    }
- }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/db/AtomDeserializer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomDeserializer.java
index 0c43422,a103647..c71321c
--- a/src/java/org/apache/cassandra/db/AtomDeserializer.java
+++ b/src/java/org/apache/cassandra/db/AtomDeserializer.java
@@@ -41,9 -40,13 +41,13 @@@ public class AtomDeserialize
      private final DataInput in;
      private final ColumnSerializer.Flag flag;
      private final int expireBefore;
 -    private final Descriptor.Version version;
 +    private final Version version;
  
+     // The "flag" for the next name (which correspond to the "masks" in ColumnSerializer)
if it has been
+     // read already, Integer.MIN_VALUE otherwise;
+     private int nextFlags = Integer.MIN_VALUE;
+ 
 -    public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag,
int expireBefore, Descriptor.Version version)
 +    public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag,
int expireBefore, Version version)
      {
          this.type = type;
          this.nameDeserializer = type.newDeserializer(in);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/db/composites/CellNameType.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
index a69cff9,0000000..43fd046
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
@@@ -1,500 -1,0 +1,540 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.IOException;
 +import java.util.ArrayDeque;
 +import java.util.Deque;
 +import java.util.List;
 +
 +import com.google.common.collect.AbstractIterator;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.composites.CellNameType;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.IndexHelper;
 +import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +/**
 + * This is a reader that finds the block for a starting column and returns blocks before/after
it for each next call.
 + * This function assumes that the CF is sorted by name and exploits the name index.
 + */
 +class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
 +{
 +    private final ColumnFamily emptyColumnFamily;
 +
 +    private final SSTableReader sstable;
 +    private final List<IndexHelper.IndexInfo> indexes;
 +    private final FileDataInput originalInput;
 +    private FileDataInput file;
 +    private final boolean reversed;
 +    private final ColumnSlice[] slices;
 +    private final BlockFetcher fetcher;
 +    private final Deque<OnDiskAtom> blockColumns = new ArrayDeque<OnDiskAtom>();
 +    private final CellNameType comparator;
 +
 +    // Holds range tombstone in reverse queries. See addColumn()
 +    private final Deque<OnDiskAtom> rangeTombstonesReversed;
 +
 +    /**
 +     * This slice reader assumes that slices are sorted correctly, e.g. that for forward
lookup slices are in
 +     * lexicographic order of start elements and that for reverse lookup they are in reverse
lexicographic order of
 +     * finish (reverse start) elements. i.e. forward: [a,b],[d,e],[g,h] reverse: [h,g],[e,d],[b,a].
This reader also
 +     * assumes that validation has been performed in terms of intervals (no overlapping
intervals).
 +     */
 +    IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input,
ColumnSlice[] slices, boolean reversed)
 +    {
 +        Tracing.trace("Seeking to partition indexed section in data file");
 +        this.sstable = sstable;
 +        this.originalInput = input;
 +        this.reversed = reversed;
 +        this.slices = slices;
 +        this.comparator = sstable.metadata.comparator;
 +        this.rangeTombstonesReversed = reversed ? new ArrayDeque<OnDiskAtom>() : null;
 +
 +        try
 +        {
 +            this.indexes = indexEntry.columnsIndex();
 +            emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata);
 +            if (indexes.isEmpty())
 +            {
 +                setToRowStart(indexEntry, input);
 +                emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
 +                fetcher = new SimpleBlockFetcher();
 +            }
 +            else
 +            {
 +                emptyColumnFamily.delete(indexEntry.deletionTime());
 +                fetcher = new IndexedBlockFetcher(indexEntry.position);
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            sstable.markSuspect();
 +            throw new CorruptSSTableException(e, file.getPath());
 +        }
 +    }
 +
 +    /**
 +     * Sets the seek position to the start of the row for column scanning.
 +     */
 +    private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException
 +    {
 +        if (in == null)
 +        {
 +            this.file = sstable.getFileDataInput(rowEntry.position);
 +        }
 +        else
 +        {
 +            this.file = in;
 +            in.seek(rowEntry.position);
 +        }
 +        sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
 +    }
 +
 +    public ColumnFamily getColumnFamily()
 +    {
 +        return emptyColumnFamily;
 +    }
 +
 +    public DecoratedKey getKey()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    protected OnDiskAtom computeNext()
 +    {
 +        while (true)
 +        {
 +            if (reversed)
 +            {
 +                // Return all tombstone for the block first (see addColumn() below)
 +                OnDiskAtom column = rangeTombstonesReversed.poll();
 +                if (column != null)
 +                    return column;
 +            }
 +
 +            OnDiskAtom column = blockColumns.poll();
 +            if (column == null)
 +            {
 +                if (!fetcher.fetchMoreData())
 +                    return endOfData();
 +            }
 +            else
 +            {
 +                return column;
 +            }
 +        }
 +    }
 +
 +    public void close() throws IOException
 +    {
 +        if (originalInput == null && file != null)
 +            file.close();
 +    }
 +
 +    protected void addColumn(OnDiskAtom col)
 +    {
 +        if (reversed)
 +        {
 +            /*
 +             * We put range tomstone markers at the beginning of the range they delete.
But for reversed queries,
 +             * the caller still need to know about a RangeTombstone before it sees any column
that it covers.
 +             * To make that simple, we keep said tombstones separate and return them all
before any column for
 +             * a given block.
 +             */
 +            if (col instanceof RangeTombstone)
 +                rangeTombstonesReversed.addFirst(col);
 +            else
 +                blockColumns.addFirst(col);
 +        }
 +        else
 +        {
 +            blockColumns.addLast(col);
 +        }
 +    }
 +
 +    private abstract class BlockFetcher
 +    {
 +        protected int currentSliceIdx;
 +
 +        protected BlockFetcher(int sliceIdx)
 +        {
 +            this.currentSliceIdx = sliceIdx;
 +        }
 +
 +        /*
 +         * Return the smallest key selected by the current ColumnSlice.
 +         */
 +        protected Composite currentStart()
 +        {
 +            return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start;
 +        }
 +
 +        /*
 +         * Return the biggest key selected by the current ColumnSlice.
 +         */
 +        protected Composite currentFinish()
 +        {
 +            return reversed ? slices[currentSliceIdx].start : slices[currentSliceIdx].finish;
 +        }
 +
 +        protected abstract boolean setNextSlice();
 +
 +        protected abstract boolean fetchMoreData();
 +
 +        protected boolean isColumnBeforeSliceStart(OnDiskAtom column)
 +        {
 +            return isBeforeSliceStart(column.name());
 +        }
 +
 +        protected boolean isBeforeSliceStart(Composite name)
 +        {
 +            Composite start = currentStart();
 +            return !start.isEmpty() && comparator.compare(name, start) < 0;
 +        }
 +
 +        protected boolean isColumnBeforeSliceFinish(OnDiskAtom column)
 +        {
 +            Composite finish = currentFinish();
 +            return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0;
 +        }
 +
 +        protected boolean isAfterSliceFinish(Composite name)
 +        {
 +            Composite finish = currentFinish();
 +            return !finish.isEmpty() && comparator.compare(name, finish) > 0;
 +        }
 +    }
 +
 +    private class IndexedBlockFetcher extends BlockFetcher
 +    {
 +        // where this row starts
 +        private final long columnsStart;
 +
 +        // the index entry for the next block to deserialize
 +        private int nextIndexIdx = -1;
 +
 +        // index of the last block we've read from disk;
 +        private int lastDeserializedBlock = -1;
 +
 +        // For reversed, keep columns at the beginning of the last deserialized block that
 +        // may still match a slice
 +        private final Deque<OnDiskAtom> prefetched;
 +
 +        public IndexedBlockFetcher(long columnsStart)
 +        {
 +            super(-1);
 +            this.columnsStart = columnsStart;
 +            this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
 +            setNextSlice();
 +        }
 +
 +        protected boolean setNextSlice()
 +        {
 +            while (++currentSliceIdx < slices.length)
 +            {
 +                nextIndexIdx = IndexHelper.indexFor(slices[currentSliceIdx].start, indexes,
comparator, reversed, nextIndexIdx);
 +                if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size())
 +                    // no index block for that slice
 +                    continue;
 +
 +                // Check if we can exclude this slice entirely from the index
 +                IndexInfo info = indexes.get(nextIndexIdx);
 +                if (reversed)
 +                {
 +                    if (!isBeforeSliceStart(info.lastName))
 +                        return true;
 +                }
 +                else
 +                {
 +                    if (!isAfterSliceFinish(info.firstName))
 +                        return true;
 +                }
 +            }
 +            nextIndexIdx = -1;
 +            return false;
 +        }
 +
 +        protected boolean hasMoreSlice()
 +        {
 +            return currentSliceIdx < slices.length;
 +        }
 +
 +        protected boolean fetchMoreData()
 +        {
 +            if (!hasMoreSlice())
 +                return false;
 +
 +            // If we read blocks in reversed disk order, we may have columns from the previous
block to handle.
 +            // Note that prefetched keeps columns in reversed disk order.
++            // Also note that Range Tombstone handling is a bit tricky, because we may run
into range tombstones
++            // that cover a slice *after* we've move to the previous slice. To keep it simple,
we simply include
++            // every RT in prefetched: it's only slightly inefficient to do so and there
is only so much RT that
++            // can be mistakenly added this way.
 +            if (reversed && !prefetched.isEmpty())
 +            {
++                // Whether we've found anything to return in prefetched
 +                boolean gotSome = false;
 +                // Avoids some comparison when we know it's not useful
 +                boolean inSlice = false;
 +
 +                OnDiskAtom prefetchedCol;
 +                while ((prefetchedCol = prefetched.peek() ) != null)
 +                {
 +                    // col is before slice, we update the slice
 +                    if (isColumnBeforeSliceStart(prefetchedCol))
 +                    {
 +                        inSlice = false;
-                         if (!setNextSlice())
-                             return false;
++
++                        // As explained above, we add RT unconditionally
++                        if (prefetchedCol instanceof RangeTombstone)
++                        {
++                            blockColumns.addLast(prefetched.poll());
++                            gotSome = true;
++                            continue;
++                        }
++
++                        // Otherwise, we either move to the next slice or, if we have none
(which can happen
++                        // because we unwind prefetched no matter what due to RT), we skip
the cell
++                        if (hasMoreSlice())
++                            setNextSlice();
++                        else
++                            prefetched.poll();
++
 +                    }
 +                    // col is within slice, all columns
 +                    // (we go in reverse, so as soon as we are in a slice, no need to check
 +                    // we're after the slice until we change slice)
 +                    else if (inSlice || isColumnBeforeSliceFinish(prefetchedCol))
 +                    {
 +                        blockColumns.addLast(prefetched.poll());
 +                        gotSome = true;
 +                        inSlice = true;
 +                    }
 +                    // if col is after slice, ignore
 +                    else
 +                    {
 +                        prefetched.poll();
 +                    }
 +                }
 +                if (gotSome)
 +                    return true;
 +            }
 +            try
 +            {
 +                return getNextBlock();
 +            }
 +            catch (IOException e)
 +            {
 +                throw new CorruptSSTableException(e, file.getPath());
 +            }
 +        }
 +
 +        private boolean getNextBlock() throws IOException
 +        {
 +            if (lastDeserializedBlock == nextIndexIdx)
 +            {
 +                if (reversed)
 +                    nextIndexIdx--;
 +                else
 +                    nextIndexIdx++;
 +            }
 +            lastDeserializedBlock = nextIndexIdx;
 +
 +            // Are we done?
 +            if (lastDeserializedBlock < 0 || lastDeserializedBlock >= indexes.size())
 +                return false;
 +
 +            IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
 +
 +            /* seek to the correct offset to the data, and calculate the data size */
 +            long positionToSeek = columnsStart + currentIndex.offset;
 +
 +            // With new promoted indexes, our first seek in the data file will happen at
that point.
 +            if (file == null)
 +                file = originalInput == null ? sstable.getFileDataInput(positionToSeek)
: originalInput;
 +
 +            AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file,
sstable.descriptor.version);
 +
 +            file.seek(positionToSeek);
 +            FileMark mark = file.mark();
 +
 +            // We remenber when we are whithin a slice to avoid some comparison
 +            boolean inSlice = false;
 +
 +            // scan from index start
 +            while (file.bytesPastMark(mark) < currentIndex.width || deserializer.hasUnprocessed())
 +            {
 +                // col is before slice
 +                // (If in slice, don't bother checking that until we change slice)
 +                Composite start = currentStart();
 +                if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start)
< 0)
 +                {
++                    // If it's a rangeTombstone, then we need to read it and include it
unless it's end
++                    // stops before our slice start.
++                    if (deserializer.nextIsRangeTombstone())
++                    {
++                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
++                        if (comparator.compare(rt.max, start) >= 0)
++                            addColumn(rt);
++                        continue;
++                    }
++
 +                    if (reversed)
 +                    {
 +                        // the next slice select columns that are before the current one,
so it may
 +                        // match this column, so keep it around.
 +                        prefetched.addFirst(deserializer.readNext());
 +                    }
 +                    else
 +                    {
 +                        deserializer.skipNext();
 +                    }
 +                }
 +                // col is within slice
 +                else
 +                {
 +                    Composite finish = currentFinish();
 +                    if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
 +                    {
 +                        inSlice = true;
 +                        addColumn(deserializer.readNext());
 +                    }
 +                    // col is after slice.
 +                    else
 +                    {
 +                        // When reading forward, if we hit a column that sorts after the
current slice, it means we're done with this slice.
 +                        // For reversed, this may either mean that we're done with the current
slice, or that we need to read the previous
 +                        // index block. However, we can be sure that we are in the first
case though (the current slice is done) if the first
 +                        // columns of the block were not part of the current slice, i.e.
if we have columns in prefetched.
 +                        if (reversed && prefetched.isEmpty())
 +                            break;
 +
 +                        if (!setNextSlice())
 +                            break;
 +
 +                        inSlice = false;
 +
 +                        // The next index block now corresponds to the first block that
may have columns for the newly set slice.
 +                        // So if it's different from the current block, we're done with
this block. And in that case, we know
 +                        // that our prefetched columns won't match.
 +                        if (nextIndexIdx != lastDeserializedBlock)
 +                        {
 +                            if (reversed)
 +                                prefetched.clear();
 +                            break;
 +                        }
 +
 +                        // Even if the next slice may have column in this blocks, if we're
reversed, those columns have been
 +                        // prefetched and we're done with that block
 +                        if (reversed)
 +                            break;
 +
 +                        // otherwise, we will deal with that column at the next iteration
 +                    }
 +                }
 +            }
 +            return true;
 +        }
 +    }
 +
 +    private class SimpleBlockFetcher extends BlockFetcher
 +    {
 +        public SimpleBlockFetcher() throws IOException
 +        {
 +            // Since we have to deserialize in order and will read all slices might as well
reverse the slices and
 +            // behave as if it was not reversed
 +            super(reversed ? slices.length - 1 : 0);
 +
 +            // We remenber when we are whithin a slice to avoid some comparison
 +            boolean inSlice = false;
 +
 +            AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file,
sstable.descriptor.version);
 +            while (deserializer.hasNext())
 +            {
 +                // col is before slice
 +                // (If in slice, don't bother checking that until we change slice)
 +                Composite start = currentStart();
 +                if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start)
< 0)
 +                {
-                     deserializer.skipNext();
++                    // If it's a rangeTombstone, then we need to read it and include it
unless it's end
++                    // stops before our slice start. Otherwise, we can skip it.
++                    if (deserializer.nextIsRangeTombstone())
++                    {
++                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
++                        if (comparator.compare(rt.max, start) >= 0)
++                            addColumn(rt);
++                    }
++                    else
++                    {
++                        deserializer.skipNext();
++                    }
 +                    continue;
 +                }
 +
 +                // col is within slice
 +                Composite finish = currentFinish();
 +                if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
 +                {
 +                    inSlice = true;
 +                    addColumn(deserializer.readNext());
 +                }
 +                // col is after slice. more slices?
 +                else
 +                {
 +                    inSlice = false;
 +                    if (!setNextSlice())
 +                        break;
 +                }
 +            }
 +        }
 +
 +        protected boolean setNextSlice()
 +        {
 +            if (reversed)
 +            {
 +                if (currentSliceIdx <= 0)
 +                    return false;
 +
 +                currentSliceIdx--;
 +            }
 +            else
 +            {
 +                if (currentSliceIdx >= slices.length - 1)
 +                    return false;
 +
 +                currentSliceIdx++;
 +            }
 +            return true;
 +        }
 +
 +        protected boolean fetchMoreData()
 +        {
 +            return false;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
index 07dc59a,0000000..c51e595
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
@@@ -1,250 -1,0 +1,265 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.IOException;
 +import java.util.*;
 +
 +import com.google.common.collect.AbstractIterator;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.composites.CellNameType;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.IndexHelper;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
 +{
 +    private ColumnFamily cf;
 +    private final SSTableReader sstable;
 +    private FileDataInput fileToClose;
 +    private Iterator<OnDiskAtom> iter;
 +    public final SortedSet<CellName> columns;
 +    public final DecoratedKey key;
 +
 +    public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName>
columns)
 +    {
 +        assert columns != null;
 +        this.sstable = sstable;
 +        this.columns = columns;
 +        this.key = key;
 +
 +        RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
 +        if (indexEntry == null)
 +            return;
 +
 +        try
 +        {
 +            read(sstable, null, indexEntry);
 +        }
 +        catch (IOException e)
 +        {
 +            sstable.markSuspect();
 +            throw new CorruptSSTableException(e, sstable.getFilename());
 +        }
 +        finally
 +        {
 +            if (fileToClose != null)
 +                FileUtils.closeQuietly(fileToClose);
 +        }
 +    }
 +
 +    public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey
key, SortedSet<CellName> columns, RowIndexEntry indexEntry)
 +    {
 +        assert columns != null;
 +        this.sstable = sstable;
 +        this.columns = columns;
 +        this.key = key;
 +
 +        try
 +        {
 +            read(sstable, file, indexEntry);
 +        }
 +        catch (IOException e)
 +        {
 +            sstable.markSuspect();
 +            throw new CorruptSSTableException(e, sstable.getFilename());
 +        }
 +    }
 +
 +    private FileDataInput createFileDataInput(long position)
 +    {
 +        fileToClose = sstable.getFileDataInput(position);
 +        return fileToClose;
 +    }
 +
 +    private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry)
 +    throws IOException
 +    {
 +        List<IndexHelper.IndexInfo> indexList;
 +
 +        // If the entry is not indexed or the index is not promoted, read from the row start
 +        if (!indexEntry.isIndexed())
 +        {
 +            if (file == null)
 +                file = createFileDataInput(indexEntry.position);
 +            else
 +                file.seek(indexEntry.position);
 +
 +            DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
 +            assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key,
file.getPath());
 +        }
 +
 +        indexList = indexEntry.columnsIndex();
 +
 +        if (!indexEntry.isIndexed())
 +        {
 +            ColumnFamilySerializer serializer = ColumnFamily.serializer;
 +            try
 +            {
 +                cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
 +                cf.delete(DeletionTime.serializer.deserialize(file));
 +            }
 +            catch (Exception e)
 +            {
 +                throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName()
+ " with " + sstable.metadata + " from " + file, e);
 +            }
 +        }
 +        else
 +        {
 +            cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
 +            cf.delete(indexEntry.deletionTime());
 +        }
 +
 +        List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
 +        if (indexList.isEmpty())
 +        {
 +            readSimpleColumns(file, columns, result);
 +        }
 +        else
 +        {
 +            readIndexedColumns(sstable.metadata, file, columns, indexList, indexEntry.position,
result);
 +        }
 +
 +        // create an iterator view of the columns we read
 +        iter = result.iterator();
 +    }
 +
 +    private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames,
List<OnDiskAtom> result)
 +    {
 +        Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file,
sstable.descriptor.version);
 +        int n = 0;
 +        while (atomIterator.hasNext())
 +        {
 +            OnDiskAtom column = atomIterator.next();
 +            if (column instanceof Cell)
 +            {
 +                if (columnNames.contains(column.name()))
 +                {
 +                    result.add(column);
 +                    if (++n >= columns.size())
 +                        break;
 +                }
 +            }
 +            else
 +            {
 +                result.add(column);
 +            }
 +        }
 +    }
 +
 +    private void readIndexedColumns(CFMetaData metadata,
 +                                    FileDataInput file,
 +                                    SortedSet<CellName> columnNames,
 +                                    List<IndexHelper.IndexInfo> indexList,
 +                                    long basePosition,
 +                                    List<OnDiskAtom> result)
 +    throws IOException
 +    {
 +        /* get the various column ranges we have to read */
 +        CellNameType comparator = metadata.comparator;
 +        List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>();
 +        int lastIndexIdx = -1;
 +        for (CellName name : columnNames)
 +        {
 +            int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx);
 +            if (index < 0 || index == indexList.size())
 +                continue;
 +            IndexHelper.IndexInfo indexInfo = indexList.get(index);
 +            // Check the index block does contain the column names and that we haven't inserted
this block yet.
 +            if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx)
 +                continue;
 +
 +            ranges.add(indexInfo);
 +            lastIndexIdx = index;
 +        }
 +
 +        if (ranges.isEmpty())
 +            return;
 +
 +        Iterator<CellName> toFetch = columnNames.iterator();
 +        CellName nextToFetch = toFetch.next();
 +        for (IndexHelper.IndexInfo indexInfo : ranges)
 +        {
 +            long positionToSeek = basePosition + indexInfo.offset;
 +
 +            // With new promoted indexes, our first seek in the data file will happen at
that point.
 +            if (file == null)
 +                file = createFileDataInput(positionToSeek);
 +
 +            AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
 +            file.seek(positionToSeek);
 +            FileMark mark = file.mark();
 +            while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch
!= null)
 +            {
 +                int cmp = deserializer.compareNextTo(nextToFetch);
-                 if (cmp == 0)
++                if (cmp < 0)
++                {
++                    // If it's a rangeTombstone, then we need to read it and include
++                    // it if it includes our target. Otherwise, we can skip it.
++                    if (deserializer.nextIsRangeTombstone())
++                    {
++                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
++                        if (comparator.compare(rt.max, nextToFetch) >= 0)
++                            result.add(rt);
++                    }
++                    else
++                    {
++                        deserializer.skipNext();
++                    }
++                }
++                else if (cmp == 0)
 +                {
 +                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
 +                    result.add(deserializer.readNext());
-                     continue;
 +                }
- 
-                 deserializer.skipNext();
-                 if (cmp > 0)
++                else
++                {
++                    deserializer.skipNext();
 +                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
++                }
 +            }
 +        }
 +    }
 +
 +    public DecoratedKey getKey()
 +    {
 +        return key;
 +    }
 +
 +    public ColumnFamily getColumnFamily()
 +    {
 +        return cf;
 +    }
 +
 +    protected OnDiskAtom computeNext()
 +    {
 +        if (iter == null || !iter.hasNext())
 +            return endOfData();
 +        return iter.next();
 +    }
 +
 +    public void close() throws IOException { }
 +}


Mime
View raw message