cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [10/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Date Wed, 11 Nov 2015 22:18:20 GMT
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0de23f20
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0de23f20
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0de23f20

Branch: refs/heads/trunk
Commit: 0de23f20ae4bd95f040017e2db653c6c1b5eabe9
Parents: 9a90e98 e487553
Author: Yuki Morishita <yukim@apache.org>
Authored: Wed Nov 11 16:16:23 2015 -0600
Committer: Yuki Morishita <yukim@apache.org>
Committed: Wed Nov 11 16:16:23 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 34 +++++++++++
 .../db/compaction/CompactionController.java     |  5 --
 src/java/org/apache/cassandra/dht/Bounds.java   | 62 ++++++++++++++++++++
 .../cassandra/streaming/StreamReader.java       | 12 ++--
 .../cassandra/streaming/StreamReceiveTask.java  | 37 +++++++++++-
 .../compress/CompressedStreamReader.java        |  2 +-
 .../apache/cassandra/db/CounterCacheTest.java   | 48 +++++++++++++++
 .../org/apache/cassandra/db/RowCacheTest.java   | 50 ++++++++++++++++
 .../org/apache/cassandra/dht/BoundsTest.java    | 61 +++++++++++++++++++
 10 files changed, 298 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d271c95,0fcf037..02dc249
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,50 -1,6 +1,51 @@@
 -2.2.4
 +3.0.1
 + * Keep the file open in trySkipCache (CASSANDRA-10669)
 + * Updated trigger example (CASSANDRA-10257)
 +Merged from 2.2:
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 +Merged from 2.1:
++ * Invalidate cache after stream receive task is completed (CASSANDRA-10341)
 + * Reject counter writes in CQLSSTableWriter (CASSANDRA-10258)
 + * Remove superfluous COUNTER_MUTATION stage mapping (CASSANDRA-10605)
 +
 +
 +3.0
 + * Fix AssertionError while flushing memtable due to materialized views
 +   incorrectly inserting empty rows (CASSANDRA-10614)
 + * Store UDA initcond as CQL literal in the schema table, instead of a blob (CASSANDRA-10650)
 + * Don't use -1 for the position of partition key in schema (CASSANDRA-10491)
 + * Fix distinct queries in mixed version cluster (CASSANDRA-10573)
 + * Skip sstable on clustering in names query (CASSANDRA-10571)
 + * Remove value skipping as it breaks read-repair (CASSANDRA-10655)
 + * Fix bootstrapping with MVs (CASSANDRA-10621)
 + * Make sure EACH_QUORUM reads are using NTS (CASSANDRA-10584)
 + * Fix MV replica filtering for non-NetworkTopologyStrategy (CASSANDRA-10634)
 + * (Hadoop) fix CIF describeSplits() not handling 0 size estimates (CASSANDRA-10600)
 + * Fix reading of legacy sstables (CASSANDRA-10590)
 + * Use CQL type names in schema metadata tables (CASSANDRA-10365)
 + * Guard batchlog replay against integer division by zero (CASSANDRA-9223)
 + * Fix bug when adding a column to thrift with the same name than a primary key (CASSANDRA-10608)
 + * Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068)
 + * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
 + * Don't use 'names query' read path for counters (CASSANDRA-10572)
 + * Fix backward compatibility for counters (CASSANDRA-10470)
 + * Remove memory_allocator paramter from cassandra.yaml (CASSANDRA-10581,10628)
 + * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)
 + * Fix thrift cas operations with defined columns (CASSANDRA-10576)
 + * Fix PartitionUpdate.operationCount()for updates with static column operations (CASSANDRA-10606)
 + * Fix thrift get() queries with defined columns (CASSANDRA-10586)
 + * Fix marking of indexes as built and removed (CASSANDRA-10601)
 + * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595)
 + * Fix batches on multiple tables (CASSANDRA-10554)
 + * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569)
 + * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
 + * Remove token generator (CASSANDRA-5261)
 + * RolesCache should not be created for any authenticator that does not requireAuthentication
(CASSANDRA-10562)
 + * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)
 + * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360)
 + * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
 +Merged from 2.2:
   * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
   * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
   * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0b838bf,2d58219..38c99ea
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1739,6 -2519,41 +1739,40 @@@ public class ColumnFamilyStore implemen
              CacheService.instance.invalidateCounterCacheForCf(metadata.ksAndCFName);
      }
  
