From: snazy@apache.org
To: commits@cassandra.apache.org
Date: Wed, 16 Sep 2015 20:05:18 -0000
Subject: [08/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CacheService.java
index a48466a,a13a52d..a3e7d12
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@@ -21,9 -22,9 +21,8 @@@ import java.io.IOException
  import java.lang.management.ManagementFactory;
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
 -import java.util.Collection;
  import java.util.Iterator;
  import java.util.List;
- import java.util.UUID;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Future;
@@@ -363,48 -360,33 +364,53 @@@ public class CacheService implements CacheServiceMBean
          ByteBufferUtil.writeWithLength(key.cellName, out);
      }

-     public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
+     public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
      {
+         // Keyspace and CF name are deserialized by AutoSavingCache and used to fetch the CFS provided as a
+         // parameter, so they aren't deserialized here, even though they are serialized by this serializer
          final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
-         ByteBuffer cellNameBuffer = ByteBufferUtil.readWithLength(in);
+         final ByteBuffer cellName = ByteBufferUtil.readWithLength(in);
+         if (cfs == null || !cfs.metadata.isCounter() || !cfs.isCounterCacheEnabled())
+             return null;
+         assert(cfs.metadata.isCounter());
-         final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(cellNameBuffer);
          return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
          {
              public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
              {
-                 DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
-                 QueryFilter filter = QueryFilter.getNamesFilter(key,
-                                                                 cfs.metadata.cfName,
-                                                                 FBUtilities.singleton(cellName, cfs.metadata.comparator),
-                                                                 Long.MIN_VALUE);
-                 ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
-                 if (cf == null)
-                     return null;
-                 Cell cell = cf.getColumn(cellName);
-                 if (cell == null || !cell.isLive(Long.MIN_VALUE))
-                     return null;
-                 ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
-                 return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, cellName), clockAndCount);
+                 DecoratedKey key = cfs.decorateKey(partitionKey);
+                 LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(cfs.metadata, cellName);
+                 ColumnDefinition column = name.column;
+                 CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
+
+                 int nowInSec = FBUtilities.nowInSeconds();
+                 ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+                 if (path == null)
+                     builder.add(column);
+                 else
+                     builder.select(column, path);
+
+                 ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(FBUtilities.singleton(name.clustering, cfs.metadata.comparator), false);
+                 SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key, builder.build(), filter);
+                 try (OpOrder.Group op = cfs.readOrdering.start(); RowIterator iter = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, op), nowInSec))
+                 {
+                     Cell cell;
+                     if (column.isStatic())
+                     {
+                         cell = iter.staticRow().getCell(column);
+                     }
+                     else
+                     {
+                         if (!iter.hasNext())
+                             return null;
+                         cell = iter.next().getCell(column);
+                     }
+
+                     if (cell == null)
+                         return null;
+
+                     ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
-                     return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, name.clustering, column, path), clockAndCount);
++                    return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, name.clustering, column, path), clockAndCount);
+                 }
              }
          });
      }
@@@ -412,27 -394,30 +418,34 @@@
  public static class RowCacheSerializer implements CacheSerializer<RowCacheKey, IRowCacheEntry>
  {
-     public void serialize(RowCacheKey key, DataOutputPlus out) throws IOException
+     public void serialize(RowCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
      {
-         assert(!cfs.isIndex());
++        assert(!cfs.isIndex()); // Shouldn't have row cache entries for indexes
+         out.write(cfs.metadata.ksAndCFBytes);
          ByteBufferUtil.writeWithLength(key.key, out);
      }

-     public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
+     public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
      {
+         // Keyspace and CF name are deserialized by AutoSavingCache and used to fetch the CFS provided as a
+         // parameter, so they aren't deserialized here, even though they are serialized by this serializer
          final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
+         final int rowsToCache = cfs.metadata.params.caching.rowsPerPartitionToCache();
+         if (cfs == null || !cfs.isRowCacheEnabled())
+             return null;
-         assert(!cfs.isIndex());
++        assert(!cfs.isIndex()); // Shouldn't have row cache entries for indexes

          return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>()
          {
              public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
              {
-                 DecoratedKey key = cfs.partitioner.decorateKey(buffer);
-                 QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE);
-                 ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
-                 return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry) data);
+                 DecoratedKey key = cfs.decorateKey(buffer);
+                 int nowInSec = FBUtilities.nowInSeconds();
+                 try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
+                 {
+                     CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
-                     return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) toCache);
++                    return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry) toCache);
+                 }
              }
          });
      }
@@@ -453,11 -435,13 +463,13 @@@
          ByteBufferUtil.writeWithLength(key.key, out);
          out.writeInt(key.desc.generation);
          out.writeBoolean(true);
-         key.desc.getFormat().getIndexSerializer(cfm, key.desc.version, SerializationHeader.forKeyCache(cfm)).serialize(entry, out);
-         key.desc.getFormat().getIndexSerializer(cfs.metadata).serialize(entry, out);
++        key.desc.getFormat().getIndexSerializer(cfs.metadata, key.desc.version, SerializationHeader.forKeyCache(cfs.metadata)).serialize(entry, out);
      }

-     public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
+     public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException
      {
+         // Keyspace and CF name are deserialized by AutoSavingCache and used to fetch the CFS provided as a
+         // parameter, so they aren't deserialized here, even though they are serialized by this serializer
          int keyLength = input.readInt();
          if (keyLength > FBUtilities.MAX_UNSIGNED_SHORT)
          {
@@@ -466,25 -450,18 +478,25 @@@
          }
          ByteBuffer key = ByteBufferUtil.read(input, keyLength);
          int generation = input.readInt();
-         SSTableReader reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL));
          input.readBoolean(); // backwards compatibility for "promoted indexes" boolean
-         if (reader == null)
+         SSTableReader reader = null;
-         if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables())) == null)
++        if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL))) == null)
          {
-             RowIndexEntry.Serializer.skip(input);
+             // The sstable doesn't exist anymore, so we can't be sure of the exact version and assume it's the current version. The only case where we'll be
+             // wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that
+             // this won't affect many users (if any) and only once, 2) this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that this
+             // part of the code has been broken for a while without anyone noticing (it is, btw, still broken until CASSANDRA-10219 is fixed).
-             RowIndexEntry.Serializer.skipPromotedIndex(input, BigFormat.instance.getLatestVersion());
++            RowIndexEntry.Serializer.skip(input, BigFormat.instance.getLatestVersion());
              return null;
          }
-         RowIndexEntry entry = reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input, reader.descriptor.version);
+         RowIndexEntry.IndexSerializer indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,
+                                                                                                          reader.descriptor.version,
+                                                                                                          SerializationHeader.forKeyCache(cfs.metadata));
+         RowIndexEntry entry = indexSerializer.deserialize(input);
-         return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
+         return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
      }

-     private SSTableReader findDesc(int generation, Collection<SSTableReader> collection)
+     private SSTableReader findDesc(int generation, Iterable<SSTableReader> collection)
      {
          for (SSTableReader sstable : collection)
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 1408a70,075c8f7..f9ee9e8
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -26,8 -26,11 +26,10 @@@ import java.net.UnknownHostException
  import java.rmi.registry.LocateRegistry;
  import java.rmi.server.RMIServerSocketFactory;
  import java.util.Collections;
+ import java.util.List;
  import java.util.Map;
 -import java.util.UUID;
  import java.util.concurrent.TimeUnit;
+
  import javax.management.MBeanServer;
  import javax.management.ObjectName;
  import javax.management.StandardMBean;
@@@ -40,8 -42,10 +42,10 @@@ import com.codahale.metrics.Meter
  import com.codahale.metrics.MetricRegistryListener;
  import com.codahale.metrics.SharedMetricRegistries;
  import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
  import com.google.common.util.concurrent.Uninterruptibles;
 -import org.apache.cassandra.metrics.DefaultNameFactory;
+
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index f0ad46f,fa370dc..5e4ef39
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2652,15 -2624,15 +2659,15 @@@ public class StorageService extends NotificationBroadcasterSupport
      /**
       * Takes a snapshot of multiple tables from different keyspaces. A snapshot name must be specified.
-      *
-      *
+      *
+      *
       * @param tag
       *            the tag given to the snapshot; may not be null or empty
-      * @param columnFamilyList
-      *            list of columnfamily from different keyspace in the form of ks1.cf1 ks2.cf2
+      * @param tableList
+      *            list of tables from different keyspaces, in the form ks1.cf1 ks2.cf2
       */
      @Override
-     public void takeMultipleColumnFamilySnapshot(String tag, String... columnFamilyList)
+     public void takeMultipleTableSnapshot(String tag, String... tableList)
              throws IOException
      {
          Map<String, List<String>> keyspaceColumnfamily = new HashMap<String, List<String>>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/FBUtilities.java
index c4b4193,a16fa13..83b07e0
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@@ -775,29 -793,19 +775,45 @@@ public class FBUtilities
          digest.update((byte) ((val >>> 0) & 0xFF));
      }

+     public static void updateWithBoolean(MessageDigest digest, boolean val)
+     {
+         updateWithByte(digest, val ? 0 : 1);
+     }
+
+     public static void closeAll(List<? extends AutoCloseable> l) throws Exception
+     {
+         Exception toThrow = null;
+         for (AutoCloseable c : l)
+         {
+             try
+             {
+                 c.close();
+             }
+             catch (Exception e)
+             {
+                 if (toThrow == null)
+                     toThrow = e;
+                 else
+                     toThrow.addSuppressed(e);
+             }
+         }
+         if (toThrow != null)
+             throw toThrow;
+     }
++
+     public static byte[] toWriteUTFBytes(String s)
+     {
+         try
+         {
+             ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream dos = new DataOutputStream(baos);
+             dos.writeUTF(s);
+             dos.flush();
+             return baos.toByteArray();
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+     }
  }
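[Editor's illustration] The two FBUtilities helpers added above are plain-JDK patterns that can be exercised in isolation. The sketch below is illustrative only (the class and names are ours, not part of this patch): it shows the suppressed-exception aggregation that closeAll performs, and the writeUTF framing that toWriteUTFBytes produces and DataInputStream.readUTF can read back.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;
import java.util.List;

public class FBUtilitiesPatternsDemo
{
    // Same shape as the closeAll() added above: close everything, keep the
    // first failure as the primary exception, attach the rest as suppressed.
    static void closeAll(List<? extends AutoCloseable> resources) throws Exception
    {
        Exception toThrow = null;
        for (AutoCloseable c : resources)
        {
            try
            {
                c.close();
            }
            catch (Exception e)
            {
                if (toThrow == null)
                    toThrow = e;
                else
                    toThrow.addSuppressed(e);
            }
        }
        if (toThrow != null)
            throw toThrow;
    }

    public static void main(String[] args) throws Exception
    {
        // writeUTF round trip: bytes produced the way toWriteUTFBytes() produces them
        // (length-prefixed, modified UTF-8) can be read back with readUTF().
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        new DataOutputStream(baos).writeUTF("ks1.cf1");
        String back = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())).readUTF();
        System.out.println(back); // ks1.cf1

        // closeAll: both resources are closed; the first exception wins, the second is suppressed.
        AutoCloseable bad1 = () -> { throw new IllegalStateException("first"); };
        AutoCloseable bad2 = () -> { throw new IllegalStateException("second"); };
        try
        {
            closeAll(Arrays.asList(bad1, bad2));
        }
        catch (Exception e)
        {
            System.out.println(e.getMessage() + " / suppressed: " + e.getSuppressed().length); // first / suppressed: 1
        }
    }
}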
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
index c4157ea,475e436..0c7e8a5
--- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
+++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
@@@ -78,9 -76,8 +78,8 @@@ public class AutoSavingCacheTest
          Assert.assertEquals(0, keyCache.size());

          // then load saved
-         keyCache.loadSaved(cfs);
-         Assert.assertEquals(2, keyCache.size());
+         keyCache.loadSavedAsync().get();
-         for (SSTableReader sstable : cfs.getSSTables())
+         for (SSTableReader sstable : cfs.getLiveSSTables())
-             Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.cfId, sstable.descriptor, ByteBufferUtil.bytes("key1"))));
+             Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.ksAndCFName, sstable.descriptor, ByteBufferUtil.bytes("key1"))));
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 21a41c4,bfcfa59..cd52d35
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@@ -18,27 -19,27 +18,33 @@@
   */
  package org.apache.cassandra.cache;

- import java.nio.ByteBuffer;
 -import java.util.ArrayList;
 -import java.util.List;
 +import java.security.MessageDigest;
 +import java.security.NoSuchAlgorithmException;
- import java.util.*;

  import org.junit.BeforeClass;
  import org.junit.Test;
 +import static org.junit.Assert.*;
 +
++
++import java.util.ArrayList;
++import java.util.List;
++
+
+ import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.config.KSMetaData;
 -import org.apache.cassandra.db.ArrayBackedSortedColumns;
 -import org.apache.cassandra.db.ColumnFamily;
+ import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.utils.Pair;
+
  import com.googlecode.concurrentlinkedhashmap.Weighers;

 -import static org.apache.cassandra.Util.column;
 -import static org.junit.Assert.*;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.marshal.AsciiType;
 +import org.apache.cassandra.db.partitions.*;
- import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.FBUtilities;

  public class CacheProviderTest
  {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
index 9fef63f,1a60d6d..861e840
--- a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
+++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
@@@ -26,15 -25,19 +26,20 @@@ import java.util.List
  import org.junit.Assert;
  import org.junit.Test;

 -import com.google.common.collect.ImmutableSet;
 -
+ import org.apache.cassandra.cache.KeyCacheKey;
+ import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.metrics.CacheMetrics;
  import org.apache.cassandra.metrics.CassandraMetricsRegistry;
  import org.apache.cassandra.service.CacheService;
  import org.apache.cassandra.service.StorageService;
 -import org.apache.cassandra.utils.Pair;

  import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.assertNull;
++import org.apache.cassandra.utils.Pair;
++
  public class KeyCacheCqlTest extends CQLTester
  {
@@@ -167,6 -71,49 +172,47 @@@
          insertData(table, "some_index", true);
          clearCache();
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             assertEquals(500, result.size());
+         }
+
+         long hits = metrics.hits.getCount();
+         long requests = metrics.requests.getCount();
-         assertEquals(4900, hits);
-         assertEquals(5250, requests);
-
-         //
++        assertEquals(0, hits);
++        assertEquals(210, requests);
+
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             // 100 part-keys * 50 clust-keys
+             // indexed on part-key % 10 = 10 index partitions
+             // (50 clust-keys * 100-part-keys / 10 possible index-values) = 500
+             assertEquals(500, result.size());
+         }
+
+         metrics = CacheService.instance.keyCache.getMetrics();
+         hits = metrics.hits.getCount();
+         requests = metrics.requests.getCount();
-         assertEquals(10000, hits);
-         assertEquals(10500, requests);
++        assertEquals(200, hits);
++        assertEquals(420, requests);
+
+         CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+
+         int beforeSize = CacheService.instance.keyCache.size();
+
+         CacheService.instance.keyCache.clear();
+
+         Assert.assertEquals(0, CacheService.instance.keyCache.size());
+
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+
+         assertEquals(beforeSize, CacheService.instance.keyCache.size());
+
          for (int i = 0; i < 10; i++)
          {
              UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
@@@ -173,11 -123,44 +222,44 @@@
              assertEquals(500, result.size());
          }

+         // Test Schema.getColumnFamilyStoreIncludingIndexes; several null check paths
+         // are defensive and unreachable
+         assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create("foo", "bar")));
+         assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(KEYSPACE, "bar")));
+
+         dropTable("DROP TABLE %s");
+
+         // Test loading for a dropped 2i/table
+         CacheService.instance.keyCache.clear();
+
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+
+         assertEquals(0, CacheService.instance.keyCache.size());
+     }
+
+     @Test
+     public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable
+     {
+         String table = createTable("CREATE TABLE %s ("
+                                     + commonColumnsDef
+                                     + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+         createIndex("CREATE INDEX some_index ON %s (col_int)");
+         insertData(table, "some_index", true);
+         clearCache();
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             assertEquals(500, result.size());
+         }
+
          long hits = metrics.hits.getCount();
          long requests = metrics.requests.getCount();
-         assertEquals(4900, hits);
-         assertEquals(5250, requests);
+         assertEquals(0, hits);
+         assertEquals(210, requests);

          //
@@@ -193,110 -176,29 +275,129 @@@
          metrics = CacheService.instance.keyCache.getMetrics();
          hits = metrics.hits.getCount();
          requests = metrics.requests.getCount();
-         assertEquals(10000, hits);
-         assertEquals(10500, requests);
+         assertEquals(200, hits);
+         assertEquals(420, requests);
+
+         dropTable("DROP TABLE %s");
+
+         CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+
+         CacheService.instance.keyCache.clear();
+
+         Assert.assertEquals(0, CacheService.instance.keyCache.size());
+
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+
+         Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator();
+         while (iter.hasNext())
+         {
+             KeyCacheKey key = iter.next();
+             Assert.assertFalse(key.ksAndCFName.left.equals("KEYSPACE"));
+             Assert.assertFalse(key.ksAndCFName.right.startsWith(table));
+         }
      }

+     @Test
+     public void testKeyCacheNonClustered() throws Throwable
+     {
+         String table = createTable("CREATE TABLE %s ("
+                                     + commonColumnsDef
+                                     + "PRIMARY KEY ((part_key_a, part_key_b)))");
+         insertData(table, null, false);
+         clearCache();
+
+         for (int i = 0; i < 10; i++)
+         {
+             assertRows(execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)),
+                        new Object[]{ String.valueOf(i) + '-' + String.valueOf(0) });
+         }
+
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+         long hits = metrics.hits.getCount();
+         long requests = metrics.requests.getCount();
+         assertEquals(0, hits);
+         assertEquals(10, requests);
+
+         for (int i = 0; i < 100; i++)
+         {
+             assertRows(execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)),
+                        new Object[]{ String.valueOf(i) + '-' + String.valueOf(0) });
+         }
+
+         hits = metrics.hits.getCount();
+         requests = metrics.requests.getCount();
+         assertEquals(10, hits);
+         assertEquals(120, requests);
+     }
+
+     @Test
+     public void testKeyCacheClustered() throws Throwable
+     {
+         String table = createTable("CREATE TABLE %s ("
+                                     + commonColumnsDef
+                                     + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+         insertData(table, null, true);
+         clearCache();
+
+         // query on partition key
+
+         // 10 queries, each 50 result rows
+         for (int i = 0; i < 10; i++)
+         {
+             assertEquals(50, execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)).size());
+         }
+
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+         long hits = metrics.hits.getCount();
+         long requests = metrics.requests.getCount();
+         assertEquals(0, hits);
+         assertEquals(10, requests);
+
+         // 10 queries, each 50 result rows
+         for (int i = 0; i < 10; i++)
+         {
+             assertEquals(50, execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)).size());
+         }
+
+         metrics = CacheService.instance.keyCache.getMetrics();
+         hits = metrics.hits.getCount();
+         requests = metrics.requests.getCount();
+         assertEquals(10, hits);
+         assertEquals(10 + 10, requests);
+
+         // 100 queries - must get a hit in key-cache
+         for (int i = 0; i < 10; i++)
+         {
+             for (int c = 0; c < 10; c++)
+             {
+                 assertRows(execute("SELECT col_text, col_long FROM %s WHERE part_key_a = ? AND part_key_b = ? and clust_key_a = ?", i, Integer.toOctalString(i), c),
+                            new Object[]{ String.valueOf(i) + '-' + String.valueOf(c), (long) c });
+             }
+         }
+
+         metrics = CacheService.instance.keyCache.getMetrics();
+         hits = metrics.hits.getCount();
+         requests = metrics.requests.getCount();
+         assertEquals(10 + 100, hits);
+         assertEquals(20 + 100, requests);
+
+         // 5000 queries - first 10 partitions already in key cache
+         for (int i = 0; i < 100; i++)
+         {
+             for (int c = 0; c < 50; c++)
+             {
+                 assertRows(execute("SELECT col_text, col_long FROM %s WHERE part_key_a = ? AND part_key_b = ? and clust_key_a = ?", i, Integer.toOctalString(i), c),
+                            new Object[]{ String.valueOf(i) + '-' + String.valueOf(c), (long) c });
+             }
+         }
+
+         hits = metrics.hits.getCount();
+         requests = metrics.requests.getCount();
+         assertEquals(110 + 4910, hits);
+         assertEquals(120 + 5500, requests);
+     }
+
      // Inserts 100 partitions split over 10 sstables (flush after 10 partitions).
      // Clustered tables receive 50 CQL rows per partition.
      private void insertData(String table, String index, boolean withClustering) throws Throwable
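[Editor's illustration] The key-cache tests above repeat one pattern: persist the cache (submitWrite), clear it, call loadSaved(), and compare sizes or iterate the surviving keys. A minimal stand-alone sketch of that save/clear/reload round trip, using a plain map and length-prefixed entries instead of Cassandra's AutoSavingCache (all names here are hypothetical):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class SavedCacheRoundTripDemo
{
    public static void main(String[] args) throws IOException
    {
        Map<String, Long> cache = new HashMap<>();
        cache.put("key1", 42L);
        cache.put("key2", 7L);

        // "submitWrite": persist entries with length-prefixed keys, the way the serializers above do
        ByteArrayOutputStream saved = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(saved))
        {
            out.writeInt(cache.size());
            for (Map.Entry<String, Long> e : cache.entrySet())
            {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        }

        // "clear"
        int beforeSize = cache.size();
        cache.clear();

        // "loadSaved": repopulate from the saved bytes
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(saved.toByteArray())))
        {
            int n = in.readInt();
            for (int i = 0; i < n; i++)
                cache.put(in.readUTF(), in.readLong());
        }

        System.out.println(cache.size() == beforeSize); // true: size restored after reload
    }
}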
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 517bce6,5b37b2c..65ec420
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@@ -28,10 -24,14 +28,11 @@@ import org.junit.BeforeClass
  import org.junit.Test;

  import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.config.KSMetaData;
 -import org.apache.cassandra.db.marshal.CounterColumnType;
 +import org.apache.cassandra.db.marshal.*;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.exceptions.WriteTimeoutException;
 -import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.service.CacheService;
 -import org.apache.cassandra.utils.FBUtilities;

  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNull;
@@@ -68,42 -63,40 +69,44 @@@ public class CounterCacheTest
      @Test
      public void testReadWrite()
      {
-         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+         cfs.truncateBlocking();
          CacheService.instance.invalidateCounterCache();

-         assertEquals(0, CacheService.instance.counterCache.size());
-         assertNull(cfs.getCachedCounter(bytes(1), cellname(1)));
-         assertNull(cfs.getCachedCounter(bytes(1), cellname(2)));
-         assertNull(cfs.getCachedCounter(bytes(2), cellname(1)));
-         assertNull(cfs.getCachedCounter(bytes(2), cellname(2)));
+         Clustering c1 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(1)).build();
+         Clustering c2 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(2)).build();
+         ColumnDefinition cd = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("c"));

-         cfs.putCachedCounter(bytes(1), cellname(1), ClockAndCount.create(1L, 1L));
-         cfs.putCachedCounter(bytes(1), cellname(2), ClockAndCount.create(1L, 2L));
-         cfs.putCachedCounter(bytes(2), cellname(1), ClockAndCount.create(2L, 1L));
-         cfs.putCachedCounter(bytes(2), cellname(2), ClockAndCount.create(2L, 2L));
-
-         assertEquals(4, CacheService.instance.counterCache.size());
-         assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
-         assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
-         assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
-         assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
+         assertEquals(0, CacheService.instance.counterCache.size());
+         assertNull(cfs.getCachedCounter(bytes(1), c1, cd, null));
+         assertNull(cfs.getCachedCounter(bytes(1), c2, cd, null));
+         assertNull(cfs.getCachedCounter(bytes(2), c1, cd, null));
+         assertNull(cfs.getCachedCounter(bytes(2), c2, cd, null));
+
+         cfs.putCachedCounter(bytes(1), c1, cd, null, ClockAndCount.create(1L, 1L));
+         cfs.putCachedCounter(bytes(1), c2, cd, null, ClockAndCount.create(1L, 2L));
+         cfs.putCachedCounter(bytes(2), c1, cd, null, ClockAndCount.create(2L, 1L));
+         cfs.putCachedCounter(bytes(2), c2, cd, null, ClockAndCount.create(2L, 2L));
+
+         assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), c1, cd, null));
+         assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), c2, cd, null));
+         assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), c1, cd, null));
+         assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), c2, cd, null));
      }

      @Test
      public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
      {
-         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+         cfs.truncateBlocking();
          CacheService.instance.invalidateCounterCache();

-         ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
-         cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
-         cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
-         new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
-         new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
+         new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
+         new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
+         new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
+         new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
+
+         assertEquals(4, CacheService.instance.counterCache.size());

          // flush the counter cache and invalidate
          CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
@@@ -111,16 -104,76 +114,79 @@@
          assertEquals(0, CacheService.instance.counterCache.size());

          // load from cache and validate
-         CacheService.instance.counterCache.loadSaved(cfs);
+         CacheService.instance.counterCache.loadSaved();
          assertEquals(4, CacheService.instance.counterCache.size());
-         assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
-         assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
-         assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
-         assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
+
+         Clustering c1 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(1)).build();
+         Clustering c2 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(2)).build();
+         ColumnDefinition cd = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("c"));
+
+         assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), c1, cd, null));
+         assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), c2, cd, null));
+         assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), c1, cd, null));
+         assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), c2, cd, null));
      }
+
+     @Test
+     public void testDroppedSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+     {
-         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+         cfs.truncateBlocking();
+         CacheService.instance.invalidateCounterCache();
+
-         ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
-         cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
-         cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
-         new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
-         new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
+
+         // flush the counter cache and invalidate
+         CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+         CacheService.instance.invalidateCounterCache();
+         assertEquals(0, CacheService.instance.counterCache.size());
+
+         Keyspace ks = Schema.instance.removeKeyspaceInstance(KEYSPACE1);
+
+         try
+         {
+             // load from cache and validate
+             CacheService.instance.counterCache.loadSaved();
+             assertEquals(0, CacheService.instance.counterCache.size());
+         }
+         finally
+         {
+             Schema.instance.storeKeyspaceInstance(ks);
+         }
+     }
+
+     @Test
+     public void testDisabledSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+     {
-         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+         cfs.truncateBlocking();
+         CacheService.instance.invalidateCounterCache();
+
-         ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
-         cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
-         cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
-         new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
-         new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
+
+         // flush the counter cache and invalidate
+         CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+         CacheService.instance.invalidateCounterCache();
+         assertEquals(0, CacheService.instance.counterCache.size());
+
+         CacheService.instance.setCounterCacheCapacityInMB(0);
+         try
+         {
+             // load from cache and validate
+             CacheService.instance.counterCache.loadSaved();
+             assertEquals(0, CacheService.instance.counterCache.size());
+         }
+         finally
+         {
+             CacheService.instance.setCounterCacheCapacityInMB(1);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
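[Editor's illustration] CounterCacheTest above reflects the 3.0 counter-cache addressing: an entry is looked up by partition key, clustering and column (plus cell path for collections), and holds a ClockAndCount. A toy illustration of why each coordinate has to be part of the key (Java 16+ records for brevity; these types are ours, not Cassandra's):

import java.util.HashMap;
import java.util.Map;

public class CounterCacheSketch
{
    // Composite key: same partition key with a different clustering or column is a different counter.
    record CounterKey(String partitionKey, int clustering, String column) {}
    record ClockAndCount(long clock, long count) {}

    public static void main(String[] args)
    {
        Map<CounterKey, ClockAndCount> cache = new HashMap<>();
        cache.put(new CounterKey("k1", 1, "c"), new ClockAndCount(1L, 1L));
        cache.put(new CounterKey("k1", 2, "c"), new ClockAndCount(1L, 2L));

        // Same coordinates -> hit; a different partition key -> miss.
        System.out.println(cache.get(new CounterKey("k1", 1, "c"))); // ClockAndCount[clock=1, count=1]
        System.out.println(cache.get(new CounterKey("k2", 1, "c"))); // null
    }
}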
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RowCacheTest.java
index be22b45,5912d7c..d407f7a
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@@ -71,53 -72,6 +71,53 @@@ public class RowCacheTest
      }

      @Test
+     public void testRoundTrip() throws Exception
+     {
+         CompactionManager.instance.disableAutoCompaction();
+
+         Keyspace keyspace = Keyspace.open(KEYSPACE_CACHED);
+         String cf = "CachedIntCF";
+         ColumnFamilyStore cachedStore = keyspace.getColumnFamilyStore(cf);
+         long startRowCacheHits = cachedStore.metric.rowCacheHit.getCount();
+         long startRowCacheOutOfRange = cachedStore.metric.rowCacheHitOutOfRange.getCount();
+         // empty the row cache
+         CacheService.instance.invalidateRowCache();
+
+         // set global row cache size to 1 MB
+         CacheService.instance.setRowCacheCapacityInMB(1);
+
+         ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
+         DecoratedKey dk = cachedStore.decorateKey(key);
-         RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
++        RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);
+
+         RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata, System.currentTimeMillis(), key);
+         rub.clustering(String.valueOf(0));
+         rub.add("val", ByteBufferUtil.bytes("val" + 0));
+         rub.build().applyUnsafe();
+
+         // populate row cache, we should not get a row cache hit
+         Util.getAll(Util.cmd(cachedStore, dk).withLimit(1).build());
+         assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.getCount());
+
+         // do another query, limit is 1, which is < 100 that we cache, we should get a hit and it should be in range
+         Util.getAll(Util.cmd(cachedStore, dk).withLimit(1).build());
+         assertEquals(++startRowCacheHits, cachedStore.metric.rowCacheHit.getCount());
+         assertEquals(startRowCacheOutOfRange, cachedStore.metric.rowCacheHitOutOfRange.getCount());
+
+         CachedPartition cachedCf = (CachedPartition) CacheService.instance.rowCache.get(rck);
+         assertEquals(1, cachedCf.rowCount());
+         for (Unfiltered unfiltered : Util.once(cachedCf.unfilteredIterator(ColumnFilter.selection(cachedCf.columns()), Slices.ALL, false)))
+         {
+             Row r = (Row) unfiltered;
+             for (ColumnData c : r)
+             {
+                 assertEquals(((Cell) c).value(), ByteBufferUtil.bytes("val" + 0));
+             }
+         }
+         cachedStore.truncateBlocking();
+     }
+
+     @Test
      public void testRowCache() throws Exception
      {
          CompactionManager.instance.disableAutoCompaction();
@@@ -254,21 -231,18 +289,21 @@@
          CacheService.instance.setRowCacheCapacityInMB(1);

          ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
-         DecoratedKey dk = cachedStore.partitioner.decorateKey(key);
+         DecoratedKey dk = cachedStore.decorateKey(key);
-         RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
+         RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);

-         Mutation mutation = new Mutation(KEYSPACE_CACHED, key);
+         String values[] = new String[200];
          for (int i = 0; i < 200; i++)
-             mutation.add(cf, Util.cellname(i), ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
-         mutation.applyUnsafe();
+         {
+             RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata, System.currentTimeMillis(), key);
+             rub.clustering(String.valueOf(i));
+             values[i] = "val" + i;
+             rub.add("val", ByteBufferUtil.bytes(values[i]));
+             rub.build().applyUnsafe();
+         }
+         Arrays.sort(values);

          // populate row cache, we should not get a row cache hit
-         cachedStore.getColumnFamily(QueryFilter.getSliceFilter(dk, cf,
-                                                                Composites.EMPTY,
-                                                                Composites.EMPTY,
-                                                                false, 10, System.currentTimeMillis()));
+         Util.getAll(Util.cmd(cachedStore, dk).withLimit(10).build());
          assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.getCount());

          // do another query, limit is 20, which is < 100 that we cache, we should get a hit and it should be in range
@@@ -335,19 -309,6 +370,19 @@@
          // empty the cache again to make sure values came from disk
          CacheService.instance.invalidateRowCache();
          assertEquals(0, CacheService.instance.rowCache.size());
-         assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved(store));
+         assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved());
      }
+
+     private static void readData(String keyspace, String columnFamily, int offset, int numberOfRows)
+     {
+         ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
+         CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
+
+         for (int i = offset; i < offset + numberOfRows; i++)
+         {
+             DecoratedKey key = Util.dk("key" + i);
+             Clustering cl = new Clustering(ByteBufferUtil.bytes("col" + i));
+             Util.getAll(Util.cmd(store, key).build());
+         }
+     }
  }
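[Editor's illustration] RowCacheTest's "in range" assertions come down to one rule: a partition is cached up to rows_per_partition rows, and a read can be answered from the cache only if its limit fits inside what was cached; otherwise the read falls through to disk (and the out-of-range counter ticks). A self-contained sketch of that rule, with illustrative names rather than Cassandra's:

import java.util.ArrayList;
import java.util.List;

public class RowCacheLimitDemo
{
    static final int ROWS_TO_CACHE = 100; // analogous to rows_per_partition

    static List<Integer> cachedPartition = new ArrayList<>();

    // Returns the rows if the cache can fully answer the read, null if it must go to disk.
    static List<Integer> read(int limit)
    {
        if (limit <= cachedPartition.size())
            return cachedPartition.subList(0, limit); // cache hit, in range
        return null;                                  // out of range: more rows requested than cached
    }

    public static void main(String[] args)
    {
        for (int i = 0; i < ROWS_TO_CACHE; i++)
            cachedPartition.add(i);

        System.out.println(read(20) != null);  // true: limit 20 < 100 cached rows
        System.out.println(read(200) != null); // false: limit exceeds the cached prefix
    }
}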