cassandra-commits mailing list archives

From ble...@apache.org
Subject [4/6] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
Date Fri, 27 Nov 2015 10:19:15 GMT
Merge branch cassandra-2.2 into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7b430eee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7b430eee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7b430eee

Branch: refs/heads/trunk
Commit: 7b430eee69d8f70b086a30a6e9d3c42a9db4aa08
Parents: d300a18 61e0251
Author: Benjamin Lerer <b.lerer@gmail.com>
Authored: Fri Nov 27 11:15:08 2015 +0100
Committer: Benjamin Lerer <b.lerer@gmail.com>
Committed: Fri Nov 27 11:16:02 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ReadCommand.java    |  9 +-
 .../index/IndexNotAvailableException.java       | 34 ++++++++
 .../cassandra/index/SecondaryIndexManager.java  | 64 +++++++++++---
 .../index/internal/CassandraIndex.java          |  6 +-
 .../cassandra/net/MessageDeliveryTask.java      |  7 +-
 .../validation/entities/SecondaryIndexTest.java | 88 ++++++++++++++++----
 .../index/internal/CustomCassandraIndex.java    |  4 +-
 8 files changed, 178 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
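
The heart of this merge is CASSANDRA-8505's guard in ReadCommand.executeLocally()
(see the ReadCommand.java diff below): before a read is served through an index
searcher, SecondaryIndexManager is asked whether the index has finished building,
and the read is rejected with the new IndexNotAvailableException if it has not.
Below is a minimal, self-contained Java sketch of that control flow; the Stub*
types are illustrative stand-ins, not Cassandra classes.

public final class IndexGuardSketch
{
    static final class StubIndex
    {
        final String name;
        StubIndex(String name) { this.name = name; }
    }

    static final class StubIndexManager
    {
        private final java.util.Set<String> built = new java.util.HashSet<>();
        void markBuilt(String name)            { built.add(name); }
        boolean isIndexQueryable(StubIndex ix) { return built.contains(ix.name); }
    }

    static String executeLocally(StubIndexManager manager, StubIndex index)
    {
        if (index != null)
        {
            // New in this merge: fail fast while the index is still building,
            // rather than answering from an incomplete index.
            if (!manager.isIndexQueryable(index))
                throw new IllegalStateException("The secondary index '" + index.name + "' is not yet available");
            return "searcher scan via " + index.name;
        }
        return "storage scan";
    }

    public static void main(String[] args)
    {
        StubIndexManager mgr = new StubIndexManager();
        StubIndex ix = new StubIndex("users_by_email");
        try { executeLocally(mgr, ix); }                  // index still building
        catch (IllegalStateException e) { System.out.println("rejected: " + e.getMessage()); }
        mgr.markBuilt("users_by_email");                  // build completes
        System.out.println(executeLocally(mgr, ix));      // now served via the index
    }
}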


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5e6c03c,63305d6..252972c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
 -2.2.4
 +3.0.1
 + * Fix SELECT statement with IN restrictions on partition key,
 +   ORDER BY and LIMIT (CASSANDRA-10729)
 + * Improve stress performance over 1k threads (CASSANDRA-7217)
 + * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
 + * Unable to create a function with argument of type Inet (CASSANDRA-10741)
 + * Fix backward incompatibility in CqlInputFormat (CASSANDRA-10717)
 + * Correctly preserve deletion info on updated rows when notifying indexers
 +   of single-row deletions (CASSANDRA-10694)
 + * Notify indexers of partition delete during cleanup (CASSANDRA-10685)
 + * Keep the file open in trySkipCache (CASSANDRA-10669)
 + * Updated trigger example (CASSANDRA-10257)
 +Merged from 2.2:
+  * Reject index queries while the index is building (CASSANDRA-8505)
   * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
   * Fix JSON update with prepared statements (CASSANDRA-10631)
   * Don't do anticompaction after subrange repair (CASSANDRA-10422)
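
One pattern worth noting before the ReadCommand.java diff below: the command memoizes
its index lookup in an Optional plus a separate indexManagerQueried flag, because "no
suitable index" (a null lookup result) is itself a valid answer that should not trigger
repeated lookups. A minimal sketch of that pattern under stand-in names (lookUpBestIndex
is hypothetical, standing in for cfs.indexManager.getBestIndexFor(this)):

import java.util.Optional;

final class NullableLookupCacheSketch
{
    private Optional<String> index = Optional.empty();
    private boolean indexManagerQueried = false;

    // Stand-in for the expensive lookup; may legitimately return null.
    private String lookUpBestIndex()
    {
        System.out.println("expensive lookup runs");
        return null; // pretend no index suits this command
    }

    String getIndex()
    {
        if (index.isPresent())    // cached positive result
            return index.get();
        if (indexManagerQueried)  // cached negative result: don't look up again
            return null;

        String selected = lookUpBestIndex();
        indexManagerQueried = true;
        if (selected == null)
            return null;

        index = Optional.of(selected);
        return selected;
    }

    public static void main(String[] args)
    {
        NullableLookupCacheSketch cache = new NullableLookupCacheSketch();
        cache.getIndex(); // prints "expensive lookup runs"
        cache.getIndex(); // silent: the negative result was memoized
    }
}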

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 301cb86,cd86336..5ab1ee5
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -17,92 -17,39 +17,93 @@@
   */
  package org.apache.cassandra.db;
  
 -import java.io.DataInput;
  import java.io.IOException;
  import java.nio.ByteBuffer;
 +import java.util.*;
  
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.db.filter.IDiskAtomFilter;
 -import org.apache.cassandra.db.filter.NamesQueryFilter;
 -import org.apache.cassandra.db.filter.SliceQueryFilter;
 +import com.google.common.collect.Lists;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.index.Index;
++import org.apache.cassandra.index.IndexNotAvailableException;
  import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.metrics.TableMetrics;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.service.IReadCommand;
 -import org.apache.cassandra.service.RowDataResolver;
 -import org.apache.cassandra.service.pager.Pageable;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.UnknownIndexException;
 +import org.apache.cassandra.service.ClientWarn;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.Pair;
  
 -public abstract class ReadCommand implements IReadCommand, Pageable
 +/**
 + * General interface for storage-engine read commands (common to both range and
 + * single partition commands).
 + * <p>
 +  * This contains all the information needed to do a local read.
 + */
 +public abstract class ReadCommand implements ReadQuery
  {
 -    public enum Type
 +    protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
 +
 +    public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
 +    // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier versions.
 +    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
 +    public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new RangeSliceSerializer();
 +
 +    public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
 +    public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
 +    public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer();
 +
 +    private final Kind kind;
 +    private final CFMetaData metadata;
 +    private final int nowInSec;
 +
 +    private final ColumnFilter columnFilter;
 +    private final RowFilter rowFilter;
 +    private final DataLimits limits;
 +
 +    // SecondaryIndexManager will attempt to provide the most selective of any available indexes
 +    // during execution. Here we also store the result of that lookup to avoid repeating it over
 +    // the lifetime of the command.
 +    protected Optional<IndexMetadata> index = Optional.empty();
 +
 +    // Flag to indicate whether the index manager has been queried to select an index for this
 +    // command. This is necessary as the result of that lookup may be null, in which case we
 +    // still don't want to repeat it.
 +    private boolean indexManagerQueried = false;
 +
 +    private boolean isDigestQuery;
 +    // if a digest query, the version for which the digest is expected. Ignored if not a digest.
 +    private int digestVersion;
 +    private final boolean isForThrift;
 +
 +    protected static abstract class SelectionDeserializer
      {
 -        GET_BY_NAMES((byte)1),
 -        GET_SLICES((byte)2);
 +        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
 +    }
  
 -        public final byte serializedValue;
 +    protected enum Kind
 +    {
 +        SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer),
 +        PARTITION_RANGE  (PartitionRangeReadCommand.selectionDeserializer);
  
 -        private Type(byte b)
 -        {
 -            this.serializedValue = b;
 -        }
 +        private final SelectionDeserializer selectionDeserializer;
  
 -        public static Type fromSerializedValue(byte b)
 +        Kind(SelectionDeserializer selectionDeserializer)
          {
 -            return b == 1 ? GET_BY_NAMES : GET_SLICES;
 +            this.selectionDeserializer = selectionDeserializer;
          }
      }
  
@@@ -231,707 -95,55 +232,713 @@@
          return this;
      }
  
 -    public String getColumnFamilyName()
 +    /**
 +     * Sets the digest version, for when a digest for that command is requested.
 +     * <p>
 +     * Note that we allow setting this independently of setting the command as a digest query, as
 +     * this allows us to use the command as a carrier of the digest version even if we only call
 +     * setIsDigestQuery on some copy of it.
 +     *
 +     * @param digestVersion the version for the digest if this command is used for a digest query.
 +     * @return this read command.
 +     */
 +    public ReadCommand setDigestVersion(int digestVersion)
      {
 -        return cfName;
 +        this.digestVersion = digestVersion;
 +        return this;
      }
  
 +    /**
 +     * Whether this query is for thrift or not.
 +     *
 +     * @return whether this query is for thrift.
 +     */
 +    public boolean isForThrift()
 +    {
 +        return isForThrift;
 +    }
 +
 +    /**
 +     * The clustering index filter this command uses for the provided key.
 +     * <p>
 +     * Note that this method should only be called on a key actually queried by this command
 +     * and in practice, this will almost always return the same filter, but for the sake of
 +     * paging, the filter on the first key of a range command might be slightly different.
 +     *
 +     * @param key a partition key queried by this command.
 +     *
 +     * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
 +     */
 +    public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);
 +
 +    /**
 +     * Returns a copy of this command.
 +     *
 +     * @return a copy of this command.
 +     */
      public abstract ReadCommand copy();
  
 -    public abstract Row getRow(Keyspace keyspace);
 +    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
 +
 +    protected abstract int oldestUnrepairedTombstone();
 +
 +    public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection)
 +    {
 +        return isDigestQuery()
 +             ? ReadResponse.createDigestResponse(iterator, digestVersion)
 +             : ReadResponse.createDataResponse(iterator, selection);
 +    }
 +
 +    public long indexSerializedSize(int version)
 +    {
 +        if (index.isPresent())
 +            return IndexMetadata.serializer.serializedSize(index.get(), version);
 +        else
 +            return 0;
 +    }
 +
 +    public Index getIndex(ColumnFamilyStore cfs)
 +    {
 +        // if we've already consulted the index manager, and it returned a valid index
 +        // the result should be cached here.
 +        if (index.isPresent())
 +            return cfs.indexManager.getIndex(index.get());
 +
 +        // if no cached index is present, but we've already consulted the index manager
 +        // then no registered index is suitable for this command, so just return null.
 +        if (indexManagerQueried)
 +            return null;
 +
 +        // do the lookup, set the flag to indicate so and cache the result if not null
 +        Index selected = cfs.indexManager.getBestIndexFor(this);
 +        indexManagerQueried = true;
  
 -    public abstract IDiskAtomFilter filter();
 +        if (selected == null)
 +            return null;
  
 -    public String getKeyspace()
 +        index = Optional.of(selected.getIndexMetadata());
 +        return selected;
 +    }
 +
 +    /**
 +     * Executes this command on the local host.
 +     *
 +     * @param orderGroup the operation group spanning this command
 +     *
 +     * @return an iterator over the result of executing this command locally.
 +     */
 +    @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
 +                                  // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
 +    public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup)
      {
 -        return ksName;
 +        long startTimeNanos = System.nanoTime();
 +
 +        ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
 +        Index index = getIndex(cfs);
-         Index.Searcher searcher = index == null ? null : index.searcherFor(this);
 +
++        Index.Searcher searcher = null;
 +        if (index != null)
++        {
++            if (!cfs.indexManager.isIndexQueryable(index))
++                throw new IndexNotAvailableException(index);
++
++            searcher = index.searcherFor(this);
 +            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
++        }
 +
 +        UnfilteredPartitionIterator resultIterator = searcher == null
 +                                         ? queryStorage(cfs, orderGroup)
 +                                         : searcher.search(orderGroup);
 +
 +        try
 +        {
 +            resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
 +
 +            // If we've used a 2ndary index, we know the results already satisfy the primary expression used, so
 +            // no point in checking it again.
 +            RowFilter updatedFilter = searcher == null
 +                                    ? rowFilter()
 +                                    : index.getPostIndexQueryFilter(rowFilter());
 +
 +            // TODO: We currently do filtering by the rowFilter here because it's convenient. However,
 +            // we'll probably want to optimize by pushing it down a layer (like for dropped columns) as it
 +            // would be more efficient (the sooner we discard stuff we know we don't care about, the less
 +            // useless processing we do on it).
 +            return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec());
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            resultIterator.close();
 +            throw e;
 +        }
      }
  
 -    // maybeGenerateRetryCommand is used to generate a retry for short reads
 -    public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
 +    protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
 +
 +    public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
      {
 -        return null;
 +        return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec());
      }
  
 -    // maybeTrim removes columns from a response that is too long
 -    public Row maybeTrim(Row row)
 +    public ReadOrderGroup startOrderGroup()
      {
 -        return row;
 +        return ReadOrderGroup.forCommand(this);
      }
  
 -    public long getTimeout()
 +    /**
 +     * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
 +     * This also logs warnings or throws a TombstoneOverwhelmingException if appropriate.
 +     */
 +    private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
      {
 -        return DatabaseDescriptor.getReadRpcTimeout();
 +        class MetricRecording extends Transformation<UnfilteredRowIterator>
 +        {
 +            private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
 +            private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
 +
 +            private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
 +
 +            private int liveRows = 0;
 +            private int tombstones = 0;
 +
 +            private DecoratedKey currentKey;
 +
 +            @Override
 +            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
 +            {
 +                currentKey = iter.partitionKey();
 +                return Transformation.apply(iter, this);
 +            }
 +
 +            @Override
 +            public Row applyToStatic(Row row)
 +            {
 +                return applyToRow(row);
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                if (row.hasLiveData(ReadCommand.this.nowInSec()))
 +                    ++liveRows;
 +
 +                for (Cell cell : row.cells())
 +                {
 +                    if (!cell.isLive(ReadCommand.this.nowInSec()))
 +                        countTombstone(row.clustering());
 +                }
 +                return row;
 +            }
 +
 +            @Override
 +            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +            {
 +                countTombstone(marker.clustering());
 +                return marker;
 +            }
 +
 +            private void countTombstone(ClusteringPrefix clustering)
 +            {
 +                ++tombstones;
 +                if (tombstones > failureThreshold && respectTombstoneThresholds)
 +                {
 +                    String query = ReadCommand.this.toCQLString();
 +                    Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
 +                    throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
 +                }
 +            }
 +
 +            @Override
 +            public void onClose()
 +            {
 +                recordLatency(metric, System.nanoTime() - startTimeNanos);
 +
 +                metric.tombstoneScannedHistogram.update(tombstones);
 +                metric.liveScannedHistogram.update(liveRows);
 +
 +                boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
 +                if (warnTombstones)
 +                {
 +                    String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
 +                    ClientWarn.warn(msg);
 +                    logger.warn(msg);
 +                }
 +
 +                Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
 +            }
 +        };
 +
 +        return Transformation.apply(iter, new MetricRecording());
      }
 -}
  
 -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 -{
 -    public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +    /**
 +     * Creates a message for this command.
 +     */
 +    public abstract MessageOut<ReadCommand> createMessage(int version);
 +
 +    protected abstract void appendCQLWhereClause(StringBuilder sb);
 +
 +    // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
 +    // can save us some bandwidth, and avoids making us throw a TombstoneOverwhelmingException for purgeable tombstones (which
 +    // are to some extent an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
 +    protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
 +    {
 +        final boolean isForThrift = iterator.isForThrift();
 +        class WithoutPurgeableTombstones extends PurgeFunction
 +        {
 +            public WithoutPurgeableTombstones()
 +            {
 +                super(isForThrift, cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
 +            }
 +
 +            protected long getMaxPurgeableTimestamp()
 +            {
 +                return Long.MAX_VALUE;
 +            }
 +        }
 +        return Transformation.apply(iterator, new WithoutPurgeableTombstones());
 +    }
 +
 +    /**
 +     * Recreates the CQL string corresponding to this query.
 +     * <p>
 +     * Note that in general the returned string will not be exactly the original user string, first
 +     * because there isn't always a single syntax for a given query, but also because we don't have
 +     * all the information needed (we know the non-PK columns queried but not the PK ones, as internally
 +     * we query them all). So this shouldn't be relied on too strongly, but it should be good enough
 +     * for debugging purposes, which is what this is for.
 +     */
 +    public String toCQLString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        sb.append("SELECT ").append(columnFilter());
 +        sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName);
 +        appendCQLWhereClause(sb);
 +
 +        if (limits() != DataLimits.NONE)
 +            sb.append(' ').append(limits());
 +        return sb.toString();
 +    }
 +
 +    private static class Serializer implements IVersionedSerializer<ReadCommand>
 +    {
 +        private static int digestFlag(boolean isDigest)
 +        {
 +            return isDigest ? 0x01 : 0;
 +        }
 +
 +        private static boolean isDigest(int flags)
 +        {
 +            return (flags & 0x01) != 0;
 +        }
 +
 +        private static int thriftFlag(boolean isForThrift)
 +        {
 +            return isForThrift ? 0x02 : 0;
 +        }
 +
 +        private static boolean isForThrift(int flags)
 +        {
 +            return (flags & 0x02) != 0;
 +        }
 +
 +        private static int indexFlag(boolean hasIndex)
 +        {
 +            return hasIndex ? 0x04 : 0;
 +        }
 +
 +        private static boolean hasIndex(int flags)
 +        {
 +            return (flags & 0x04) != 0;
 +        }
 +
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
 +            assert version >= MessagingService.VERSION_30;
 +
 +            out.writeByte(command.kind.ordinal());
 +            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
 +            if (command.isDigestQuery())
 +                out.writeUnsignedVInt(command.digestVersion());
 +            CFMetaData.serializer.serialize(command.metadata(), out, version);
 +            out.writeInt(command.nowInSec());
 +            ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
 +            RowFilter.serializer.serialize(command.rowFilter(), out, version);
 +            DataLimits.serializer.serialize(command.limits(), out, version);
 +            if (command.index.isPresent())
 +                IndexMetadata.serializer.serialize(command.index.get(), out, version);
 +
 +            command.serializeSelection(out, version);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            if (version < MessagingService.VERSION_30)
 +                return legacyReadCommandSerializer.deserialize(in, version);
 +
 +            Kind kind = Kind.values()[in.readByte()];
 +            int flags = in.readByte();
 +            boolean isDigest = isDigest(flags);
 +            boolean isForThrift = isForThrift(flags);
 +            boolean hasIndex = hasIndex(flags);
 +            int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
 +            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
 +            int nowInSec = in.readInt();
 +            ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
 +            RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
 +            DataLimits limits = DataLimits.serializer.deserialize(in, version);
 +            Optional<IndexMetadata> index = hasIndex
 +                                            ? deserializeIndexMetadata(in, version, metadata)
 +                                            : Optional.empty();
 +
 +            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
 +        }
 +
 +        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
 +        {
 +            try
 +            {
 +                return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
 +            }
 +            catch (UnknownIndexException e)
 +            {
 +                String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
 +                                               "If an index was just created, this is likely due to the schema not " +
 +                                               "being fully propagated. Local read will proceed without using the " +
 +                                               "index. Please wait for schema agreement after index creation.",
 +                                               cfm.ksName, cfm.cfName, e.indexId.toString());
 +                logger.info(message);
 +                return Optional.empty();
 +            }
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
 +            assert version >= MessagingService.VERSION_30;
 +
 +            return 2 // kind + flags
 +                 + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
 +                 + CFMetaData.serializer.serializedSize(command.metadata(), version)
 +                 + TypeSizes.sizeof(command.nowInSec())
 +                 + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
 +                 + RowFilter.serializer.serializedSize(command.rowFilter(), version)
 +                 + DataLimits.serializer.serializedSize(command.limits(), version)
 +                 + command.selectionSerializedSize(version)
 +                 + command.indexSerializedSize(version);
 +        }
 +    }
 +
 +    // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. Only useful as long as we maintain pre-3.0
 +    // compatibility.
 +    private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand>
 +    {
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            if (version < MessagingService.VERSION_30)
 +                legacyRangeSliceCommandSerializer.serialize(command, out, version);
 +            else
 +                serializer.serialize(command, out, version);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            return version < MessagingService.VERSION_30
 +                 ? legacyRangeSliceCommandSerializer.deserialize(in, version)
 +                 : serializer.deserialize(in, version);
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            return version < MessagingService.VERSION_30
 +                 ? legacyRangeSliceCommandSerializer.serializedSize(command, version)
 +                 : serializer.serializedSize(command, version);
 +        }
 +    }
 +
 +    private enum LegacyType
 +    {
 +        GET_BY_NAMES((byte)1),
 +        GET_SLICES((byte)2);
 +
 +        public final byte serializedValue;
 +
 +        LegacyType(byte b)
 +        {
 +            this.serializedValue = b;
 +        }
 +
 +        public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
 +        {
 +            return kind == ClusteringIndexFilter.Kind.SLICE
 +                   ? GET_SLICES
 +                   : GET_BY_NAMES;
 +        }
 +
 +        public static LegacyType fromSerializedValue(byte b)
 +        {
 +            return b == 1 ? GET_BY_NAMES : GET_SLICES;
 +        }
 +    }
 +
 +    /**
 +     * Serializer for pre-3.0 RangeSliceCommands.
 +     */
 +    private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
      {
 -        out.writeByte(command.commandType.serializedValue);
 -        switch (command.commandType)
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
 +            assert !rangeCommand.dataRange().isPaging();
 +
 +            // convert pre-3.0 incompatible names filters to slice filters
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            out.writeUTF(metadata.ksName);
 +            out.writeUTF(metadata.cfName);
 +            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from seconds to millis
 +
 +            // begin DiskAtomFilterSerializer.serialize()
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                out.writeByte(1);  // 0 for slices, 1 for names
 +                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
 +            }
 +            else
 +            {
 +                out.writeByte(0);  // 0 for slices, 1 for names
 +
 +                // slice filter serialization
 +                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +
 +                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
 +
 +                out.writeBoolean(filter.isReversed());
 +
 +                // limit
 +                DataLimits.Kind kind = rangeCommand.limits().kind();
 +                boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
 +                if (isDistinct)
 +                    out.writeInt(1);
 +                else
 +                    out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
 +
 +                int compositesToGroup;
 +                boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                if (kind == DataLimits.Kind.THRIFT_LIMIT)
 +                    compositesToGroup = -1;
 +                else if (isDistinct && !selectsStatics)
 +                    compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
 +                else
 +                    compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
 +
 +                out.writeInt(compositesToGroup);
 +            }
 +
 +            serializeRowFilter(out, rangeCommand.rowFilter());
 +            AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
 +
 +            // maxResults
 +            out.writeInt(rangeCommand.limits().count());
 +
 +            // countCQL3Rows
 +            if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1)  // if for Thrift or DISTINCT
 +                out.writeBoolean(false);
 +            else
 +                out.writeBoolean(true);
 +
 +            // isPaging
 +            out.writeBoolean(false);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            String keyspace = in.readUTF();
 +            String columnFamily = in.readUTF();
 +
 +            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
 +            if (metadata == null)
 +            {
 +                String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
 +                throw new UnknownColumnFamilyException(message, null);
 +            }
 +
 +            int nowInSec = (int) (in.readLong() / 1000);  // convert from millis to seconds
 +
 +            ClusteringIndexFilter filter;
 +            ColumnFilter selection;
 +            int compositesToGroup = 0;
 +            int perPartitionLimit = -1;
 +            byte readType = in.readByte();  // 0 for slices, 1 for names
 +            if (readType == 1)
 +            {
 +                Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
 +                selection = selectionAndFilter.left;
 +                filter = selectionAndFilter.right;
 +            }
 +            else
 +            {
 +                Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
 +                filter = p.left;
 +                perPartitionLimit = in.readInt();
 +                compositesToGroup = in.readInt();
 +                selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata);
 +            }
 +
 +            RowFilter rowFilter = deserializeRowFilter(in, metadata);
 +
 +            AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
 +            int maxResults = in.readInt();
 +
 +            in.readBoolean();  // countCQL3Rows (not needed)
 +            in.readBoolean();  // isPaging (not needed)
 +
 +            boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
 +            boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
 +            DataLimits limits;
 +            if (isDistinct)
 +                limits = DataLimits.distinctLimits(maxResults);
 +            else if (compositesToGroup == -1)
 +                limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
 +            else
 +                limits = DataLimits.cqlLimits(maxResults);
 +
 +            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
 +        }
 +
 +        static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
 +        {
 +            ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator());
 +            out.writeInt(indexExpressions.size());
 +            for (RowFilter.Expression expression : indexExpressions)
 +            {
 +                ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
 +                expression.operator().writeTo(out);
 +                ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
 +            }
 +        }
 +
 +        static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
 +        {
 +            int numRowFilters = in.readInt();
 +            if (numRowFilters == 0)
 +                return RowFilter.NONE;
 +
 +            RowFilter rowFilter = RowFilter.create(numRowFilters);
 +            for (int i = 0; i < numRowFilters; i++)
 +            {
 +                ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
 +                ColumnDefinition column = metadata.getColumnDefinition(columnName);
 +                Operator op = Operator.readFrom(in);
 +                ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
 +                rowFilter.add(column, op, indexValue);
 +            }
 +            return rowFilter;
 +        }
 +
 +        static long serializedRowFilterSize(RowFilter rowFilter)
 +        {
 +            long size = TypeSizes.sizeof(0);  // rowFilterCount
 +            for (RowFilter.Expression expression : rowFilter)
 +            {
 +                size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                size += TypeSizes.sizeof(0);  // operator int value
 +                size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +            }
 +            return size;
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            assert version < MessagingService.VERSION_30;
 +            assert command.kind == Kind.PARTITION_RANGE;
 +
 +            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            long size = TypeSizes.sizeof(metadata.ksName);
 +            size += TypeSizes.sizeof(metadata.cfName);
 +            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
 +
 +            size += 1;  // single byte flag: 0 for slices, 1 for names
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
 +                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
 +            }
 +            else
 +            {
 +                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
 +                size += TypeSizes.sizeof(filter.isReversed());
 +                size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
 +                size += TypeSizes.sizeof(0); // compositesToGroup
 +            }
 +
 +            if (rangeCommand.rowFilter().equals(RowFilter.NONE))
 +            {
 +                size += TypeSizes.sizeof(0);
 +            }
 +            else
 +            {
 +                ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator());
 +                size += TypeSizes.sizeof(indexExpressions.size());
 +                for (RowFilter.Expression expression : indexExpressions)
 +                {
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                    size += TypeSizes.sizeof(expression.operator().ordinal());
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +                }
 +            }
 +
 +            size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
 +            size += TypeSizes.sizeof(rangeCommand.limits().count());
 +            size += TypeSizes.sizeof(!rangeCommand.isForThrift());
 +            return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
 +        }
 +
 +        static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
          {
 -            case GET_BY_NAMES:
 -                SliceByNamesReadCommand.serializer.serialize(command, out, version);
 -                break;
 -            case GET_SLICES:
 -                SliceFromReadCommand.serializer.serialize(command, out, version);
 -                break;
 -            default:
 -                throw new AssertionError();
 +            if (!command.dataRange().isNamesQuery())
 +                return command;
 +
 +            CFMetaData metadata = command.metadata();
 +            if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
 +                return command;
 +
 +            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
 +            ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
 +            DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
 +            return new PartitionRangeReadCommand(
 +                    command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
 +                    command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
 +        }
 +
 +        static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
 +        {
 +            // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
 +            // In that case, we'll basically be querying the first row of the partition, but we must make sure we include
 +            // all columns so we get at least one cell if there is a live row, as it would confuse pre-3.0 nodes otherwise.
 +            if (compositesToGroup == -2)
 +                return ColumnFilter.all(metadata);
 +
 +            // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
 +            PartitionColumns columns = selectsStatics
 +                                     ? metadata.partitionColumns()
 +                                     : metadata.partitionColumns().withoutStatics();
 +            return ColumnFilter.selectionBuilder().addAll(columns).build();
          }
      }
  

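The new ReadCommand.Serializer above packs three booleans into a single flags byte
(0x01 digest, 0x02 thrift, 0x04 index), mirroring its digestFlag/thriftFlag/indexFlag
helpers. A small self-contained sketch of that encoding; only the bit values come from
the diff, the class itself is illustrative.

final class ReadCommandFlagsSketch
{
    static int encode(boolean isDigest, boolean isForThrift, boolean hasIndex)
    {
        return (isDigest ? 0x01 : 0) | (isForThrift ? 0x02 : 0) | (hasIndex ? 0x04 : 0);
    }

    static boolean isDigest(int flags)    { return (flags & 0x01) != 0; }
    static boolean isForThrift(int flags) { return (flags & 0x02) != 0; }
    static boolean hasIndex(int flags)    { return (flags & 0x04) != 0; }

    public static void main(String[] args)
    {
        int flags = encode(true, false, true); // digest query that also carries index metadata
        System.out.printf("flags = 0x%02x, digest=%b, thrift=%b, index=%b%n",
                          flags, isDigest(flags), isForThrift(flags), hasIndex(flags));
        // prints: flags = 0x05, digest=true, thrift=false, index=true
    }
}
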
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/index/IndexNotAvailableException.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/IndexNotAvailableException.java
index 0000000,0000000..5440e2a
new file mode 100644
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/IndexNotAvailableException.java
@@@ -1,0 -1,0 +1,34 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.index;
++
++/**
++ * Thrown if a secondary index is not currently available.
++ */
++public final class IndexNotAvailableException extends RuntimeException
++{
++    /**
++     * Creates a new <code>IndexNotAvailableException</code> for the specified index.
++     * @param index the index that is not yet available
++     */
++    public IndexNotAvailableException(Index index)
++    {
++        super(String.format("The secondary index '%s' is not yet available", index.getIndexMetadata().name));
++    }
++}
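
For completeness, a hypothetical caller-side sketch of how the new exception surfaces.
In this merge the actual handling lives in MessageDeliveryTask (listed in the diffstat
above; its diff is in another part of this series), so the catch block below only
illustrates the idea: a read hitting a still-building index fails fast instead of
returning partial results. The exception class here is a local stand-in so the sketch
compiles on its own.

public final class RejectWhileBuildingSketch
{
    static final class IndexNotAvailableException extends RuntimeException
    {
        IndexNotAvailableException(String name)
        {
            super(String.format("The secondary index '%s' is not yet available", name));
        }
    }

    static String readUsingIndex(String name, boolean queryable)
    {
        if (!queryable)
            throw new IndexNotAvailableException(name); // the guard added in executeLocally()
        return "rows via " + name;
    }

    public static void main(String[] args)
    {
        try
        {
            readUsingIndex("users_by_email", false); // index still building
        }
        catch (IndexNotAvailableException e)
        {
            // A coordinator treats this as a failed read rather than silently
            // returning incomplete results; clients can retry once the build finishes.
            System.out.println("read rejected: " + e.getMessage());
        }
    }
}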