+     public int invalidateRowCache(Collection<Bounds<Token>> boundsToInvalidate)
+     {
+         int invalidatedKeys = 0;
+         for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator();
+              keyIter.hasNext(); )
+         {
+             RowCacheKey key = keyIter.next();
 -            DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
++            DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key));
+             if (key.ksAndCFName.equals(metadata.ksAndCFName) && Bounds.isInBounds(dk.getToken(),
boundsToInvalidate))
+             {
 -                invalidateCachedRow(dk);
++                invalidateCachedPartition(dk);
+                 invalidatedKeys++;
+             }
+         }
 -
+         return invalidatedKeys;
+     }
+ 
+     public int invalidateCounterCache(Collection<Bounds<Token>> boundsToInvalidate)
+     {
+         int invalidatedKeys = 0;
+         for (Iterator<CounterCacheKey> keyIter = CacheService.instance.counterCache.keyIterator();
+              keyIter.hasNext(); )
+         {
+             CounterCacheKey key = keyIter.next();
 -            DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
++            DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.partitionKey));
+             if (key.ksAndCFName.equals(metadata.ksAndCFName) && Bounds.isInBounds(dk.getToken(),
boundsToInvalidate))
+             {
+                 CacheService.instance.counterCache.remove(key);
+                 invalidatedKeys++;
+             }
+         }
+         return invalidatedKeys;
+     }
+ 
      /**
       * @return true if @param key is contained in the row cache
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/Bounds.java
index d9c189d,73414cd..a125168
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@@ -17,10 -17,19 +17,19 @@@
   */
  package org.apache.cassandra.dht;
  
+ import java.util.ArrayList;
+ import java.util.Collection;
  import java.util.Collections;
+ import java.util.Comparator;
  import java.util.List;
