cassandra-commits mailing list archives

From marc...@apache.org
Subject [14/18] cassandra git commit: Merge branch 'cameron/12765-2.2' into cameron/12765-3.0
Date Fri, 21 Oct 2016 07:15:49 GMT
Merge branch 'cameron/12765-2.2' into cameron/12765-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7976d65a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7976d65a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7976d65a

Branch: refs/heads/cassandra-3.X
Commit: 7976d65af230b8322a54a4462f2ea37bbe185a71
Parents: 8b04ec9 a370409
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Fri Oct 21 08:55:20 2016 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Fri Oct 21 08:55:20 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/SinglePartitionReadCommand.java          |  6 +--
 .../io/sstable/format/SSTableReader.java        |  8 ++++
 .../db/SinglePartitionReadCommandCQLTest.java   | 41 ++++++++++++++++++++
 4 files changed, 53 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
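
The substance of this merge is the CASSANDRA-12765 fix ("Don't skip sstables based on maxLocalDeletionTime"). The read path used to treat maxLocalDeletionTime == Integer.MAX_VALUE as proof that an sstable holds no tombstones, and skipped such sstables. But live cells also report a localDeletionTime of Integer.MAX_VALUE, so an sstable mixing live data with tombstones (for instance a partition deletion) can still show the "no tombstones" max value, get skipped, and resurrect deleted data. minLocalDeletionTime, by contrast, only stays at Integer.MAX_VALUE when nothing carrying a deletion time was ever written, which is what the new SSTableReader.hasTombstones() keys on. A minimal sketch of the two guards, reusing only accessors that appear in this diff (an illustration, not the exact patch):

    // Why the guard moved from the max to the min of localDeletionTime.
    static boolean mayHaveTombstones(SSTableReader sstable)
    {
        // Old guard (removed below): "tombstones present" iff the max left its
        // default. A single live cell also contributes Integer.MAX_VALUE to the
        // max, so an sstable holding live data plus a partition tombstone looks
        // tombstone-free and would be skipped entirely:
        //   sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE
        //
        // New guard: the min keeps its Integer.MAX_VALUE default only when no
        // tombstone (or expiring cell) was written at all, so it is safe to key on.
        return sstable.hasTombstones(); // == (getMinLocalDeletionTime() != Integer.MAX_VALUE)
    }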


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7976d65a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b309e96,efd0f5c..f14eb22
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -34,64 -16,6 +34,65 @@@ Merged from 2.2
   * Fix authentication problem when invoking cqlsh copy from a SOURCE command (CASSANDRA-12642)
   * Decrement pending range calculator jobs counter in finally block
    (CASSANDRA-12554)
 +Merged from 2.1:
 + * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
++ * Don't skip sstables based on maxLocalDeletionTime (CASSANDRA-12765)
 +
 +
 +3.0.9
 + * Handle composite prefixes with final EOC=0 as in 2.x and refactor LegacyLayout.decodeBound (CASSANDRA-12423)
 + * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
 + * select_distinct_with_deletions_test failing on non-vnode environments (CASSANDRA-11126)
 + * Stack Overflow returned to queries while upgrading (CASSANDRA-12527)
 + * Fix legacy regex for temporary files from 2.2 (CASSANDRA-12565)
 + * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
 + * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
 + * Fix file handle leaks due to simultaneous compaction/repair and
 +   listing snapshots, calculating snapshot sizes, or making schema
 +   changes (CASSANDRA-11594)
 + * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
 + * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
 + * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
 + * Calculate last compacted key on startup (CASSANDRA-6216)
 + * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 + * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
 + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 + * Backport CASSANDRA-12002 (CASSANDRA-12177)
 + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 + * Fix potential bad messaging service message for paged range reads
 +   within mixed-version 3.x clusters (CASSANDRA-12249)
 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
 + * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flaky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerException when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 + * Fix JsonTransformer output of partition with deletion info (CASSANDRA-12418)
 + * Fix NPE in SSTableLoader when specifying partial directory path (CASSANDRA-12609)
 +Merged from 2.2:
   * Add local address entry in PropertyFileSnitch (CASSANDRA-11332)
   * cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
   * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7976d65a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 23b02f3,0000000..5f8df1b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -1,1028 -1,0 +1,1028 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Sets;
 +
 +import org.apache.cassandra.cache.IRowCacheEntry;
 +import org.apache.cassandra.cache.RowCacheKey;
 +import org.apache.cassandra.cache.RowCacheSentinel;
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.lifecycle.*;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.RequestExecutionException;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.service.pager.*;
 +import org.apache.cassandra.thrift.ThriftResultsMerger;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.SearchIterator;
 +import org.apache.cassandra.utils.btree.BTreeSet;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.memory.HeapAllocator;
 +
 +
 +/**
 + * A read command that selects a (part of a) single partition.
 + */
 +public class SinglePartitionReadCommand extends ReadCommand
 +{
 +    protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 +
 +    private final DecoratedKey partitionKey;
 +    private final ClusteringIndexFilter clusteringIndexFilter;
 +
 +    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 +
 +    public SinglePartitionReadCommand(boolean isDigest,
 +                                      int digestVersion,
 +                                      boolean isForThrift,
 +                                      CFMetaData metadata,
 +                                      int nowInSec,
 +                                      ColumnFilter columnFilter,
 +                                      RowFilter rowFilter,
 +                                      DataLimits limits,
 +                                      DecoratedKey partitionKey,
 +                                      ClusteringIndexFilter clusteringIndexFilter)
 +    {
 +        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
 +        assert partitionKey.getPartitioner() == metadata.partitioner;
 +        this.partitionKey = partitionKey;
 +        this.clusteringIndexFilter = clusteringIndexFilter;
 +    }
 +
 +    /**
 +     * Creates a new read command on a single partition.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param columnFilter the column filter to use for the query.
 +     * @param rowFilter the row filter to use for the query.
 +     * @param limits the limits to use for the query.
 +     * @param partitionKey the partition key for the partition to query.
 +     * @param clusteringIndexFilter the clustering index filter to use for the query.
 +     *
 +     * @return a newly created read command.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata,
 +                                                    int nowInSec,
 +                                                    ColumnFilter columnFilter,
 +                                                    RowFilter rowFilter,
 +                                                    DataLimits limits,
 +                                                    DecoratedKey partitionKey,
 +                                                    ClusteringIndexFilter clusteringIndexFilter)
 +    {
 +        return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
 +    }
 +
 +    /**
 +     * Creates a new read command on a single partition for thrift.
 +     *
 +     * @param isForThrift whether the query is for thrift or not.
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param columnFilter the column filter to use for the query.
 +     * @param rowFilter the row filter to use for the query.
 +     * @param limits the limits to use for the query.
 +     * @param partitionKey the partition key for the partition to query.
 +     * @param clusteringIndexFilter the clustering index filter to use for the query.
 +     *
 +     * @return a newly created read command.
 +     */
 +    public static SinglePartitionReadCommand create(boolean isForThrift,
 +                                                    CFMetaData metadata,
 +                                                    int nowInSec,
 +                                                    ColumnFilter columnFilter,
 +                                                    RowFilter rowFilter,
 +                                                    DataLimits limits,
 +                                                    DecoratedKey partitionKey,
 +                                                    ClusteringIndexFilter clusteringIndexFilter)
 +    {
 +        return new SinglePartitionReadCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
 +    }
 +
 +    /**
 +     * Creates a new read command on a single partition.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param columnFilter the column filter to use for the query.
 +     * @param filter the clustering index filter to use for the query.
 +     *
 +     * @return a newly created read command. The returned command will use no row filter and have no limits.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
 +    {
 +        return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
 +    }
 +
 +    /**
 +     * Creates a new read command that queries a single partition in its entirety.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     *
 +     * @return a newly created read command that queries all the rows of {@code key}.
 +     */
 +    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
 +    {
 +        return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
 +    }
 +
 +    /**
 +     * Creates a new read command that queries a single partition in its entirety.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     *
 +     * @return a newly created read command that queries all the rows of {@code key}.
 +     */
 +    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
 +    {
 +        return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
 +    }
 +
 +    /**
 +     * Creates a new single partition slice command for the provided single slice.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param slice the slice of rows to query.
 +     *
 +     * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will
 +     * query every column for the table (without limit or row filtering) and be in forward order.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice)
 +    {
 +        return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice));
 +    }
 +
 +    /**
 +     * Creates a new single partition slice command for the provided slices.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param slices the slices of rows to query.
 +     *
 +     * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
 +     * query every column for the table (without limit or row filtering) and be in forward order.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
 +    {
 +        ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
 +        return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
 +    }
 +
 +    /**
 +     * Creates a new single partition slice command for the provided slices.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param slices the slices of rows to query.
 +     *
 +     * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
 +     * query every column for the table (without limit or row filtering) and be in forward order.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices)
 +    {
 +        return create(metadata, nowInSec, metadata.decorateKey(key), slices);
 +    }
 +
 +    /**
 +     * Creates a new single partition name command for the provided rows.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param names the clustering for the rows to query.
 +     *
 +     * @return a newly created read command that queries the {@code names} in {@code key}. The returned query will
 +     * query every column (without limit or row filtering) and be in forward order.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, NavigableSet<Clustering> names)
 +    {
 +        ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false);
 +        return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
 +    }
 +
 +    /**
 +     * Creates a new single partition name command for the provided row.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param name the clustering for the row to query.
 +     *
 +     * @return a newly created read command that queries {@code name} in {@code key}. The returned query will
 +     * query every column (without limit or row filtering).
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Clustering name)
 +    {
 +        return create(metadata, nowInSec, key, FBUtilities.singleton(name, metadata.comparator));
 +    }
 +
 +    public SinglePartitionReadCommand copy()
 +    {
 +        return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
 +    }
 +
 +    public DecoratedKey partitionKey()
 +    {
 +        return partitionKey;
 +    }
 +
 +    public ClusteringIndexFilter clusteringIndexFilter()
 +    {
 +        return clusteringIndexFilter;
 +    }
 +
 +    public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
 +    {
 +        return clusteringIndexFilter;
 +    }
 +
 +    public long getTimeout()
 +    {
 +        return DatabaseDescriptor.getReadRpcTimeout();
 +    }
 +
 +    public boolean selectsKey(DecoratedKey key)
 +    {
 +        if (!this.partitionKey().equals(key))
 +            return false;
 +
 +        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
 +    }
 +
 +    public boolean selectsClustering(DecoratedKey key, Clustering clustering)
 +    {
 +        if (clustering == Clustering.STATIC_CLUSTERING)
 +            return !columnFilter().fetchedColumns().statics.isEmpty();
 +
 +        if (!clusteringIndexFilter().selects(clustering))
 +            return false;
 +
 +        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
 +    }
 +
 +    /**
 +     * Returns a new command suitable to paging from the last returned row.
 +     *
 +     * @param lastReturned the last row returned by the previous page. The newly created command
 +     * will only query rows that come after this (in query order). This can be {@code null} if this
 +     * is the first page.
 +     * @param pageSize the size to use for the page to query.
 +     *
 +     * @return the newly created command.
 +     */
 +    public SinglePartitionReadCommand forPaging(Clustering lastReturned, int pageSize)
 +    {
 +        // We shouldn't have set digest yet when reaching that point
 +        assert !isDigestQuery();
 +        return create(isForThrift(),
 +                      metadata(),
 +                      nowInSec(),
 +                      columnFilter(),
 +                      rowFilter(),
 +                      limits().forPaging(pageSize),
 +                      partitionKey(),
 +                      lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
 +    }
 +
 +    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
 +    {
 +        return StorageProxy.read(Group.one(this), consistency, clientState);
 +    }
 +
 +    public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
 +    {
 +        return getPager(this, pagingState, protocolVersion);
 +    }
 +
 +    private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, int protocolVersion)
 +    {
 +        return new SinglePartitionPager(command, pagingState, protocolVersion);
 +    }
 +
 +    protected void recordLatency(TableMetrics metric, long latencyNanos)
 +    {
 +        metric.readLatency.addNano(latencyNanos);
 +    }
 +
 +    @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
 +    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
 +    {
 +        UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
 +                                        ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup())
 +                                        : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup());
 +        return new SingletonUnfilteredPartitionIterator(partition, isForThrift());
 +    }
 +
 +    /**
 +     * Fetch the rows requested if in cache; if not, read it from disk and cache it.
 +     * <p>
 +     * If the partition is cached, and the filter given is within its bounds, we return
 +     * from cache, otherwise from disk.
 +     * <p>
 +     * If the partition is not cached, we figure out what filter is "biggest", read
 +     * that from disk, then filter the result and either cache that or return it.
 +     */
 +    private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
 +    {
 +        assert !cfs.isIndex(); // CASSANDRA-5732
 +        assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
 +
 +        RowCacheKey key = new RowCacheKey(metadata().ksAndCFName, partitionKey());
 +
 +        // Attempt a sentinel-read-cache sequence. If a write invalidates our sentinel, we'll return our
 +        // (now potentially obsolete) data, but won't cache it. See CASSANDRA-3862
 +        // TODO: don't evict entire partitions on writes (#2864)
 +        IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
 +        if (cached != null)
 +        {
 +            if (cached instanceof RowCacheSentinel)
 +            {
 +                // Some other read is trying to cache the value, just do a normal non-caching read
 +                Tracing.trace("Row cache miss (race)");
 +                cfs.metric.rowCacheMiss.inc();
 +                return queryMemtableAndDisk(cfs, readOp);
 +            }
 +
 +            CachedPartition cachedPartition = (CachedPartition)cached;
 +            if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec()))
 +            {
 +                cfs.metric.rowCacheHit.inc();
 +                Tracing.trace("Row cache hit");
 +                UnfilteredRowIterator unfilteredRowIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition);
 +                cfs.metric.updateSSTableIterated(0);
 +                return unfilteredRowIterator;
 +            }
 +
 +            cfs.metric.rowCacheHitOutOfRange.inc();
 +            Tracing.trace("Ignoring row cache as cached value could not satisfy query");
 +            return queryMemtableAndDisk(cfs, readOp);
 +        }
 +
 +        cfs.metric.rowCacheMiss.inc();
 +        Tracing.trace("Row cache miss");
 +
 +        // Note that on tables with no clustering keys, any positive value of
 +        // rowsToCache implies caching the full partition
 +        boolean cacheFullPartitions = metadata().clusteringColumns().size() > 0 ?
 +                                      metadata().params.caching.cacheAllRows() :
 +                                      metadata().params.caching.cacheRows();
 +
 +        // To be able to cache what we read, what we read must at least cover what the cache holds, that
 +        // is the 'rowsToCache' first rows of the partition. We could read those 'rowsToCache' first rows
 +        // systematically, but we'd have to "extend" that to whatever is needed for the user query that the
 +        // 'rowsToCache' first rows don't cover and it's not trivial with our existing filters. So currently
 +        // we settle for caching what we read only if the user query does query the head of the partition since
 +        // that's the common case of when we'll be able to use the cache anyway. One exception is if we cache
 +        // full partitions, in which case we just always read it all and cache.
 +        if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter())
 +        {
 +            RowCacheSentinel sentinel = new RowCacheSentinel();
 +            boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
 +            boolean sentinelReplaced = false;
 +
 +            try
 +            {
 +                int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
 +                @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
 +                UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
 +                try
 +                {
 +                    // We want to cache only rowsToCache rows
 +                    CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec());
 +                    if (sentinelSuccess && !toCache.isEmpty())
 +                    {
 +                        Tracing.trace("Caching {} rows", toCache.rowCount());
 +                        CacheService.instance.rowCache.replace(key, sentinel, toCache);
 +                        // Whether or not the previous replace has worked, our sentinel is not in the cache anymore
 +                        sentinelReplaced = true;
 +                    }
 +
 +                    // We then re-filter out what this query wants.
 +                    // Note that in the case where we don't cache full partitions, it's possible that the current query is interested in more
 +                    // than what we've cached, so we can't just use toCache.
 +                    UnfilteredRowIterator cacheIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache);
 +                    if (cacheFullPartitions)
 +                    {
 +                        // Everything is guaranteed to be in 'toCache', we're done with 'iter'
 +                        assert !iter.hasNext();
 +                        iter.close();
 +                        return cacheIterator;
 +                    }
 +                    return UnfilteredRowIterators.concat(cacheIterator, clusteringIndexFilter().filterNotIndexed(columnFilter(), iter));
 +                }
 +                catch (RuntimeException | Error e)
 +                {
 +                    iter.close();
 +                    throw e;
 +                }
 +            }
 +            finally
 +            {
 +                if (sentinelSuccess && !sentinelReplaced)
 +                    cfs.invalidateCachedPartition(key);
 +            }
 +        }
 +
 +        Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
 +        return queryMemtableAndDisk(cfs, readOp);
 +    }
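
An aside on the sentinel-read-cache sequence in getThroughCache() above (CASSANDRA-3862): the protocol is small enough to show in isolation. Below is a self-contained sketch against a plain ConcurrentMap; every name is a hypothetical stand-in (the real cache is CacheService.instance.rowCache, the marker is RowCacheSentinel, and the load step is queryMemtableAndDisk()):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    final class SentinelCacheDemo
    {
        // Stand-in for RowCacheSentinel: marks "a read is populating this key".
        private static final Object SENTINEL = new Object();
        private final ConcurrentMap<String, Object> cache = new ConcurrentHashMap<>();

        Object readThrough(String key)
        {
            Object cached = cache.get(key);
            if (cached == SENTINEL)
                return loadFromDisk(key);        // another read is caching: plain read
            if (cached != null)
                return cached;                   // cache hit

            boolean sentinelSet = cache.putIfAbsent(key, SENTINEL) == null;
            boolean published = false;
            try
            {
                Object loaded = loadFromDisk(key);
                // Publish only if our sentinel survived: a concurrent write would
                // have invalidated it, meaning 'loaded' may already be stale.
                if (sentinelSet)
                    published = cache.replace(key, SENTINEL, loaded);
                return loaded;
            }
            finally
            {
                if (sentinelSet && !published)
                    cache.remove(key, SENTINEL); // never leave our sentinel behind
            }
        }

        private Object loadFromDisk(String key)
        {
            return "partition:" + key;           // placeholder for the disk read
        }
    }
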
 +
 +    /**
 +     * Queries both memtable and sstables to fetch the result of this query.
 +     * <p>
 +     * Please note that this method:
 +     *   1) does not check the row cache.
 +     *   2) does not apply the query limit, nor the row filter (and so ignore 2ndary indexes).
 +     *      Those are applied in {@link ReadCommand#executeLocally}.
 +     *   3) does not record some of the read metrics (latency, scanned cells histograms), nor does it
 +     *      throw TombstoneOverwhelmingException.
 +     * It is publicly exposed because there are a few places where that is exactly what we want,
 +     * but it should be used only where you know you don't need those things.
 +     * <p>
 +     * Also note that one must have "started" an {@code OpOrder.Group} on the queried table; to enforce that, it
 +     * is required as a parameter, even though it's not explicitly used by the method.
 +     */
 +    public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp)
 +    {
 +        Tracing.trace("Executing single-partition query on {}", cfs.name);
 +
 +        boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
 +        return queryMemtableAndDiskInternal(cfs, copyOnHeap);
 +    }
 +
 +    @Override
 +    protected int oldestUnrepairedTombstone()
 +    {
 +        return oldestUnrepairedTombstone;
 +    }
 +
 +    private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
 +    {
 +        /*
 +         * We have 2 main strategies:
 +         *   1) We query memtables and sstables simultaneously. This is our most generic strategy and the one we use
 +         *      unless we have a names filter that we know we can optimize further.
 +         *   2) If we have a names filter (so we query specific rows), we can make a bet: that all columns for all queried rows
 +         *      will have data in the most recent sstable(s), thus saving us from reading older ones. This does imply we
 +         *      have a way to guarantee we have all the data for what is queried, which is only possible for name queries
 +         *      and if we have neither non-frozen collections/UDTs nor counters (indeed, for a non-frozen collection or UDT,
 +         *      we can't guarantee an older sstable won't have some elements that weren't in the most recent sstables,
 +         *      and counters are intrinsically a collection of shards and so have the same problem).
 +         */
 +        if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && !queriesMulticellType())
 +            return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter());
 +
 +        Tracing.trace("Acquiring sstable references");
 +        ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
 +
 +        List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
 +        ClusteringIndexFilter filter = clusteringIndexFilter();
 +
 +        try
 +        {
 +            for (Memtable memtable : view.memtables)
 +            {
 +                Partition partition = memtable.getPartition(partitionKey());
 +                if (partition == null)
 +                    continue;
 +
 +                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
 +                UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
 +                @SuppressWarnings("resource") // same as above
 +                UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
 +                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
 +                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
 +            }
 +            /*
 +             * We can't eliminate full sstables based on the timestamp of what we've already read like
 +             * in collectTimeOrderedData, but we still want to eliminate sstables whose maxTimestamp < mostRecentTombstone
 +             * we've read. We still rely on the sstable ordering by maxTimestamp since if
 +             *   maxTimestamp_s1 > maxTimestamp_s0,
 +             * we're guaranteed that s1 cannot have a row tombstone such that
 +             *   timestamp(tombstone) > maxTimestamp_s0
 +             * since we necessarily have
 +             *   timestamp(tombstone) <= maxTimestamp_s1
 +             * In other words, iterating in maxTimestamp order allows us to do our mostRecentPartitionTombstone elimination
 +             * in one pass, and minimize the number of sstables for which we read a partition tombstone.
 +             */
 +            int sstablesIterated = 0;
 +            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
 +            List<SSTableReader> skippedSSTables = null;
 +            long mostRecentPartitionTombstone = Long.MIN_VALUE;
 +            long minTimestamp = Long.MAX_VALUE;
 +            int nonIntersectingSSTables = 0;
 +
 +            for (SSTableReader sstable : view.sstables)
 +            {
 +                minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
 +                // if we've already seen a partition tombstone with a timestamp greater
 +                // than the most recent update to this sstable, we can skip it
 +                if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
 +                    break;
 +
 +                if (!shouldInclude(sstable))
 +                {
 +                    nonIntersectingSSTables++;
 +                    // sstable contains no tombstones if minLocalDeletionTime == Integer.MAX_VALUE (what hasTombstones() checks), so we can safely skip those entirely
-                     if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
++                    if (sstable.hasTombstones())
 +                    {
 +                        if (skippedSSTables == null)
 +                            skippedSSTables = new ArrayList<>();
 +                        skippedSSTables.add(sstable);
 +                    }
 +                    continue;
 +                }
 +
 +                sstable.incrementReadCount();
 +                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
 +                UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
 +                if (!sstable.isRepaired())
 +                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 +
 +                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
 +                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
 +                sstablesIterated++;
 +            }
 +
 +            int includedDueToTombstones = 0;
 +            // Check for partition tombstones in the skipped sstables
 +            if (skippedSSTables != null)
 +            {
 +                for (SSTableReader sstable : skippedSSTables)
 +                {
 +                    if (sstable.getMaxTimestamp() <= minTimestamp)
 +                        continue;
 +
 +                    sstable.incrementReadCount();
 +                    @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is closed on exception, or through the closing of the final merged iterator
 +                    UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
 +                    if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
 +                    {
 +                        iterators.add(iter);
 +                        if (!sstable.isRepaired())
 +                            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 +                        includedDueToTombstones++;
 +                        sstablesIterated++;
 +                    }
 +                    else
 +                    {
 +                        iter.close();
 +                    }
 +                }
 +            }
 +            if (Tracing.isTracing())
 +                Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
 +                              nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
 +
 +            cfs.metric.updateSSTableIterated(sstablesIterated);
 +
 +            if (iterators.isEmpty())
 +                return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
 +
 +            Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
 +
 +            @SuppressWarnings("resource") //  Closed through the closing of the result of that method.
 +            UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
 +            if (!merged.isEmpty())
 +            {
 +                DecoratedKey key = merged.partitionKey();
 +                cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
 +            }
 +
 +            return merged;
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            try
 +            {
 +                FBUtilities.closeAll(iterators);
 +            }
 +            catch (Exception suppressed)
 +            {
 +                e.addSuppressed(suppressed);
 +            }
 +            throw e;
 +        }
 +    }
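
The long comment above (sorting by maxTimestamp, breaking out once maxTimestamp < mostRecentPartitionTombstone) is the invariant the skipped-sstables bookkeeping hangs off, and it is untouched by this patch. A toy model of just that loop, with bare longs standing in for sstables (all names hypothetical):

    import java.util.Comparator;
    import java.util.List;

    final class TimestampSkipDemo
    {
        static final class Table
        {
            final long maxTimestamp;         // newest write in this table
            final long partitionTombstoneTs; // its partition deletion, if any

            Table(long maxTimestamp, long partitionTombstoneTs)
            {
                this.maxTimestamp = maxTimestamp;
                this.partitionTombstoneTs = partitionTombstoneTs;
            }
        }

        // Visit newest-first: once a table's maxTimestamp drops below the most
        // recent partition tombstone seen so far, every remaining table is older
        // still, so all of its data is shadowed and the loop can stop.
        static int tablesRead(List<Table> tables)
        {
            tables.sort(Comparator.comparingLong((Table t) -> t.maxTimestamp).reversed());
            long mostRecentPartitionTombstone = Long.MIN_VALUE;
            int read = 0;
            for (Table t : tables)
            {
                if (t.maxTimestamp < mostRecentPartitionTombstone)
                    break;
                read++;
                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, t.partitionTombstoneTs);
            }
            return read;
        }
    }
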
 +
 +    private boolean shouldInclude(SSTableReader sstable)
 +    {
 +        // If some static columns are queried, we should always include the sstable: the clustering values stats of the sstable
 +        // don't tell us if the sstable contains static values in particular.
 +        // TODO: we could record if a sstable contains any static value at all.
 +        if (!columnFilter().fetchedColumns().statics.isEmpty())
 +            return true;
 +
 +        return clusteringIndexFilter().shouldInclude(sstable);
 +    }
 +
 +    private boolean queriesMulticellType()
 +    {
 +        for (ColumnDefinition column : columnFilter().fetchedColumns())
 +        {
 +            if (column.type.isMultiCell() || column.type.isCounter())
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Do a read by querying the memtable(s) first, and then each relevant sstables sequentially by order of the sstable
 +     * max timestamp.
 +     *
 +     * This is used for names queries in the hope of only having to query the 1 or 2 most recent sstables and then knowing nothing
 +     * more recent could be in the older sstables (which we can only guarantee if we know exactly which rows we query, and if
 +     * no collections or counters are included).
 +     * This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
 +     */
 +    private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter)
 +    {
 +        Tracing.trace("Acquiring sstable references");
 +        ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
 +
 +        ImmutableBTreePartition result = null;
 +
 +        Tracing.trace("Merging memtable contents");
 +        for (Memtable memtable : view.memtables)
 +        {
 +            Partition partition = memtable.getPartition(partitionKey());
 +            if (partition == null)
 +                continue;
 +
 +            try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
 +            {
 +                if (iter.isEmpty())
 +                    continue;
 +
 +                UnfilteredRowIterator clonedFilter = copyOnHeap
 +                                                   ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
 +                                                   : iter;
 +                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false);
 +            }
 +        }
 +
 +        /* add the SSTables on disk */
 +        Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
 +        int sstablesIterated = 0;
 +        boolean onlyUnrepaired = true;
 +        // read sorted sstables
 +        for (SSTableReader sstable : view.sstables)
 +        {
 +            // if we've already seen a partition tombstone with a timestamp greater
 +            // than the most recent update to this sstable, we're done, since the rest of the sstables
 +            // will also be older
 +            if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt())
 +                break;
 +
 +            long currentMaxTs = sstable.getMaxTimestamp();
 +            filter = reduceFilter(filter, result, currentMaxTs);
 +            if (filter == null)
 +                break;
 +
 +            if (!shouldInclude(sstable))
 +            {
 +                // This means that nothing queried by the filter can be in the sstable. One exception is the top-level partition deletion
 +                // however: if it is set, it impacts everything and must be included. Getting that top-level partition deletion costs us
 +                // some seek in general however (unless the partition is indexed and is in the key cache), so we first check if the sstable
 +                // has any tombstone at all as a shortcut.
-                 if (sstable.getSSTableMetadata().maxLocalDeletionTime == Integer.MAX_VALUE)
++                if (!sstable.hasTombstones())
 +                    continue; // Means no tombstone at all, we can skip that sstable
 +
 +                // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable.
 +                sstable.incrementReadCount();
 +                try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()))
 +                {
-                     if (iter.partitionLevelDeletion().isLive())
++                    if (!iter.partitionLevelDeletion().isLive())
 +                    {
 +                        sstablesIterated++;
 +                        result = add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), result, filter, sstable.isRepaired());
 +                    }
 +                }
 +                continue;
 +            }
 +
 +            Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
 +            sstable.incrementReadCount();
 +            try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())))
 +            {
 +                if (iter.isEmpty())
 +                    continue;
 +
 +                if (sstable.isRepaired())
 +                    onlyUnrepaired = false;
 +                sstablesIterated++;
 +                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, sstable.isRepaired());
 +            }
 +        }
 +
 +        cfs.metric.updateSSTableIterated(sstablesIterated);
 +
 +        if (result == null || result.isEmpty())
 +            return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
 +
 +        DecoratedKey key = result.partitionKey();
 +        cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
 +
 +        // "hoist up" the requested data into a more recent sstable
 +        if (sstablesIterated > cfs.getMinimumCompactionThreshold()
 +            && onlyUnrepaired
 +            && !cfs.isAutoCompactionDisabled()
 +            && cfs.getCompactionStrategyManager().shouldDefragment())
 +        {
 +            // !!WARNING!!   if we stop copying our data to a heap-managed object,
 +            //               we will need to track the lifetime of this mutation as well
 +            Tracing.trace("Defragmenting requested data");
 +
 +            try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
 +            {
 +                final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
 +                StageManager.getStage(Stage.MUTATION).execute(new Runnable()
 +                {
 +                    public void run()
 +                    {
 +                        // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
 +                        Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
 +                    }
 +                });
 +            }
 +        }
 +
 +        return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
 +    }
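
queryMemtableAndSSTablesInTimestampOrder() combines two ideas: visit sources newest-first, and shrink the names filter (reduceFilter()/canRemoveRow()) once a row's data is provably newer than anything a remaining source can hold. A hypothetical toy model, with one timestamp per name standing in for a row; the real code reconciles whole rows and also handles the static row and partition deletions:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    final class NamesQueryDemo
    {
        // 'sources' is sorted newest-first; sourceMaxTs.get(i) is source i's
        // max timestamp (memtables first, then sstables by max timestamp).
        static Map<String, Long> read(List<Map<String, Long>> sources,
                                      List<Long> sourceMaxTs,
                                      Set<String> requestedNames)
        {
            Map<String, Long> result = new HashMap<>();
            Set<String> remaining = new HashSet<>(requestedNames);
            for (int i = 0; i < sources.size() && !remaining.isEmpty(); i++)
            {
                // reduceFilter()/canRemoveRow() analogue: drop a name once the
                // value already held is newer than this source's max timestamp,
                // since no later (older) source can beat it either.
                long maxTs = sourceMaxTs.get(i);
                remaining.removeIf(name -> {
                    Long seen = result.get(name);
                    return seen != null && seen > maxTs;
                });
                for (String name : remaining)
                {
                    Long ts = sources.get(i).get(name);
                    if (ts != null && (result.get(name) == null || ts > result.get(name)))
                        result.put(name, ts); // keep the newest value per name
                }
            }
            return result;
        }
    }
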
 +
 +    private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired)
 +    {
 +        if (!isRepaired)
 +            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.stats().minLocalDeletionTime);
 +
 +        int maxRows = Math.max(filter.requestedRows().size(), 1);
 +        if (result == null)
 +            return ImmutableBTreePartition.create(iter, maxRows);
 +
 +        try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, filter.isReversed())), nowInSec()))
 +        {
 +            return ImmutableBTreePartition.create(merged, maxRows);
 +        }
 +    }
 +
 +    private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp)
 +    {
 +        if (result == null)
 +            return filter;
 +
 +        SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false);
 +
 +        PartitionColumns columns = columnFilter().fetchedColumns();
 +        NavigableSet<Clustering> clusterings = filter.requestedRows();
 +
 +        // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows.
 +        // TODO: we could also remove a selected column if we've found values for every requested row but we'll leave
 +        // that for later.
 +
 +        boolean removeStatic = false;
 +        if (!columns.statics.isEmpty())
 +        {
 +            Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
 +            removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp);
 +        }
 +
 +        NavigableSet<Clustering> toRemove = null;
 +        for (Clustering clustering : clusterings)
 +        {
 +            if (!searchIter.hasNext())
 +                break;
 +
 +            Row row = searchIter.next(clustering);
 +            if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
 +                continue;
 +
 +            if (toRemove == null)
 +                toRemove = new TreeSet<>(result.metadata().comparator);
 +            toRemove.add(clustering);
 +        }
 +
 +        if (!removeStatic && toRemove == null)
 +            return filter;
 +
 +        // Check if we have everything we need
 +        boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic;
 +        boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != null && toRemove.size() == clusterings.size());
 +        if (hasNoMoreStatic && hasNoMoreClusterings)
 +            return null;
 +
 +        if (toRemove != null)
 +        {
 +            BTreeSet.Builder<Clustering> newClusterings = BTreeSet.builder(result.metadata().comparator);
 +            newClusterings.addAll(Sets.difference(clusterings, toRemove));
 +            clusterings = newClusterings.build();
 +        }
 +        return new ClusteringIndexNamesFilter(clusterings, filter.isReversed());
 +    }
 +
 +    private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp)
 +    {
 +        // We can remove a row if it has data that is more recent than the next sstable to consider for the data that the query
 +        // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not)
 +        // and 2) the requested columns.
 +        if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
 +            return false;
 +
 +        for (ColumnDefinition column : requestedColumns)
 +        {
 +            Cell cell = row.getCell(column);
 +            if (cell == null || cell.timestamp() <= sstableTimestamp)
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s key=%s filter=%s, nowInSec=%d)",
 +                             metadata().ksName,
 +                             metadata().cfName,
 +                             columnFilter(),
 +                             rowFilter(),
 +                             limits(),
 +                             metadata().getKeyValidator().getString(partitionKey().getKey()),
 +                             clusteringIndexFilter.toString(metadata()),
 +                             nowInSec());
 +    }
 +
 +    public MessageOut<ReadCommand> createMessage(int version)
 +    {
 +        return new MessageOut<>(MessagingService.Verb.READ, this, readSerializer);
 +    }
 +
 +    protected void appendCQLWhereClause(StringBuilder sb)
 +    {
 +        sb.append(" WHERE ");
 +
 +        sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append(" = ");
 +        DataRange.appendKeyString(sb, metadata().getKeyValidator(), partitionKey().getKey());
 +
 +        // We put the row filter first because the clustering index filter can end by "ORDER BY"
 +        if (!rowFilter().isEmpty())
 +            sb.append(" AND ").append(rowFilter());
 +
 +        String filterString = clusteringIndexFilter().toCQLString(metadata());
 +        if (!filterString.isEmpty())
 +            sb.append(" AND ").append(filterString);
 +    }
 +
 +    protected void serializeSelection(DataOutputPlus out, int version) throws IOException
 +    {
 +        metadata().getKeyValidator().writeValue(partitionKey().getKey(), out);
 +        ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), out, version);
 +    }
 +
 +    protected long selectionSerializedSize(int version)
 +    {
 +        return metadata().getKeyValidator().writtenLength(partitionKey().getKey())
 +             + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
 +    }
 +
 +    /**
 +     * Groups multiple single partition read commands.
 +     */
 +    public static class Group implements ReadQuery
 +    {
 +        public final List<SinglePartitionReadCommand> commands;
 +        private final DataLimits limits;
 +        private final int nowInSec;
 +
 +        public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
 +        {
 +            assert !commands.isEmpty();
 +            this.commands = commands;
 +            this.limits = limits;
 +            this.nowInSec = commands.get(0).nowInSec();
 +            for (int i = 1; i < commands.size(); i++)
 +                assert commands.get(i).nowInSec() == nowInSec;
 +        }
 +
 +        public static Group one(SinglePartitionReadCommand command)
 +        {
 +            return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits());
 +        }
 +
 +        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
 +        {
 +            return StorageProxy.read(this, consistency, clientState);
 +        }
 +
 +        public int nowInSec()
 +        {
 +            return nowInSec;
 +        }
 +
 +        public DataLimits limits()
 +        {
 +            return limits;
 +        }
 +
 +        public CFMetaData metadata()
 +        {
 +            return commands.get(0).metadata();
 +        }
 +
 +        public ReadOrderGroup startOrderGroup()
 +        {
 +            // Note that the only difference between the commands in a group must be the partition key on which
 +            // they apply. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
 +            return commands.get(0).startOrderGroup();
 +        }
 +
 +        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
 +        {
 +            List<PartitionIterator> partitions = new ArrayList<>(commands.size());
 +            for (SinglePartitionReadCommand cmd : commands)
 +                partitions.add(cmd.executeInternal(orderGroup));
 +
 +            // Because we only enforce the limit per command, we need to enforce it globally.
 +            return limits.filter(PartitionIterators.concat(partitions), nowInSec);
 +        }
 +
 +        public QueryPager getPager(PagingState pagingState, int protocolVersion)
 +        {
 +            if (commands.size() == 1)
 +                return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
 +
 +            return new MultiPartitionPager(this, pagingState, protocolVersion);
 +        }
 +
 +        public boolean selectsKey(DecoratedKey key)
 +        {
 +            return Iterables.any(commands, c -> c.selectsKey(key));
 +        }
 +
 +        public boolean selectsClustering(DecoratedKey key, Clustering clustering)
 +        {
 +            return Iterables.any(commands, c -> c.selectsClustering(key, clustering));
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return commands.toString();
 +        }
 +    }
 +
 +    private static class Deserializer extends SelectionDeserializer
 +    {
 +        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
 +        throws IOException
 +        {
 +            DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in, DatabaseDescriptor.getMaxValueSize()));
 +            ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
 +            return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7976d65a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 9f31af1,fddf058..f0cdcf5
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1830,41 -1874,20 +1830,49 @@@ public abstract class SSTableReader ext
          return sstableMetadata.maxTimestamp;
      }
  
 -    public Set<Integer> getAncestors()
 +    public int getMinLocalDeletionTime()
      {
 -        try
 -        {
 -            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
 -            if (compactionMetadata != null)
 -                return compactionMetadata.ancestors;
 -            return Collections.emptySet();
 -        }
 -        catch (IOException e)
 -        {
 -            SSTableReader.logOpenException(descriptor, e);
 -            return Collections.emptySet();
 -        }
 +        return sstableMetadata.minLocalDeletionTime;
 +    }
 +
 +    public int getMaxLocalDeletionTime()
 +    {
 +        return sstableMetadata.maxLocalDeletionTime;
 +    }
 +
++    /** sstable contains no tombstones if minLocalDeletionTime == Integer.MAX_VALUE */
++    public boolean hasTombstones()
++    {
++        // sstable contains no tombstones if minLocalDeletionTime is still set to the default value Integer.MAX_VALUE,
++        // which is bigger than any valid deletion time
++        return getMinLocalDeletionTime() != Integer.MAX_VALUE;
++    }
++
 +    public int getMinTTL()
 +    {
 +        return sstableMetadata.minTTL;
 +    }
 +
 +    public int getMaxTTL()
 +    {
 +        return sstableMetadata.maxTTL;
 +    }
 +
 +    public long getTotalColumnsSet()
 +    {
 +        return sstableMetadata.totalColumnsSet;
 +    }
 +
 +    public long getTotalRows()
 +    {
 +        return sstableMetadata.totalRows;
 +    }
 +
 +    public int getAvgColumnSetPerRow()
 +    {
 +        return sstableMetadata.totalRows < 0
 +             ? -1
 +             : (sstableMetadata.totalRows == 0 ? 0 : (int)(sstableMetadata.totalColumnsSet / sstableMetadata.totalRows));
      }
  
      public int getSSTableLevel()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7976d65a/test/unit/org/apache/cassandra/db/SinglePartitionReadCommandCQLTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/SinglePartitionReadCommandCQLTest.java
index 0000000,0000000..1c891ec
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionReadCommandCQLTest.java
@@@ -1,0 -1,0 +1,41 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.db;
++
++import org.junit.Test;
++
++import org.apache.cassandra.cql3.CQLTester;
++import org.apache.cassandra.cql3.UntypedResultSet;
++import static org.junit.Assert.assertTrue;
++
++public class SinglePartitionReadCommandCQLTest extends CQLTester
++{
++    @Test
++    public void partitionLevelDeletionTest() throws Throwable
++    {
++        createTable("CREATE TABLE %s (bucket_id TEXT, name TEXT, data TEXT, PRIMARY KEY (bucket_id, name))");
++        execute("insert into %s (bucket_id, name, data) values ('8772618c9009cf8f5a5e0c18', 'test', 'hello')");
++        getCurrentColumnFamilyStore().forceBlockingFlush();
++        execute("insert into %s (bucket_id, name, data) values ('8772618c9009cf8f5a5e0c19', 'test2', 'hello');");
++        execute("delete from %s where bucket_id = '8772618c9009cf8f5a5e0c18'");
++        getCurrentColumnFamilyStore().forceBlockingFlush();
++        UntypedResultSet res = execute("select * from %s where bucket_id = '8772618c9009cf8f5a5e0c18' and name = 'test'");
++        assertTrue(res.isEmpty());
++    }
++}
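
The test reproduces the resurrection scenario: the second flush writes one sstable that holds both a live row (the second bucket) and a partition tombstone (the first bucket). The live row keeps that sstable's maxLocalDeletionTime at Integer.MAX_VALUE, so the pre-patch guard classified it as tombstone-free and skipped it, and the deleted row came back. A hypothetical follow-up check (not part of the commit), reusing the accessors this commit adds, of the stats such a mixed sstable should show:

    import org.apache.cassandra.io.sstable.format.SSTableReader;

    final class MixedSSTableCheck
    {
        static void assertMixedSSTable(SSTableReader sstable)
        {
            // the live row keeps the max at the "no deletion" marker...
            assert sstable.getMaxLocalDeletionTime() == Integer.MAX_VALUE;
            // ...while the partition tombstone pulls the min below it, which is
            // the signal hasTombstones() keys on after this patch
            assert sstable.getMinLocalDeletionTime() < Integer.MAX_VALUE;
            assert sstable.hasTombstones();
        }
    }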

