cassandra-commits mailing list archives

From ble...@apache.org
Subject [4/6] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
Date Fri, 14 Jul 2017 15:30:24 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/DataLimits.java
index 94f43dc,0000000..48ec06a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@@ -1,814 -1,0 +1,827 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.filter;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.transform.BasePartitions;
 +import org.apache.cassandra.db.transform.BaseRows;
 +import org.apache.cassandra.db.transform.StoppingTransformation;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +/**
 + * Object in charge of tracking whether we have fetched enough data for a given query.
 + *
 + * The reason this is not just a simple integer is that Thrift and CQL3 count
 + * things in different ways. This class is what abstracts those differences.
 + */
 +public abstract class DataLimits
 +{
 +    public static final Serializer serializer = new Serializer();
 +
 +    public static final int NO_LIMIT = Integer.MAX_VALUE;
 +
 +    public static final DataLimits NONE = new CQLLimits(NO_LIMIT)
 +    {
 +        @Override
-         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
++        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            return false;
 +        }
 +
 +        @Override
-         public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
++        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter,
++                                                  int nowInSec,
++                                                  boolean countPartitionsWithOnlyStaticData)
 +        {
 +            return iter;
 +        }
 +
 +        @Override
-         public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec)
++        public UnfilteredRowIterator filter(UnfilteredRowIterator iter,
++                                            int nowInSec,
++                                            boolean countPartitionsWithOnlyStaticData)
 +        {
 +            return iter;
 +        }
 +    };
 +
 +    // We currently deal with distinct queries by querying full partitions but limiting the result to 1 row per
 +    // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
 +    public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true);
 +
 +    public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
 +
 +    public static DataLimits cqlLimits(int cqlRowLimit)
 +    {
 +        return new CQLLimits(cqlRowLimit);
 +    }
 +
 +    public static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit)
 +    {
 +        return new CQLLimits(cqlRowLimit, perPartitionLimit);
 +    }
 +
 +    public static DataLimits distinctLimits(int cqlRowLimit)
 +    {
 +        return CQLLimits.distinct(cqlRowLimit);
 +    }
 +
 +    public static DataLimits thriftLimits(int partitionLimit, int cellPerPartitionLimit)
 +    {
 +        return new ThriftLimits(partitionLimit, cellPerPartitionLimit);
 +    }
 +
 +    public static DataLimits superColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
 +    {
 +        return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
 +    }
 +
 +    public abstract Kind kind();
 +
 +    public abstract boolean isUnlimited();
 +    public abstract boolean isDistinct();
 +
 +    public abstract DataLimits forPaging(int pageSize);
 +    public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining);
 +
 +    public abstract DataLimits forShortReadRetry(int toFetch);
 +
-     public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec);
++    public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData);
 +
 +    /**
 +     * Returns a new {@code Counter} for this limits.
 +     *
 +     * @param nowInSec the current time in seconds (to decide what is expired or not).
 +     * @param assumeLiveData if true, the counter will assume that every row passed is live and
 +     * thus won't check for liveness, otherwise it will. This should be {@code true} when used on a
 +     * {@code RowIterator} (since it only returns live rows), false otherwise.
++     * @param countPartitionsWithOnlyStaticData if {@code true}, partitions with only static data
++     * should be counted as one valid row.
 +     * @return a new {@code Counter} for this limits.
 +     */
-     public abstract Counter newCounter(int nowInSec, boolean assumeLiveData);
++    public abstract Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData);
 +
 +    /**
 +     * The max number of results this limits enforces.
 +     * <p>
 +     * Note that the actual definition of "results" depends on the query type: for CQL it's always
 +     * rows, but for Thrift it means cells.
 +     *
 +     * @return the maximum number of results this limits enforces.
 +     */
 +    public abstract int count();
 +
 +    public abstract int perPartitionCount();
 +
-     public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
++    public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter,
++                                              int nowInSec,
++                                              boolean countPartitionsWithOnlyStaticData)
 +    {
-         return this.newCounter(nowInSec, false).applyTo(iter);
++        return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter);
 +    }
 +
-     public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec)
++    public UnfilteredRowIterator filter(UnfilteredRowIterator iter,
++                                        int nowInSec,
++                                        boolean countPartitionsWithOnlyStaticData)
 +    {
-         return this.newCounter(nowInSec, false).applyTo(iter);
++        return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter);
 +    }
 +
-     public PartitionIterator filter(PartitionIterator iter, int nowInSec)
++    public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData)
 +    {
-         return this.newCounter(nowInSec, true).applyTo(iter);
++        return this.newCounter(nowInSec, true, countPartitionsWithOnlyStaticData).applyTo(iter);
 +    }
 +
 +    /**
 +     * Estimate the number of results (the definition of "results" will be rows for CQL queries
 +     * and partitions for thrift ones) that a full scan of the provided cfs would yield.
 +     */
 +    public abstract float estimateTotalResults(ColumnFamilyStore cfs);
 +
 +    public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>>
 +    {
 +        // false means we do not propagate our stop signals onto the iterator; we only count
 +        private boolean enforceLimits = true;
 +
 +        public Counter onlyCount()
 +        {
 +            this.enforceLimits = false;
 +            return this;
 +        }
 +
 +        public PartitionIterator applyTo(PartitionIterator partitions)
 +        {
 +            return Transformation.apply(partitions, this);
 +        }
 +
 +        public UnfilteredPartitionIterator applyTo(UnfilteredPartitionIterator partitions)
 +        {
 +            return Transformation.apply(partitions, this);
 +        }
 +
 +        public UnfilteredRowIterator applyTo(UnfilteredRowIterator partition)
 +        {
 +            return (UnfilteredRowIterator) applyToPartition(partition);
 +        }
 +
 +        public RowIterator applyTo(RowIterator partition)
 +        {
 +            return (RowIterator) applyToPartition(partition);
 +        }
 +
 +        /**
 +         * The number of results counted.
 +         * <p>
 +         * Note that the definition of "results" should be the same as for {@link #count}.
 +         *
 +         * @return the number of results counted.
 +         */
 +        public abstract int counted();
 +        public abstract int countedInCurrentPartition();
 +
 +        public abstract boolean isDone();
 +        public abstract boolean isDoneForPartition();
 +
 +        @Override
 +        protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
 +        {
 +            return partition instanceof UnfilteredRowIterator ? Transformation.apply((UnfilteredRowIterator) partition, this)
 +                                                              : Transformation.apply((RowIterator) partition, this);
 +        }
 +
 +        // called before we process a given partition
 +        protected abstract void applyToPartition(DecoratedKey partitionKey, Row staticRow);
 +
 +        @Override
 +        protected void attachTo(BasePartitions partitions)
 +        {
 +            if (enforceLimits)
 +                super.attachTo(partitions);
 +            if (isDone())
 +                stop();
 +        }
 +
 +        @Override
 +        protected void attachTo(BaseRows rows)
 +        {
 +            if (enforceLimits)
 +                super.attachTo(rows);
 +            applyToPartition(rows.partitionKey(), rows.staticRow());
 +            if (isDoneForPartition())
 +                stopInPartition();
 +        }
 +    }
 +
 +    /**
 +     * Limits used by CQL; this counts rows.
 +     */
 +    private static class CQLLimits extends DataLimits
 +    {
 +        protected final int rowLimit;
 +        protected final int perPartitionLimit;
 +
 +        // Whether the query is a distinct query or not.
 +        protected final boolean isDistinct;
 +
 +        private CQLLimits(int rowLimit)
 +        {
 +            this(rowLimit, NO_LIMIT);
 +        }
 +
 +        private CQLLimits(int rowLimit, int perPartitionLimit)
 +        {
 +            this(rowLimit, perPartitionLimit, false);
 +        }
 +
 +        private CQLLimits(int rowLimit, int perPartitionLimit, boolean isDistinct)
 +        {
 +            this.rowLimit = rowLimit;
 +            this.perPartitionLimit = perPartitionLimit;
 +            this.isDistinct = isDistinct;
 +        }
 +
 +        private static CQLLimits distinct(int rowLimit)
 +        {
 +            return new CQLLimits(rowLimit, 1, true);
 +        }
 +
 +        public Kind kind()
 +        {
 +            return Kind.CQL_LIMIT;
 +        }
 +
 +        public boolean isUnlimited()
 +        {
 +            return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT;
 +        }
 +
 +        public boolean isDistinct()
 +        {
 +            return isDistinct;
 +        }
 +
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            return new CQLLimits(pageSize, perPartitionLimit, isDistinct);
 +        }
 +
 +        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            return new CQLPagingLimits(pageSize, perPartitionLimit, isDistinct, lastReturnedKey, lastReturnedKeyRemaining);
 +        }
 +
 +        public DataLimits forShortReadRetry(int toFetch)
 +        {
 +            // When we do a short read retry, we're only ever querying the single partition on which we have a short read. So
 +            // we use toFetch as the row limit and use no perPartitionLimit (it would be equivalent in practice to use toFetch
 +            // for both arguments or just for perPartitionLimit with no limit on rowLimit).
 +            return new CQLLimits(toFetch, NO_LIMIT, isDistinct);
 +        }
 +
-         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
++        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            // We want the number of rows that are currently live. Getting that precise number forces
 +            // us to iterate the cached partition in general, but we can avoid that if:
 +            //   - The number of rows with at least one non-expiring cell is greater than what we ask for,
 +            //     in which case we know we have enough live rows.
 +            //   - The number of rows is less than requested, in which case we know we won't have enough.
 +            if (cached.rowsWithNonExpiringCells() >= rowLimit)
 +                return true;
 +
 +            if (cached.rowCount() < rowLimit)
 +                return false;
 +
 +            // Otherwise, we need to re-count
 +
-             DataLimits.Counter counter = newCounter(nowInSec, false);
++            DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData);
 +            try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
 +                 UnfilteredRowIterator iter = counter.applyTo(cacheIter))
 +            {
 +                // Consume the iterator until we've counted enough
 +                while (iter.hasNext())
 +                    iter.next();
 +                return counter.isDone();
 +            }
 +        }
 +
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +        {
-             return new CQLCounter(nowInSec, assumeLiveData);
++            return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
 +        }
 +
 +        public int count()
 +        {
 +            return rowLimit;
 +        }
 +
 +        public int perPartitionCount()
 +        {
 +            return perPartitionLimit;
 +        }
 +
 +        public float estimateTotalResults(ColumnFamilyStore cfs)
 +        {
 +            // TODO: we should start storing stats on the number of rows (instead of the number of cells, which
 +            // is what getMeanColumns returns)
 +            float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
 +            return rowsPerPartition * (cfs.estimateKeys());
 +        }
 +
 +        protected class CQLCounter extends Counter
 +        {
 +            protected final int nowInSec;
 +            protected final boolean assumeLiveData;
++            protected final boolean countPartitionsWithOnlyStaticData;
 +
 +            protected int rowCounted;
 +            protected int rowInCurrentPartition;
 +
 +            protected boolean hasLiveStaticRow;
 +
-             public CQLCounter(int nowInSec, boolean assumeLiveData)
++            public CQLCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +            {
 +                this.nowInSec = nowInSec;
 +                this.assumeLiveData = assumeLiveData;
++                this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
 +            }
 +
 +            @Override
 +            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
 +            {
 +                rowInCurrentPartition = 0;
 +                hasLiveStaticRow = !staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec));
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                if (assumeLiveData || row.hasLiveData(nowInSec))
 +                    incrementRowCount();
 +                return row;
 +            }
 +
 +            @Override
 +            public void onPartitionClose()
 +            {
 +                // Normally, we don't count static rows: from a CQL point of view, they are merged with the
 +                // other rows in the partition. However, if the partition only has the static row, it will be
 +                // returned as one row, so count it.
-                 if (hasLiveStaticRow && rowInCurrentPartition == 0)
++                if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowInCurrentPartition == 0)
 +                    incrementRowCount();
 +                super.onPartitionClose();
 +            }
 +
 +            private void incrementRowCount()
 +            {
 +                if (++rowCounted >= rowLimit)
 +                    stop();
 +                if (++rowInCurrentPartition >= perPartitionLimit)
 +                    stopInPartition();
 +            }
 +
 +            public int counted()
 +            {
 +                return rowCounted;
 +            }
 +
 +            public int countedInCurrentPartition()
 +            {
 +                return rowInCurrentPartition;
 +            }
 +
 +            public boolean isDone()
 +            {
 +                return rowCounted >= rowLimit;
 +            }
 +
 +            public boolean isDoneForPartition()
 +            {
 +                return isDone() || rowInCurrentPartition >= perPartitionLimit;
 +            }
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            StringBuilder sb = new StringBuilder();
 +
 +            if (rowLimit != NO_LIMIT)
 +            {
 +                sb.append("LIMIT ").append(rowLimit);
 +                if (perPartitionLimit != NO_LIMIT)
 +                    sb.append(' ');
 +            }
 +
 +            if (perPartitionLimit != NO_LIMIT)
 +                sb.append("PER PARTITION LIMIT ").append(perPartitionLimit);
 +
 +            return sb.toString();
 +        }
 +    }
 +
 +    private static class CQLPagingLimits extends CQLLimits
 +    {
 +        private final ByteBuffer lastReturnedKey;
 +        private final int lastReturnedKeyRemaining;
 +
 +        public CQLPagingLimits(int rowLimit, int perPartitionLimit, boolean isDistinct, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            super(rowLimit, perPartitionLimit, isDistinct);
 +            this.lastReturnedKey = lastReturnedKey;
 +            this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
 +        }
 +
 +        @Override
 +        public Kind kind()
 +        {
 +            return Kind.CQL_PAGING_LIMIT;
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +        {
-             return new PagingAwareCounter(nowInSec, assumeLiveData);
++            return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
 +        }
 +
 +        private class PagingAwareCounter extends CQLCounter
 +        {
-             private PagingAwareCounter(int nowInSec, boolean assumeLiveData)
++            private PagingAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +            {
-                 super(nowInSec, assumeLiveData);
++                super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
 +            }
 +
 +            @Override
 +            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
 +            {
 +                if (partitionKey.getKey().equals(lastReturnedKey))
 +                {
 +                    rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
 +                    // lastReturnedKey is the last key for which we've returned rows in the first page.
 +                    // So, since we know we have returned rows, we know we have already accounted for the
 +                    // static row (if any), so force hasLiveStaticRow to false to make sure we don't count
 +                    // it once more.
 +                    hasLiveStaticRow = false;
 +                }
 +                else
 +                {
 +                    super.applyToPartition(partitionKey, staticRow);
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Limits used by Thrift; these count partitions and cells.
 +     */
 +    private static class ThriftLimits extends DataLimits
 +    {
 +        protected final int partitionLimit;
 +        protected final int cellPerPartitionLimit;
 +
 +        private ThriftLimits(int partitionLimit, int cellPerPartitionLimit)
 +        {
 +            this.partitionLimit = partitionLimit;
 +            this.cellPerPartitionLimit = cellPerPartitionLimit;
 +        }
 +
 +        public Kind kind()
 +        {
 +            return Kind.THRIFT_LIMIT;
 +        }
 +
 +        public boolean isUnlimited()
 +        {
 +            return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT;
 +        }
 +
 +        public boolean isDistinct()
 +        {
 +            return false;
 +        }
 +
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            // We don't support paging on thrift in general but do use paging under the hood for get_count. For
 +            // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
 +            // partition). We do check, however, that the partition limit is 1 to make sure this is not misused
 +            // (as this wouldn't work properly for range queries).
 +            assert partitionLimit == 1;
 +            return new ThriftLimits(partitionLimit, pageSize);
 +        }
 +
 +        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        public DataLimits forShortReadRetry(int toFetch)
 +        {
 +            // Short read retries are always done for a single partition at a time, so it's ok to ignore the
 +            // partition limit for those
 +            return new ThriftLimits(1, toFetch);
 +        }
 +
-         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
++        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            // We want the number of cells that are currently live. Getting that precise number forces
 +            // us to iterate the cached partition in general, but we can avoid that if:
 +            //   - The number of non-expiring live cells is greater than the number of cells asked for (we then
 +            //     know we have enough live cells).
 +            //   - The number of cells cached is less than requested, in which case we know we won't have enough.
 +            if (cached.nonExpiringLiveCells() >= cellPerPartitionLimit)
 +                return true;
 +
 +            if (cached.nonTombstoneCellCount() < cellPerPartitionLimit)
 +                return false;
 +
 +            // Otherwise, we need to re-count
-             DataLimits.Counter counter = newCounter(nowInSec, false);
++            DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData);
 +            try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
 +                 UnfilteredRowIterator iter = counter.applyTo(cacheIter))
 +            {
 +                // Consume the iterator until we've counted enough
 +                while (iter.hasNext())
 +                    iter.next();
 +                return counter.isDone();
 +            }
 +        }
 +
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            return new ThriftCounter(nowInSec, assumeLiveData);
 +        }
 +
 +        public int count()
 +        {
 +            return partitionLimit * cellPerPartitionLimit;
 +        }
 +
 +        public int perPartitionCount()
 +        {
 +            return cellPerPartitionLimit;
 +        }
 +
 +        public float estimateTotalResults(ColumnFamilyStore cfs)
 +        {
 +            // remember that getMeanColumns returns a number of cells: we should clean up the nomenclature
 +            float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
 +            return cellsPerPartition * cfs.estimateKeys();
 +        }
 +
 +        protected class ThriftCounter extends Counter
 +        {
 +            protected final int nowInSec;
 +            protected final boolean assumeLiveData;
 +
 +            protected int partitionsCounted;
 +            protected int cellsCounted;
 +            protected int cellsInCurrentPartition;
 +
 +            public ThriftCounter(int nowInSec, boolean assumeLiveData)
 +            {
 +                this.nowInSec = nowInSec;
 +                this.assumeLiveData = assumeLiveData;
 +            }
 +
 +            @Override
 +            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
 +            {
 +                cellsInCurrentPartition = 0;
 +                if (!staticRow.isEmpty())
 +                    applyToRow(staticRow);
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                for (Cell cell : row.cells())
 +                {
 +                    if (assumeLiveData || cell.isLive(nowInSec))
 +                    {
 +                        ++cellsCounted;
 +                        if (++cellsInCurrentPartition >= cellPerPartitionLimit)
 +                            stopInPartition();
 +                    }
 +                }
 +                return row;
 +            }
 +
 +            @Override
 +            public void onPartitionClose()
 +            {
 +                if (++partitionsCounted >= partitionLimit)
 +                    stop();
 +                super.onPartitionClose();
 +            }
 +
 +            public int counted()
 +            {
 +                return cellsCounted;
 +            }
 +
 +            public int countedInCurrentPartition()
 +            {
 +                return cellsInCurrentPartition;
 +            }
 +
 +            public boolean isDone()
 +            {
 +                return partitionsCounted >= partitionLimit;
 +            }
 +
 +            public boolean isDoneForPartition()
 +            {
 +                return isDone() || cellsInCurrentPartition >= cellPerPartitionLimit;
 +            }
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            // This is not valid CQL, but that's ok since it's not used for CQL queries.
 +            return String.format("THRIFT LIMIT (partitions=%d, cells_per_partition=%d)", partitionLimit, cellPerPartitionLimit);
 +        }
 +    }
 +
 +    /**
 +     * Limits used for thrift get_count when we only want to count super columns.
 +     */
 +    private static class SuperColumnCountingLimits extends ThriftLimits
 +    {
 +        private SuperColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
 +        {
 +            super(partitionLimit, cellPerPartitionLimit);
 +        }
 +
 +        public Kind kind()
 +        {
 +            return Kind.SUPER_COLUMN_COUNTING_LIMIT;
 +        }
 +
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            // We don't support paging on thrift in general but do use paging under the hood for get_count. For
 +            // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
 +            // partition). We do check, however, that the partition limit is 1 to make sure this is not misused
 +            // (as this wouldn't work properly for range queries).
 +            assert partitionLimit == 1;
 +            return new SuperColumnCountingLimits(partitionLimit, pageSize);
 +        }
 +
 +        public DataLimits forShortReadRetry(int toFetch)
 +        {
 +            // Short read retries are always done for a single partition at a time, so it's ok to ignore the
 +            // partition limit for those
 +            return new SuperColumnCountingLimits(1, toFetch);
 +        }
 +
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
++        @Override
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            return new SuperColumnCountingCounter(nowInSec, assumeLiveData);
 +        }
 +
 +        protected class SuperColumnCountingCounter extends ThriftCounter
 +        {
 +            public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData)
 +            {
 +                super(nowInSec, assumeLiveData);
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                // In the internal format, a row == a super column, so that's what we want to count.
 +                if (assumeLiveData || row.hasLiveData(nowInSec))
 +                {
 +                    ++cellsCounted;
 +                    if (++cellsInCurrentPartition >= cellPerPartitionLimit)
 +                        stopInPartition();
 +                }
 +                return row;
 +            }
 +        }
 +    }
 +
 +    public static class Serializer
 +    {
 +        public void serialize(DataLimits limits, DataOutputPlus out, int version) throws IOException
 +        {
 +            out.writeByte(limits.kind().ordinal());
 +            switch (limits.kind())
 +            {
 +                case CQL_LIMIT:
 +                case CQL_PAGING_LIMIT:
 +                    CQLLimits cqlLimits = (CQLLimits)limits;
 +                    out.writeUnsignedVInt(cqlLimits.rowLimit);
 +                    out.writeUnsignedVInt(cqlLimits.perPartitionLimit);
 +                    out.writeBoolean(cqlLimits.isDistinct);
 +                    if (limits.kind() == Kind.CQL_PAGING_LIMIT)
 +                    {
 +                        CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
 +                        ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out);
 +                        out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
 +                    }
 +                    break;
 +                case THRIFT_LIMIT:
 +                case SUPER_COLUMN_COUNTING_LIMIT:
 +                    ThriftLimits thriftLimits = (ThriftLimits)limits;
 +                    out.writeUnsignedVInt(thriftLimits.partitionLimit);
 +                    out.writeUnsignedVInt(thriftLimits.cellPerPartitionLimit);
 +                    break;
 +            }
 +        }
 +
 +        public DataLimits deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            Kind kind = Kind.values()[in.readUnsignedByte()];
 +            switch (kind)
 +            {
 +                case CQL_LIMIT:
 +                case CQL_PAGING_LIMIT:
 +                    int rowLimit = (int)in.readUnsignedVInt();
 +                    int perPartitionLimit = (int)in.readUnsignedVInt();
 +                    boolean isDistinct = in.readBoolean();
 +                    if (kind == Kind.CQL_LIMIT)
 +                        return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
 +
 +                    ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
 +                    int lastRemaining = (int)in.readUnsignedVInt();
 +                    return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining);
 +                case THRIFT_LIMIT:
 +                case SUPER_COLUMN_COUNTING_LIMIT:
 +                    int partitionLimit = (int)in.readUnsignedVInt();
 +                    int cellPerPartitionLimit = (int)in.readUnsignedVInt();
 +                    return kind == Kind.THRIFT_LIMIT
 +                         ? new ThriftLimits(partitionLimit, cellPerPartitionLimit)
 +                         : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
 +            }
 +            throw new AssertionError();
 +        }
 +
 +        public long serializedSize(DataLimits limits, int version)
 +        {
 +            long size = TypeSizes.sizeof((byte)limits.kind().ordinal());
 +            switch (limits.kind())
 +            {
 +                case CQL_LIMIT:
 +                case CQL_PAGING_LIMIT:
 +                    CQLLimits cqlLimits = (CQLLimits)limits;
 +                    size += TypeSizes.sizeofUnsignedVInt(cqlLimits.rowLimit);
 +                    size += TypeSizes.sizeofUnsignedVInt(cqlLimits.perPartitionLimit);
 +                    size += TypeSizes.sizeof(cqlLimits.isDistinct);
 +                    if (limits.kind() == Kind.CQL_PAGING_LIMIT)
 +                    {
 +                        CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
 +                        size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
 +                        size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
 +                    }
 +                    break;
 +                case THRIFT_LIMIT:
 +                case SUPER_COLUMN_COUNTING_LIMIT:
 +                    ThriftLimits thriftLimits = (ThriftLimits)limits;
 +                    size += TypeSizes.sizeofUnsignedVInt(thriftLimits.partitionLimit);
 +                    size += TypeSizes.sizeofUnsignedVInt(thriftLimits.cellPerPartitionLimit);
 +                    break;
 +                default:
 +                    throw new AssertionError();
 +            }
 +            return size;
 +        }
 +    }
 +}
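
For readers skimming the patch, the behavioural core of the DataLimits change is the new
countPartitionsWithOnlyStaticData flag: it decides whether a partition whose only live data
is its static row counts as one row. Below is a minimal, self-contained Java sketch of that
rule (hypothetical class and method names, not part of this patch), mirroring what
CQLCounter.onPartitionClose() does above:

    public class StaticRowCountingSketch
    {
        // Counts "CQL rows" across partitions the way CQLCounter does, given the number of
        // live regular rows per partition and whether each partition has a live static row.
        static int countRows(int[] liveRowsPerPartition,
                             boolean[] hasLiveStaticRow,
                             int rowLimit,
                             boolean countPartitionsWithOnlyStaticData)
        {
            int counted = 0;
            for (int p = 0; p < liveRowsPerPartition.length && counted < rowLimit; p++)
            {
                // Count the partition's live regular rows, capped by the remaining limit.
                int inPartition = Math.min(liveRowsPerPartition[p], rowLimit - counted);
                // Mirrors CQLCounter.onPartitionClose(): the live static row is counted
                // only when no regular row was counted for this partition.
                if (inPartition == 0 && hasLiveStaticRow[p] && countPartitionsWithOnlyStaticData)
                    inPartition = 1;
                counted += inPartition;
            }
            return counted;
        }

        public static void main(String[] args)
        {
            int[] rows = { 2, 0, 3 };                   // live regular rows per partition
            boolean[] statics = { false, true, false }; // partition 1 has only a static row
            System.out.println(countRows(rows, statics, 10, true));  // prints 6
            System.out.println(countRows(rows, statics, 10, false)); // prints 5
        }
    }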

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/RowFilter.java
index 5ffe2ab,0000000..8d11038
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@@ -1,994 -1,0 +1,1009 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.filter;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.base.Objects;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.context.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkBindValueSet;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +
 +/**
 + * A filter on which rows a given query should include or exclude.
 + * <p>
 + * This corresponds to the restrictions on rows that are not handled by the query
 + * {@link ClusteringIndexFilter}. Some of the expressions of this filter may
 + * be handled by a secondary index, and the rest is simply filtered out of the
 + * result set (the latter can only happen if the query uses ALLOW FILTERING).
 + */
 +public abstract class RowFilter implements Iterable<RowFilter.Expression>
 +{
 +    public static final Serializer serializer = new Serializer();
 +    public static final RowFilter NONE = new CQLFilter(Collections.emptyList());
 +
 +    protected final List<Expression> expressions;
 +
 +    protected RowFilter(List<Expression> expressions)
 +    {
 +        this.expressions = expressions;
 +    }
 +
 +    public static RowFilter create()
 +    {
 +        return new CQLFilter(new ArrayList<>());
 +    }
 +
 +    public static RowFilter create(int capacity)
 +    {
 +        return new CQLFilter(new ArrayList<>(capacity));
 +    }
 +
 +    public static RowFilter forThrift(int capacity)
 +    {
 +        return new ThriftFilter(new ArrayList<>(capacity));
 +    }
 +
 +    public void add(ColumnDefinition def, Operator op, ByteBuffer value)
 +    {
 +        add(new SimpleExpression(def, op, value));
 +    }
 +
 +    public void addMapEquality(ColumnDefinition def, ByteBuffer key, Operator op, ByteBuffer value)
 +    {
 +        add(new MapEqualityExpression(def, key, op, value));
 +    }
 +
 +    public void addThriftExpression(CFMetaData metadata, ByteBuffer name, Operator op, ByteBuffer value)
 +    {
 +        assert (this instanceof ThriftFilter);
 +        add(new ThriftExpression(metadata, name, op, value));
 +    }
 +
 +    public void addCustomIndexExpression(CFMetaData cfm, IndexMetadata targetIndex, ByteBuffer value)
 +    {
 +        add(new CustomExpression(cfm, targetIndex, value));
 +    }
 +
 +    private void add(Expression expression)
 +    {
 +        expression.validate();
 +        expressions.add(expression);
 +    }
 +
 +    public List<Expression> getExpressions()
 +    {
 +        return expressions;
 +    }
 +
 +    /**
++     * Checks if any of the expressions apply to clustering or regular columns.
++     * @return {@code true} if any of the expressions apply to clustering or regular columns, {@code false} otherwise.
++     */
++    public boolean hasExpressionOnClusteringOrRegularColumns()
++    {
++        for (Expression expression : expressions)
++        {
++            ColumnDefinition column = expression.column();
++            if (column.isClusteringColumn() || column.isRegular())
++                return true;
++        }
++        return false;
++    }
++
++    /**
 +     * Filters the provided iterator so that only the rows satisfying the expressions of this filter
 +     * are included in the resulting iterator.
 +     *
 +     * @param iter the iterator to filter
 +     * @param nowInSec the time of query in seconds.
 +     * @return the filtered iterator.
 +     */
 +    public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec);
 +
 +    /**
 +     * Whether the provided row in the provided partition satisfies this filter.
 +     *
 +     * @param metadata the table metadata.
 +     * @param partitionKey the partition key for partition to test.
 +     * @param row the row to test.
 +     * @param nowInSec the current time in seconds (to know what is live and what isn't).
 +     * @return {@code true} if {@code row} in partition {@code partitionKey} satisfies this row filter.
 +     */
 +    public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row, int nowInSec)
 +    {
 +        // We purge all tombstones as the expressions' isSatisfiedBy methods expect it
 +        Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
 +        if (purged == null)
 +            return expressions.isEmpty();
 +
 +        for (Expression e : expressions)
 +        {
 +            if (!e.isSatisfiedBy(metadata, partitionKey, purged))
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Returns true if all of the expressions within this filter that apply to the partition key are satisfied by
 +     * the given key, false otherwise.
 +     */
 +    public boolean partitionKeyRestrictionsAreSatisfiedBy(DecoratedKey key, AbstractType<?> keyValidator)
 +    {
 +        for (Expression e : expressions)
 +        {
 +            if (!e.column.isPartitionKey())
 +                continue;
 +
 +            ByteBuffer value = keyValidator instanceof CompositeType
 +                             ? ((CompositeType) keyValidator).split(key.getKey())[e.column.position()]
 +                             : key.getKey();
 +            if (!e.operator().isSatisfiedBy(e.column.type, value, e.value))
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Returns true if all of the expressions within this filter that apply to the clustering key are satisfied by
 +     * the given Clustering, false otherwise.
 +     */
 +    public boolean clusteringKeyRestrictionsAreSatisfiedBy(Clustering clustering)
 +    {
 +        for (Expression e : expressions)
 +        {
 +            if (!e.column.isClusteringColumn())
 +                continue;
 +
 +            if (!e.operator().isSatisfiedBy(e.column.type, clustering.get(e.column.position()), e.value))
 +            {
 +                return false;
 +            }
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Returns this filter but without the provided expression. This method
 +     * *assumes* that the filter contains the provided expression.
 +     */
 +    public RowFilter without(Expression expression)
 +    {
 +        assert expressions.contains(expression);
 +        if (expressions.size() == 1)
 +            return RowFilter.NONE;
 +
 +        List<Expression> newExpressions = new ArrayList<>(expressions.size() - 1);
 +        for (Expression e : expressions)
 +            if (!e.equals(expression))
 +                newExpressions.add(e);
 +
 +        return withNewExpressions(newExpressions);
 +    }
 +
 +    protected abstract RowFilter withNewExpressions(List<Expression> expressions);
 +
 +    public boolean isEmpty()
 +    {
 +        return expressions.isEmpty();
 +    }
 +
 +    public Iterator<Expression> iterator()
 +    {
 +        return expressions.iterator();
 +    }
 +
 +    private static Clustering makeCompactClustering(CFMetaData metadata, ByteBuffer name)
 +    {
 +        assert metadata.isCompactTable();
 +        if (metadata.isCompound())
 +        {
 +            List<ByteBuffer> values = CompositeType.splitName(name);
 +            return new Clustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
 +        }
 +        else
 +        {
 +            return new Clustering(name);
 +        }
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        for (int i = 0; i < expressions.size(); i++)
 +        {
 +            if (i > 0)
 +                sb.append(" AND ");
 +            sb.append(expressions.get(i));
 +        }
 +        return sb.toString();
 +    }
 +
 +    private static class CQLFilter extends RowFilter
 +    {
 +        private CQLFilter(List<Expression> expressions)
 +        {
 +            super(expressions);
 +        }
 +
 +        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
 +        {
 +            if (expressions.isEmpty())
 +                return iter;
 +
 +            final CFMetaData metadata = iter.metadata();
 +            long numberOfStaticColumnExpressions = expressions.stream().filter(e -> e.column.isStatic()).count();
 +            final boolean filterStaticColumns = numberOfStaticColumnExpressions != 0;
 +            final boolean filterNonStaticColumns = (expressions.size() - numberOfStaticColumnExpressions) > 0;
 +
 +            class IsSatisfiedFilter extends Transformation<UnfilteredRowIterator>
 +            {
 +                DecoratedKey pk;
 +                public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
 +                {
 +                    // The filter might be on static columns, so we need to check the static row first.
 +                    if (filterStaticColumns && applyToRow(partition.staticRow()) == null)
 +                        return null;
 +
 +                    pk = partition.partitionKey();
 +                    UnfilteredRowIterator iterator = Transformation.apply(partition, this);
 +
 +                    return (filterNonStaticColumns && !iterator.hasNext()) ? null : iterator;
 +                }
 +
 +                public Row applyToRow(Row row)
 +                {
 +                    Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
 +                    if (purged == null)
 +                        return null;
 +
 +                    for (Expression e : expressions)
 +                        if (!e.isSatisfiedBy(metadata, pk, purged))
 +                            return null;
 +                    return row;
 +                }
 +            }
 +
 +            return Transformation.apply(iter, new IsSatisfiedFilter());
 +        }
 +
 +        protected RowFilter withNewExpressions(List<Expression> expressions)
 +        {
 +            return new CQLFilter(expressions);
 +        }
 +    }
 +
 +    private static class ThriftFilter extends RowFilter
 +    {
 +        private ThriftFilter(List<Expression> expressions)
 +        {
 +            super(expressions);
 +        }
 +
 +        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
 +        {
 +            if (expressions.isEmpty())
 +                return iter;
 +
 +            class IsSatisfiedThriftFilter extends Transformation<UnfilteredRowIterator>
 +            {
 +                @Override
 +                public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
 +                {
 +                    // Thrift does not filter rows, it filters out the entire partition if any of the expressions
 +                    // is not satisfied, which forces us to materialize the result (in theory we could materialize
 +                    // only what we need, which might or might not be everything, but we keep it simple since in
 +                    // practice it hasn't been worth the trouble).
 +                    ImmutableBTreePartition result = ImmutableBTreePartition.create(iter);
 +                    iter.close();
 +
 +                    // The partition needs to have a row for every expression, and that row needs to satisfy the expression.
 +                    for (Expression expr : expressions)
 +                    {
 +                        assert expr instanceof ThriftExpression;
 +                        Row row = result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes));
 +                        if (row == null || !expr.isSatisfiedBy(iter.metadata(), iter.partitionKey(), row))
 +                            return null;
 +                    }
 +                    // If we get there, it means all expressions were satisfied, so return the original result
 +                    return result.unfilteredIterator();
 +                }
 +            }
 +            return Transformation.apply(iter, new IsSatisfiedThriftFilter());
 +        }
 +
 +        protected RowFilter withNewExpressions(List<Expression> expressions)
 +        {
 +            return new ThriftFilter(expressions);
 +        }
 +    }
 +
 +    public static abstract class Expression
 +    {
 +        private static final Serializer serializer = new Serializer();
 +
 +        // Note: the order of this enum matters; it's used for serialization
 +        protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR, CUSTOM }
 +
 +        abstract Kind kind();
 +        protected final ColumnDefinition column;
 +        protected final Operator operator;
 +        protected final ByteBuffer value;
 +
 +        protected Expression(ColumnDefinition column, Operator operator, ByteBuffer value)
 +        {
 +            this.column = column;
 +            this.operator = operator;
 +            this.value = value;
 +        }
 +
 +        public boolean isCustom()
 +        {
 +            return kind() == Kind.CUSTOM;
 +        }
 +
 +        public ColumnDefinition column()
 +        {
 +            return column;
 +        }
 +
 +        public Operator operator()
 +        {
 +            return operator;
 +        }
 +
 +        /**
 +         * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code> operator.
 +         *
 +         * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code>
 +         * operator, <code>false</code> otherwise.
 +         */
 +        public boolean isContains()
 +        {
 +            return Operator.CONTAINS == operator;
 +        }
 +
 +        /**
 +         * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code> operator.
 +         *
 +         * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code>
 +         * operator, <code>false</code> otherwise.
 +         */
 +        public boolean isContainsKey()
 +        {
 +            return Operator.CONTAINS_KEY == operator;
 +        }
 +
 +        /**
 +         * If this expression is used to query an index, the value to use as
 +         * partition key for that index query.
 +         */
 +        public ByteBuffer getIndexValue()
 +        {
 +            return value;
 +        }
 +
 +        public void validate()
 +        {
 +            checkNotNull(value, "Unsupported null value for column %s", column.name);
 +            checkBindValueSet(value, "Unsupported unset value for column %s", column.name);
 +        }
 +
 +        @Deprecated
 +        public void validateForIndexing()
 +        {
 +            checkFalse(value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT,
 +                       "Index expression values may not be larger than 64K");
 +        }
 +
 +        /**
 +         * Returns whether the provided row satisfies this expression or not.
 +         *
 +         * @param partitionKey the partition key for the row to check.
 +         * @param row the row to check. It should *not* contain deleted cells
 +         * (i.e. it should come from a RowIterator).
 +         * @return whether the row is satisfied by this expression.
 +         */
 +        public abstract boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row);
 +
 +        protected ByteBuffer getValue(CFMetaData metadata, DecoratedKey partitionKey, Row row)
 +        {
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    return metadata.getKeyValidator() instanceof CompositeType
 +                         ? CompositeType.extractComponent(partitionKey.getKey(), column.position())
 +                         : partitionKey.getKey();
 +                case CLUSTERING:
 +                    return row.clustering().get(column.position());
 +                default:
 +                    Cell cell = row.getCell(column);
 +                    return cell == null ? null : cell.value();
 +            }
 +        }
 +
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if (this == o)
 +                return true;
 +
 +            if (!(o instanceof Expression))
 +                return false;
 +
 +            Expression that = (Expression)o;
 +
 +            return Objects.equal(this.kind(), that.kind())
 +                && Objects.equal(this.column.name, that.column.name)
 +                && Objects.equal(this.operator, that.operator)
 +                && Objects.equal(this.value, that.value);
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            return Objects.hashCode(column.name, operator, value);
 +        }
 +
 +        private static class Serializer
 +        {
 +            public void serialize(Expression expression, DataOutputPlus out, int version) throws IOException
 +            {
 +                if (version >= MessagingService.VERSION_30)
 +                    out.writeByte(expression.kind().ordinal());
 +
 +                // Custom expressions include neither a column nor an operator, but all
 +                // other expressions do. Also, custom expressions are 3.0+ only, so
 +                // the column & operator will always be the first things written for
 +                // any pre-3.0 version
 +                if (expression.kind() == Kind.CUSTOM)
 +                {
 +                    assert version >= MessagingService.VERSION_30;
 +                    IndexMetadata.serializer.serialize(((CustomExpression)expression).targetIndex, out, version);
 +                    ByteBufferUtil.writeWithShortLength(expression.value, out);
 +                    return;
 +                }
 +
 +                ByteBufferUtil.writeWithShortLength(expression.column.name.bytes, out);
 +                expression.operator.writeTo(out);
 +
 +                switch (expression.kind())
 +                {
 +                    case SIMPLE:
 +                        ByteBufferUtil.writeWithShortLength(((SimpleExpression)expression).value, out);
 +                        break;
 +                    case MAP_EQUALITY:
 +                        MapEqualityExpression mexpr = (MapEqualityExpression)expression;
 +                        if (version < MessagingService.VERSION_30)
 +                        {
 +                            ByteBufferUtil.writeWithShortLength(mexpr.getIndexValue(), out);
 +                        }
 +                        else
 +                        {
 +                            ByteBufferUtil.writeWithShortLength(mexpr.key, out);
 +                            ByteBufferUtil.writeWithShortLength(mexpr.value, out);
 +                        }
 +                        break;
 +                    case THRIFT_DYN_EXPR:
 +                        ByteBufferUtil.writeWithShortLength(((ThriftExpression)expression).value, out);
 +                        break;
 +                }
 +            }
 +
 +            public Expression deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
 +            {
 +                Kind kind = null;
 +                ByteBuffer name;
 +                Operator operator;
 +                ColumnDefinition column;
 +
 +                if (version >= MessagingService.VERSION_30)
 +                {
 +                    kind = Kind.values()[in.readByte()];
 +                    // custom expressions (3.0+ only) do not contain a column or operator, only a value
 +                    if (kind == Kind.CUSTOM)
 +                    {
 +                        return new CustomExpression(metadata,
 +                                                    IndexMetadata.serializer.deserialize(in, version, metadata),
 +                                                    ByteBufferUtil.readWithShortLength(in));
 +                    }
 +                }
 +
 +                name = ByteBufferUtil.readWithShortLength(in);
 +                operator = Operator.readFrom(in);
 +                column = metadata.getColumnDefinition(name);
 +                if (!metadata.isCompactTable() && column == null)
 +                    throw new RuntimeException("Unknown (or dropped) column " + UTF8Type.instance.getString(name) + " during deserialization");
 +
 +                if (version < MessagingService.VERSION_30)
 +                {
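 +                    // pre-3.0 messages carry no Kind byte, so infer the kind from the column and operator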
 +                    if (column == null)
 +                        kind = Kind.THRIFT_DYN_EXPR;
 +                    else if (column.type instanceof MapType && operator == Operator.EQ)
 +                        kind = Kind.MAP_EQUALITY;
 +                    else
 +                        kind = Kind.SIMPLE;
 +                }
 +
 +                assert kind != null;
 +                switch (kind)
 +                {
 +                    case SIMPLE:
 +                        return new SimpleExpression(column, operator, ByteBufferUtil.readWithShortLength(in));
 +                    case MAP_EQUALITY:
 +                        ByteBuffer key, value;
 +                        if (version < MessagingService.VERSION_30)
 +                        {
 +                            ByteBuffer composite = ByteBufferUtil.readWithShortLength(in);
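 +                            // the pre-3.0 composite packs (key, value), so extract each component separately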
 +                            key = CompositeType.extractComponent(composite, 0);
 +                            value = CompositeType.extractComponent(composite, 1);
 +                        }
 +                        else
 +                        {
 +                            key = ByteBufferUtil.readWithShortLength(in);
 +                            value = ByteBufferUtil.readWithShortLength(in);
 +                        }
 +                        return new MapEqualityExpression(column, key, operator, value);
 +                    case THRIFT_DYN_EXPR:
 +                        return new ThriftExpression(metadata, name, operator, ByteBufferUtil.readWithShortLength(in));
 +                }
 +                throw new AssertionError();
 +            }
 +
 +            public long serializedSize(Expression expression, int version)
 +            {
 +                // version 3.0+ includes a byte for Kind
 +                long size = version >= MessagingService.VERSION_30 ? 1 : 0;
 +
 +                // custom expressions don't include a column or operator, all other expressions do
 +                if (expression.kind() != Kind.CUSTOM)
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes)
 +                            + expression.operator.serializedSize();
 +
 +                switch (expression.kind())
 +                {
 +                    case SIMPLE:
 +                        size += ByteBufferUtil.serializedSizeWithShortLength(((SimpleExpression)expression).value);
 +                        break;
 +                    case MAP_EQUALITY:
 +                        MapEqualityExpression mexpr = (MapEqualityExpression)expression;
 +                        if (version < MessagingService.VERSION_30)
 +                            size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.getIndexValue());
 +                        else
 +                            size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key)
 +                                  + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value);
 +                        break;
 +                    case THRIFT_DYN_EXPR:
 +                        size += ByteBufferUtil.serializedSizeWithShortLength(((ThriftExpression)expression).value);
 +                        break;
 +                    case CUSTOM:
 +                        if (version >= MessagingService.VERSION_30)
 +                            size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version)
 +                                  + ByteBufferUtil.serializedSizeWithShortLength(expression.value);
 +                        break;
 +                }
 +                return size;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * An expression of the form 'column' 'op' 'value'.
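 +     * For instance "a = 3" or "b >= 42".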
 +     */
 +    private static class SimpleExpression extends Expression
 +    {
 +        public SimpleExpression(ColumnDefinition column, Operator operator, ByteBuffer value)
 +        {
 +            super(column, operator, value);
 +        }
 +
 +        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
 +        {
 +            // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
 +            // TODO: we should try to merge both code paths someday.
 +            assert value != null;
 +
 +            if (row.isStatic() != column.isStatic())
 +                return true;
 +
 +            switch (operator)
 +            {
 +                case EQ:
 +                case LT:
 +                case LTE:
 +                case GTE:
 +                case GT:
 +                    {
 +                        assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";
 +
 +                        // In order to support operators on Counter types, the value has to be extracted from the
 +                        // counter's internal representation. See CASSANDRA-11629.
 +                        if (column.type.isCounter())
 +                        {
 +                            ByteBuffer foundValue = getValue(metadata, partitionKey, row);
 +                            if (foundValue == null)
 +                                return false;
 +
 +                            ByteBuffer counterValue = LongType.instance.decompose(CounterContext.instance().total(foundValue));
 +                            return operator.isSatisfiedBy(LongType.instance, counterValue, value);
 +                        }
 +                        else
 +                        {
 +                            // Note that CQL expressions are always of the form 'x < 4', i.e. the tested value is on the left.
 +                            ByteBuffer foundValue = getValue(metadata, partitionKey, row);
 +                            return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value);
 +                        }
 +                    }
 +                case NEQ:
 +                    {
 +                        assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";
 +                        ByteBuffer foundValue = getValue(metadata, partitionKey, row);
 +                        // Note that CQL expressions are always of the form 'x < 4', i.e. the tested value is on the left.
 +                        return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value);
 +                    }
 +                case CONTAINS:
 +                    assert column.type.isCollection();
 +                    CollectionType<?> type = (CollectionType<?>)column.type;
 +                    if (column.isComplex())
 +                    {
 +                        ComplexColumnData complexData = row.getComplexColumnData(column);
 +                        if (complexData != null)
 +                        {
 +                            for (Cell cell : complexData)
 +                            {
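 +                                // for sets the tested value lives in the cell path; for lists and maps it is the cell value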
 +                                if (type.kind == CollectionType.Kind.SET)
 +                                {
 +                                    if (type.nameComparator().compare(cell.path().get(0), value) == 0)
 +                                        return true;
 +                                }
 +                                else
 +                                {
 +                                    if (type.valueComparator().compare(cell.value(), value) == 0)
 +                                        return true;
 +                                }
 +                            }
 +                        }
 +                        return false;
 +                    }
 +                    else
 +                    {
 +                        ByteBuffer foundValue = getValue(metadata, partitionKey, row);
 +                        if (foundValue == null)
 +                            return false;
 +
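 +                        // frozen collection: the whole collection is a single value, so deserialize it and search it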
 +                        switch (type.kind)
 +                        {
 +                            case LIST:
 +                                ListType<?> listType = (ListType<?>)type;
 +                                return listType.compose(foundValue).contains(listType.getElementsType().compose(value));
 +                            case SET:
 +                                SetType<?> setType = (SetType<?>)type;
 +                                return setType.compose(foundValue).contains(setType.getElementsType().compose(value));
 +                            case MAP:
 +                                MapType<?,?> mapType = (MapType<?, ?>)type;
 +                                return mapType.compose(foundValue).containsValue(mapType.getValuesType().compose(value));
 +                        }
 +                        throw new AssertionError();
 +                    }
 +                case CONTAINS_KEY:
 +                    assert column.type.isCollection() && column.type instanceof MapType;
 +                    MapType<?, ?> mapType = (MapType<?, ?>)column.type;
 +                    if (column.isComplex())
 +                    {
 +                        return row.getCell(column, CellPath.create(value)) != null;
 +                    }
 +                    else
 +                    {
 +                        ByteBuffer foundValue = getValue(metadata, partitionKey, row);
 +                        return foundValue != null && mapType.getSerializer().getSerializedValue(foundValue, value, mapType.getKeysType()) != null;
 +                    }
 +
 +                case IN:
 +                    // It wouldn't be terribly hard to support this (though doing so would imply supporting
 +                    // IN for secondary indexes), but currently we don't.
 +                    throw new AssertionError();
 +            }
 +            throw new AssertionError();
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            AbstractType<?> type = column.type;
 +            switch (operator)
 +            {
 +                case CONTAINS:
 +                    assert type instanceof CollectionType;
 +                    CollectionType<?> ct = (CollectionType<?>)type;
 +                    type = ct.kind == CollectionType.Kind.SET ? ct.nameComparator() : ct.valueComparator();
 +                    break;
 +                case CONTAINS_KEY:
 +                    assert type instanceof MapType;
 +                    type = ((MapType<?, ?>)type).nameComparator();
 +                    break;
 +                case IN:
 +                    type = ListType.getInstance(type, false);
 +                    break;
 +                default:
 +                    break;
 +            }
 +            return String.format("%s %s %s", column.name, operator, type.getString(value));
 +        }
 +
 +        @Override
 +        Kind kind()
 +        {
 +            return Kind.SIMPLE;
 +        }
 +    }
 +
 +    /**
 +     * An expression of the form 'column' ['key'] = 'value' (which is only
 +     * supported when 'column' is a map).
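 +     * For instance "m['foo'] = 'bar'" for a map column m.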
 +     */
 +    private static class MapEqualityExpression extends Expression
 +    {
 +        private final ByteBuffer key;
 +
 +        public MapEqualityExpression(ColumnDefinition column, ByteBuffer key, Operator operator, ByteBuffer value)
 +        {
 +            super(column, operator, value);
 +            assert column.type instanceof MapType && operator == Operator.EQ;
 +            this.key = key;
 +        }
 +
 +        @Override
 +        public void validate() throws InvalidRequestException
 +        {
 +            checkNotNull(key, "Unsupported null map key for column %s", column.name);
 +            checkBindValueSet(key, "Unsupported unset map key for column %s", column.name);
 +            checkNotNull(value, "Unsupported null map value for column %s", column.name);
 +            checkBindValueSet(value, "Unsupported unset map value for column %s", column.name);
 +        }
 +
 +        @Override
 +        public ByteBuffer getIndexValue()
 +        {
 +            return CompositeType.build(key, value);
 +        }
 +
 +        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
 +        {
 +            assert key != null;
 +            // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
 +            // TODO: we should try to merge both code paths someday.
 +            assert value != null;
 +
 +            if (row.isStatic() != column.isStatic())
 +                return true;
 +
 +            MapType<?, ?> mt = (MapType<?, ?>)column.type;
 +            if (column.isComplex())
 +            {
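 +                // non-frozen map: the entry for this key, if present, is a single cell addressed by its path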
 +                Cell cell = row.getCell(column, CellPath.create(key));
 +                return cell != null && mt.valueComparator().compare(cell.value(), value) == 0;
 +            }
 +            else
 +            {
 +                ByteBuffer serializedMap = getValue(metadata, partitionKey, row);
 +                if (serializedMap == null)
 +                    return false;
 +
 +                ByteBuffer foundValue = mt.getSerializer().getSerializedValue(serializedMap, key, mt.getKeysType());
 +                return foundValue != null && mt.valueComparator().compare(foundValue, value) == 0;
 +            }
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            MapType<?, ?> mt = (MapType<?, ?>)column.type;
 +            return String.format("%s[%s] = %s", column.name, mt.nameComparator().getString(key), mt.valueComparator().getString(value));
 +        }
 +
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if (this == o)
 +                return true;
 +
 +            if (!(o instanceof MapEqualityExpression))
 +                return false;
 +
 +            MapEqualityExpression that = (MapEqualityExpression)o;
 +
 +            return Objects.equal(this.column.name, that.column.name)
 +                && Objects.equal(this.operator, that.operator)
 +                && Objects.equal(this.key, that.key)
 +                && Objects.equal(this.value, that.value);
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            return Objects.hashCode(column.name, operator, key, value);
 +        }
 +
 +        @Override
 +        Kind kind()
 +        {
 +            return Kind.MAP_EQUALITY;
 +        }
 +    }
 +
 +    /**
 +     * An expression of the form 'name' = 'value', but where 'name' is actually the
 +     * clustering value for a compact table. This is only for thrift.
 +     */
 +    private static class ThriftExpression extends Expression
 +    {
 +        public ThriftExpression(CFMetaData metadata, ByteBuffer name, Operator operator, ByteBuffer value)
 +        {
 +            super(makeDefinition(metadata, name), operator, value);
 +            assert metadata.isCompactTable();
 +        }
 +
 +        private static ColumnDefinition makeDefinition(CFMetaData metadata, ByteBuffer name)
 +        {
 +            ColumnDefinition def = metadata.getColumnDefinition(name);
 +            if (def != null)
 +                return def;
 +
 +            // In thrift, we actually allow expressions on non-defined columns for the sake of filtering. To accommodate
 +            // this we create a "fake" definition. This is messy, but it works, so it is probably good enough.
 +            return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type);
 +        }
 +
 +        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
 +        {
 +            assert value != null;
 +
 +            // On thrift queries, even if the column expression is a "static" one, we'll have converted it to a "dynamic"
 +            // one in ThriftResultsMerger, so we always expect it to be a dynamic one. Further, we expect this is only
 +            // called when the row clustering does match the column (see ThriftFilter above).
 +            assert row.clustering().equals(makeCompactClustering(metadata, column.name.bytes));
 +            Cell cell = row.getCell(metadata.compactValueColumn());
 +            return cell != null && operator.isSatisfiedBy(column.type, cell.value(), value);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("%s %s %s", column.name, operator, column.type.getString(value));
 +        }
 +
 +        @Override
 +        Kind kind()
 +        {
 +            return Kind.THRIFT_DYN_EXPR;
 +        }
 +    }
 +
 +    /**
 +     * A custom index expression for use with 2i implementations which support custom syntax and which are not
 +     * necessarily linked to a single column in the base table.
 +     */
 +    public static final class CustomExpression extends Expression
 +    {
 +        private final IndexMetadata targetIndex;
 +        private final CFMetaData cfm;
 +
 +        public CustomExpression(CFMetaData cfm, IndexMetadata targetIndex, ByteBuffer value)
 +        {
 +            // The operator is not relevant here, but Expression requires one, so for now we just hardcode EQ
 +            super(makeDefinition(cfm, targetIndex), Operator.EQ, value);
 +            this.targetIndex = targetIndex;
 +            this.cfm = cfm;
 +        }
 +
 +        private static ColumnDefinition makeDefinition(CFMetaData cfm, IndexMetadata index)
 +        {
 +            // Similarly to how we handle non-defined columns in thrift, we create a fake column definition to
 +            // represent the target index. This is definitely something that can be improved though.
 +            return ColumnDefinition.regularDef(cfm, ByteBuffer.wrap(index.name.getBytes()), BytesType.instance);
 +        }
 +
 +        public IndexMetadata getTargetIndex()
 +        {
 +            return targetIndex;
 +        }
 +
 +        public ByteBuffer getValue()
 +        {
 +            return value;
 +        }
 +
 +        public String toString()
 +        {
 +            return String.format("expr(%s, %s)",
 +                                 targetIndex.name,
 +                                 Keyspace.openAndGetStore(cfm)
 +                                         .indexManager
 +                                         .getIndex(targetIndex)
 +                                         .customExpressionValueType());
 +        }
 +
 +        Kind kind()
 +        {
 +            return Kind.CUSTOM;
 +        }
 +
 +        // Filtering by custom expressions isn't supported yet, so just accept any row
 +        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
 +        {
 +            return true;
 +        }
 +    }
 +
 +    public static class Serializer
 +    {
 +        public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException
 +        {
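 +            // wire format: a boolean flagging thrift filters, an unsigned vint expression count, then each expression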
 +            out.writeBoolean(filter instanceof ThriftFilter);
 +            out.writeUnsignedVInt(filter.expressions.size());
 +            for (Expression expr : filter.expressions)
 +                Expression.serializer.serialize(expr, out, version);
 +        }
 +
 +        public RowFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
 +        {
 +            boolean forThrift = in.readBoolean();
 +            int size = (int)in.readUnsignedVInt();
 +            List<Expression> expressions = new ArrayList<>(size);
 +            for (int i = 0; i < size; i++)
 +                expressions.add(Expression.serializer.deserialize(in, version, metadata));
 +
 +            return forThrift
 +                 ? new ThriftFilter(expressions)
 +                 : new CQLFilter(expressions);
 +        }
 +
 +        public long serializedSize(RowFilter filter, int version)
 +        {
 +            long size = 1 // forThrift
 +                      + TypeSizes.sizeofUnsignedVInt(filter.expressions.size());
 +            for (Expression expr : filter.expressions)
 +                size += Expression.serializer.serializedSize(expr, version);
 +            return size;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CacheService.java
index c51a5d1,a13a52d..d23bdb0
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@@ -439,13 -414,10 +439,13 @@@ public class CacheService implements Ca
              {
                  public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
                  {
 -                    DecoratedKey key = cfs.partitioner.decorateKey(buffer);
 -                    QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE);
 -                    ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
 -                    return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry) data);
 +                    DecoratedKey key = cfs.decorateKey(buffer);
 +                    int nowInSec = FBUtilities.nowInSeconds();
 +                    try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
 +                    {
-                         CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
++                        CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec, true), nowInSec);
 +                        return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry)toCache);
 +                    }
                  }
              });
          }