+ import java.util.Set;
+ 
+ import com.google.common.collect.Iterators;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.PeekingIterator;
+ import com.google.common.collect.Sets;
  
 -import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.PartitionPosition;
  import org.apache.cassandra.utils.Pair;
  
  /**
@@@ -102,16 -111,20 +111,30 @@@ public class Bounds<T extends RingPosit
          return "]";
      }
  
+     public static <T extends RingPosition<T>> boolean isInBounds(T token, Iterable<Bounds<T>>
bounds)
+     {
+         assert bounds != null;
+ 
+         for (Bounds<T> bound : bounds)
+         {
+             if (bound.contains(token))
+             {
+                 return true;
+             }
+         }
+         return false;
+     }
+ 
 +    public boolean isStartInclusive()
 +    {
 +        return true;
 +    }
 +
 +    public boolean isEndInclusive()
 +    {
 +        return true;
 +    }
 +
      /**
       * Compute a bounds of keys corresponding to a given bounds of token.
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 6169494,fe3b13d..4a38d5b
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -106,7 -102,8 +106,7 @@@ public class StreamReade
              writer = createWriter(cfs, totalSize, repairedAt, format);
              while (in.getBytesRead() < totalSize)
              {
-                 writePartition(deserializer, writer, cfs);
 -                writeRow(writer, in, cfs);
 -
++                writePartition(deserializer, writer);
                  // TODO move this to BytesReadTracker
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
@@@ -167,122 -167,9 +167,120 @@@
          return size;
      }
  
-     protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer,
ColumnFamilyStore cfs) throws IOException
 -    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws
IOException
++    protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer)
throws IOException
      {
-         DecoratedKey key = deserializer.newPartition();
-         writer.append(deserializer);
 -        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
 -        writer.appendFromStream(key, cfs.metadata, in, inputVersion);
++        writer.append(deserializer.newPartition());
 +        deserializer.checkForExceptions();
-         cfs.invalidateCachedPartition(key);
 +    }
 +
 +    public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered>
implements UnfilteredRowIterator
 +    {
 +        private final CFMetaData metadata;
 +        private final DataInputPlus in;
 +        private final SerializationHeader header;
 +        private final SerializationHelper helper;
 +
 +        private DecoratedKey key;
 +        private DeletionTime partitionLevelDeletion;
 +        private SSTableSimpleIterator iterator;
 +        private Row staticRow;
 +        private IOException exception;
 +
 +        public StreamDeserializer(CFMetaData metadata, DataInputPlus in, Version version,
SerializationHeader header)
 +        {
 +            assert version.storeRows() : "We don't allow streaming from pre-3.0 nodes";
 +            this.metadata = metadata;
 +            this.in = in;
 +            this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(),
SerializationHelper.Flag.PRESERVE_SIZE);
 +            this.header = header;
 +        }
 +
-         public DecoratedKey newPartition() throws IOException
++        public StreamDeserializer newPartition() throws IOException
 +        {
 +            key = metadata.decorateKey(ByteBufferUtil.readWithShortLength(in));
 +            partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
 +            iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
 +            staticRow = iterator.readStaticRow();
-             return key;
++            return this;
 +        }
 +
 +        public CFMetaData metadata()
 +        {
 +            return metadata;
 +        }
 +
 +        public PartitionColumns columns()
 +        {
 +            // We don't know which columns we'll get so assume it can be all of them
 +            return metadata.partitionColumns();
 +        }
 +
 +        public boolean isReverseOrder()
 +        {
 +            return false;
 +        }
 +
 +        public DecoratedKey partitionKey()
 +        {
 +            return key;
 +        }
 +
 +        public DeletionTime partitionLevelDeletion()
 +        {
 +            return partitionLevelDeletion;
 +        }
 +
 +        public Row staticRow()
 +        {
 +            return staticRow;
 +        }
 +
 +        public EncodingStats stats()
 +        {
 +            return header.stats();
 +        }
 +
 +        public boolean hasNext()
 +        {
 +            try
 +            {
 +                return iterator.hasNext();
 +            }
 +            catch (IOError e)
 +            {
 +                if (e.getCause() != null && e.getCause() instanceof IOException)
 +                {
 +                    exception = (IOException)e.getCause();
 +                    return false;
 +                }
 +                throw e;
 +            }
 +        }
 +
 +        public Unfiltered next()
 +        {
 +            // Note that in practice we know that IOException will be thrown by hasNext(),
because that's
 +            // where the actual reading happens, so we don't bother catching RuntimeException
here (contrarily
 +            // to what we do in hasNext)
 +            Unfiltered unfiltered = iterator.next();
 +            return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW
 +                 ? maybeMarkLocalToBeCleared((Row) unfiltered)
 +                 : unfiltered;
 +        }
 +
 +        private Row maybeMarkLocalToBeCleared(Row row)
 +        {
 +            return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row;
 +        }
 +
 +        public void checkForExceptions() throws IOException
 +        {
 +            if (exception != null)
 +                throw exception;
 +        }
 +
 +        public void close()
 +        {
 +        }
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 0b864fa,846524b..54ce711
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -29,17 -35,12 +36,19 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.view.View;
+ import org.apache.cassandra.dht.Bounds;
+ import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Pair;
 -
  import org.apache.cassandra.utils.concurrent.Refs;
  
  /**
@@@ -128,66 -122,48 +137,92 @@@ public class StreamReceiveTask extends 
                  return;
              }
              ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +            boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
  
 -            File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size()
* 256L);
 -            if (lockfiledir == null)
 -                throw new IOError(new IOException("All disks full"));
 -            StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
 -            lockfile.create(task.sstables);
 -            List<SSTableReader> readers = new ArrayList<>();
 -            for (SSTableWriter writer : task.sstables)
 -                readers.add(writer.finish(true));
 -            lockfile.delete();
 -            task.sstables.clear();
 -
 -            try (Refs<SSTableReader> refs = Refs.ref(readers))
 +            try
              {
 -                // add sstables and build secondary indexes
 -                cfs.addSSTables(readers);
 -                cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 -
 -                //invalidate row and counter cache
 -                if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
 +                List<SSTableReader> readers = new ArrayList<>();
 +                for (SSTableMultiWriter writer : task.sstables)
                  {
 -                    List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 -                    for (SSTableReader sstable : readers)
 -                        boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(),
sstable.last.getToken()));
 -                    Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 +                    Collection<SSTableReader> newReaders = writer.finish(true);
 +                    readers.addAll(newReaders);
 +                    task.txn.update(newReaders, false);
 +                }
 +
 +                task.sstables.clear();
  
 -                    if (cfs.isRowCacheEnabled())
 +                try (Refs<SSTableReader> refs = Refs.ref(readers))
 +                {
 +                    //We have a special path for views.
 +                    //Since the view requires cleaning up any pre-existing state, we must
put
 +                    //all partitions through the same write path as normal mutations.
 +                    //This also ensures any 2is are also updated
 +                    if (hasViews)
 +                    {
 +                        for (SSTableReader reader : readers)
 +                        {
 +                            try (ISSTableScanner scanner = reader.getScanner())
 +                            {
 +                                while (scanner.hasNext())
 +                                {
 +                                    try (UnfilteredRowIterator rowIterator = scanner.next())
 +                                    {
 +                                        //Apply unsafe (we will flush below before transaction
is done)
 +                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
 +                                    }
 +                                }
 +                            }
 +                        }
 +                    }
 +                    else
                      {
 -                        int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 -                        if (invalidatedKeys > 0)
 -                            logger.debug("[Stream #{}] Invalidated {} row cache entries
on table {}.{} after stream " +
 -                                         "receive task completed.", task.session.planId(),
invalidatedKeys,
 -                                         cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                        task.txn.finish();
 +
 +                        // add sstables and build secondary indexes
 +                        cfs.addSSTables(readers);
 +                        cfs.indexManager.buildAllIndexesBlocking(readers);
++
++                        //invalidate row and counter cache
++                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
++                        {
++                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
++                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(),
sstable.last.getToken())));
++                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
++
++                            if (cfs.isRowCacheEnabled())
++                            {
++                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
++                                if (invalidatedKeys > 0)
++                                    logger.debug("[Stream #{}] Invalidated {} row cache
entries on table {}.{} after stream " +
++                                                 "receive task completed.", task.session.planId(),
invalidatedKeys,
++                                                 cfs.keyspace.getName(), cfs.getTableName());
++                            }
++
++                            if (cfs.metadata.isCounter())
++                            {
++                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
++                                if (invalidatedKeys > 0)
++                                    logger.debug("[Stream #{}] Invalidated {} counter cache
entries on table {}.{} after stream " +
++                                                 "receive task completed.", task.session.planId(),
invalidatedKeys,
++                                                 cfs.keyspace.getName(), cfs.getTableName());
++                            }
++                        }
                      }
 +                }
 +                catch (Throwable t)
 +                {
 +                    logger.error("Error applying streamed sstable: ", t);
  
 -                    if (cfs.metadata.isCounter())
 +                    JVMStabilityInspector.inspectThrowable(t);
 +                }
 +                finally
 +                {
 +                    //We don't keep the streamed sstables since we've applied them manually
 +                    //So we abort the txn and delete the streamed sstables
 +                    if (hasViews)
                      {
 -                        int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 -                        if (invalidatedKeys > 0)
 -                            logger.debug("[Stream #{}] Invalidated {} counter cache entries
on table {}.{} after stream " +
 -                                         "receive task completed.", task.session.planId(),
invalidatedKeys,
 -                                         cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                        cfs.forceBlockingFlush();
 +                        task.txn.abort();
                      }
                  }
              }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fca6aa7,facb906..8f53832
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -93,7 -92,8 +93,7 @@@ public class CompressedStreamReader ext
  
                  while (in.getBytesRead() < sectionLength)
                  {
-                     writePartition(deserializer, writer, cfs);
 -                    writeRow(writer, in, cfs);
 -
++                    writePartition(deserializer, writer);
                      // when compressed, report total bytes of compressed chunks read since
remoteFile.size is the sum of chunks transferred
                      session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(),
totalSize);
                  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 65ec420,ed7921e..91157ad
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@@ -17,12 -17,9 +17,15 @@@
   */
  package org.apache.cassandra.db;
  
+ import java.util.Collections;
  import java.util.concurrent.ExecutionException;
  
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
++import org.apache.cassandra.dht.Bounds;
++import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
  import org.junit.AfterClass;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@@ -95,9 -89,51 +98,54 @@@ public class CounterCacheTes
      }
  
      @Test
+     public void testCounterCacheInvalidate()
+     {
 -        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+         cfs.truncateBlocking();
+         CacheService.instance.invalidateCounterCache();
+ 
++        Clustering c1 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(1)).build();
++        Clustering c2 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(2)).build();
++        ColumnDefinition cd = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("c"));
++
+         assertEquals(0, CacheService.instance.counterCache.size());
 -        assertNull(cfs.getCachedCounter(bytes(1), cellname(1)));
 -        assertNull(cfs.getCachedCounter(bytes(1), cellname(2)));
 -        assertNull(cfs.getCachedCounter(bytes(2), cellname(1)));
 -        assertNull(cfs.getCachedCounter(bytes(2), cellname(2)));
 -        assertNull(cfs.getCachedCounter(bytes(3), cellname(1)));
 -        assertNull(cfs.getCachedCounter(bytes(3), cellname(2)));
 -
 -        cfs.putCachedCounter(bytes(1), cellname(1), ClockAndCount.create(1L, 1L));
 -        cfs.putCachedCounter(bytes(1), cellname(2), ClockAndCount.create(1L, 2L));
 -        cfs.putCachedCounter(bytes(2), cellname(1), ClockAndCount.create(2L, 1L));
 -        cfs.putCachedCounter(bytes(2), cellname(2), ClockAndCount.create(2L, 2L));
 -        cfs.putCachedCounter(bytes(3), cellname(1), ClockAndCount.create(3L, 1L));
 -        cfs.putCachedCounter(bytes(3), cellname(2), ClockAndCount.create(3L, 2L));
 -
 -        assertEquals(6, CacheService.instance.counterCache.size());
 -        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
 -        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
 -        assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
 -        assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
 -        assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), cellname(1)));
 -        assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), cellname(2)));
 -
 -        cfs.invalidateCounterCache(Collections.singleton(new Bounds<Token>(cfs.partitioner.decorateKey(bytes(1)).getToken(),
 -                                                                           cfs.partitioner.decorateKey(bytes(2)).getToken())));
++        assertNull(cfs.getCachedCounter(bytes(1), c1, cd, null));
++        assertNull(cfs.getCachedCounter(bytes(1), c2, cd, null));
++        assertNull(cfs.getCachedCounter(bytes(2), c1, cd, null));
++        assertNull(cfs.getCachedCounter(bytes(2), c2, cd, null));
++        assertNull(cfs.getCachedCounter(bytes(3), c1, cd, null));
++        assertNull(cfs.getCachedCounter(bytes(3), c2, cd, null));
++
++        cfs.putCachedCounter(bytes(1), c1, cd, null, ClockAndCount.create(1L, 1L));
++        cfs.putCachedCounter(bytes(1), c2, cd, null, ClockAndCount.create(1L, 2L));
++        cfs.putCachedCounter(bytes(2), c1, cd, null, ClockAndCount.create(2L, 1L));
++        cfs.putCachedCounter(bytes(2), c2, cd, null, ClockAndCount.create(2L, 2L));
++        cfs.putCachedCounter(bytes(3), c1, cd, null, ClockAndCount.create(3L, 1L));
++        cfs.putCachedCounter(bytes(3), c2, cd, null, ClockAndCount.create(3L, 2L));
++
++        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), c1, cd,
null));
++        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), c2, cd,
null));
++        assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), c1, cd,
null));
++        assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), c2, cd,
null));
++        assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), c1, cd,
null));
++        assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), c2, cd,
null));
++
++        cfs.invalidateCounterCache(Collections.singleton(new Bounds<Token>(cfs.decorateKey(bytes(1)).getToken(),
++                                                                           cfs.decorateKey(bytes(2)).getToken())));
+ 
+         assertEquals(2, CacheService.instance.counterCache.size());
 -        assertNull(cfs.getCachedCounter(bytes(1), cellname(1)));
 -        assertNull(cfs.getCachedCounter(bytes(1), cellname(2)));
 -        assertNull(cfs.getCachedCounter(bytes(2), cellname(1)));
 -        assertNull(cfs.getCachedCounter(bytes(2), cellname(2)));
 -        assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), cellname(1)));
 -        assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), cellname(2)));
++        assertNull(cfs.getCachedCounter(bytes(1), c1, cd, null));
++        assertNull(cfs.getCachedCounter(bytes(1), c2, cd, null));
++        assertNull(cfs.getCachedCounter(bytes(2), c1, cd, null));
++        assertNull(cfs.getCachedCounter(bytes(2), c2, cd, null));
++        assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), c1, cd,
null));
++        assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), c2, cd,
null));
+     }
+ 
+     @Test
      public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
      {
 -        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
          cfs.truncateBlocking();
          CacheService.instance.invalidateCounterCache();
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RowCacheTest.java
index d407f7a,9fb322b..b157adc
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@@ -20,22 -20,28 +20,27 @@@ package org.apache.cassandra.db
  
  import java.net.InetAddress;
  import java.nio.ByteBuffer;
+ import java.util.ArrayList;
 -import java.util.Collection;
 +import java.util.Arrays;
  import java.util.Iterator;
+ import java.util.TreeSet;
  
+ import com.google.common.collect.Lists;
  import org.junit.AfterClass;
  import org.junit.BeforeClass;
  import org.junit.Test;
 -
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.cache.CachingOptions;
  import org.apache.cassandra.cache.RowCacheKey;
 -import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.db.composites.*;
 +import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.db.filter.QueryFilter;
 +import org.apache.cassandra.db.filter.ColumnFilter;
  import org.apache.cassandra.db.marshal.IntegerType;
 +import org.apache.cassandra.db.partitions.CachedPartition;
+ import org.apache.cassandra.dht.Bounds;
+ import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
  import org.apache.cassandra.locator.TokenMetadata;
@@@ -230,6 -178,51 +235,51 @@@ public class RowCacheTes
      }
  
      @Test
+     public void testInvalidateRowCache() throws Exception
+     {
+         StorageService.instance.initServer(0);
+         CacheService.instance.setRowCacheCapacityInMB(1);
+         rowCacheLoad(100, Integer.MAX_VALUE, 1000);
+ 
+         ColumnFamilyStore store = Keyspace.open(KEYSPACE_CACHED).getColumnFamilyStore(CF_CACHED);
+         assertEquals(CacheService.instance.rowCache.size(), 100);
+ 
 -        //construct 5 ranges of 20 elements each
++        //construct 5 bounds of 20 elements each
+         ArrayList<Bounds<Token>> subranges = getBounds(20);
+ 
 -        //invalidate 3 of the 5 ranges
++        //invalidate 3 of the 5 bounds
+         ArrayList<Bounds<Token>> boundsToInvalidate = Lists.newArrayList(subranges.get(0),
subranges.get(2), subranges.get(4));
+         int invalidatedKeys = store.invalidateRowCache(boundsToInvalidate);
+         assertEquals(60, invalidatedKeys);
+ 
+         //now there should be only 40 cached entries left
+         assertEquals(CacheService.instance.rowCache.size(), 40);
+         CacheService.instance.setRowCacheCapacityInMB(0);
+     }
+ 
+     private ArrayList<Bounds<Token>> getBounds(int nElements)
+     {
+         ColumnFamilyStore store = Keyspace.open(KEYSPACE_CACHED).getColumnFamilyStore(CF_CACHED);
+         TreeSet<DecoratedKey> orderedKeys = new TreeSet<>();
+ 
+         for(Iterator<RowCacheKey> it = CacheService.instance.rowCache.keyIterator();it.hasNext();)
 -            orderedKeys.add(store.partitioner.decorateKey(ByteBuffer.wrap(it.next().key)));
++            orderedKeys.add(store.decorateKey(ByteBuffer.wrap(it.next().key)));
+ 
+         ArrayList<Bounds<Token>> boundsToInvalidate = new ArrayList<>();
+         Iterator<DecoratedKey> iterator = orderedKeys.iterator();
+ 
+         while (iterator.hasNext())
+         {
+             Token startRange = iterator.next().getToken();
+             for (int i = 0; i < nElements-2; i++)
+                 iterator.next();
+             Token endRange = iterator.next().getToken();
+             boundsToInvalidate.add(new Bounds<>(startRange, endRange));
+         }
+         return boundsToInvalidate;
+     }
+ 
+     @Test
      public void testRowCachePartialLoad() throws Exception
      {
          CacheService.instance.setRowCacheCapacityInMB(1);


Mime
View raw message