Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3B184186EC for ; Wed, 16 Sep 2015 20:05:17 +0000 (UTC) Received: (qmail 39813 invoked by uid 500); 16 Sep 2015 20:04:43 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 39675 invoked by uid 500); 16 Sep 2015 20:04:43 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 39455 invoked by uid 99); 16 Sep 2015 20:04:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Sep 2015 20:04:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B00F4E0243; Wed, 16 Sep 2015 20:04:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: snazy@apache.org To: commits@cassandra.apache.org Date: Wed, 16 Sep 2015 20:04:45 -0000 Message-Id: <6e32d147a75044f2862a90a3244271cf@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/7] cassandra git commit: 2i key cache load fails 2i key cache load fails patch by Ariel Weisberg; reviewed by Robert Stupp for CASSANDRA-10155 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e889ee40 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e889ee40 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e889ee40 Branch: refs/heads/cassandra-2.2 Commit: e889ee408bec5330c312ff6b72a81a0012fdf2a5 Parents: 6479d94 Author: Ariel Weisberg Authored: Wed Sep 16 21:57:54 2015 +0200 Committer: Robert Stupp Committed: Wed Sep 16 21:57:54 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cache/AutoSavingCache.java | 172 +++++++----- .../org/apache/cassandra/cache/CacheKey.java | 14 +- .../apache/cassandra/cache/CounterCacheKey.java | 26 +- .../org/apache/cassandra/cache/KeyCacheKey.java | 19 +- .../org/apache/cassandra/cache/RowCacheKey.java | 30 +-- .../org/apache/cassandra/config/CFMetaData.java | 9 + .../cassandra/config/DatabaseDescriptor.java | 15 +- .../org/apache/cassandra/config/Schema.java | 56 +++- .../apache/cassandra/db/ColumnFamilyStore.java | 75 ++---- src/java/org/apache/cassandra/db/Keyspace.java | 4 - .../org/apache/cassandra/db/RowIndexEntry.java | 2 +- .../db/index/SecondaryIndexManager.java | 21 +- .../cassandra/io/sstable/SSTableReader.java | 8 +- .../apache/cassandra/service/CacheService.java | 58 ++-- .../cassandra/service/CassandraDaemon.java | 45 +++- .../cassandra/service/StorageService.java | 31 ++- .../org/apache/cassandra/utils/FBUtilities.java | 16 ++ .../cassandra/cache/AutoSavingCacheTest.java | 5 +- .../cassandra/cache/CacheProviderTest.java | 16 +- .../apache/cassandra/cql3/KeyCacheCqlTest.java | 263 +++++++++++++++++++ .../apache/cassandra/db/CounterCacheTest.java | 70 ++++- .../org/apache/cassandra/db/KeyCacheTest.java | 2 +- .../org/apache/cassandra/db/RowCacheTest.java | 42 ++- 24 files changed, 739 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2787739..207f16a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.10 + * Fix cache handling of 2i and base tables (CASSANDRA-10155) * Fix NPE in nodetool compactionhistory (CASSANDRA-9758) * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410) * BATCH statement is broken in cqlsh (CASSANDRA-10272) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 98e3e59..3ebbc76 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -19,6 +19,8 @@ package org.apache.cassandra.cache; import java.io.*; import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -27,6 +29,10 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; @@ -60,7 +66,15 @@ public class AutoSavingCache extends InstrumentingCache cacheLoader; - private static final String CURRENT_VERSION = "b"; + + /* + * CASSANDRA-10155 required a format change to fix 2i indexes and caching. + * 2.2 is already at version "c" and 3.0 is at "d". + * + * Since cache versions match exactly and there is no partial fallback just add + * a minor version letter. + */ + private static final String CURRENT_VERSION = "ba"; private static volatile IStreamFactory streamFactory = new IStreamFactory() { @@ -88,16 +102,9 @@ public class AutoSavingCache extends InstrumentingCache names = Schema.instance.getCF(cfId); - return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version); + return DatabaseDescriptor.getSerializedCachePath(cacheType, version); } public Writer getWriter(int keysToSave) @@ -128,16 +135,42 @@ public class AutoSavingCache extends InstrumentingCache loadSavedAsync() + { + final ListeningExecutorService es = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + final long start = System.nanoTime(); + + ListenableFuture cacheLoad = es.submit(new Callable() + { + @Override + public Integer call() throws Exception + { + return loadSaved(); + } + }); + cacheLoad.addListener(new Runnable() { + @Override + public void run() + { + if (size() > 0) + logger.info("Completed loading ({} ms; {} keys) {} cache", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), + CacheService.instance.keyCache.size(), + cacheType); + es.shutdown(); + } + }, MoreExecutors.sameThreadExecutor()); + + return cacheLoad; + } + + public int loadSaved() { int count = 0; long start = System.nanoTime(); // modern format, allows both key and value (so key cache load can be purely sequential) - File path = getCachePath(cfs.metadata.cfId, CURRENT_VERSION); - // if path does not exist, try without cfId (assuming saved cache is created with current CF) - if (!path.exists()) - path = getCachePath(cfs.keyspace.getName(), cfs.name, null, CURRENT_VERSION); + File path = getCachePath(CURRENT_VERSION); if (path.exists()) { DataInputStream in = null; @@ -145,28 +178,57 @@ public class AutoSavingCache extends InstrumentingCache>> futures = new ArrayList>>(); + ArrayDeque>> futures = new ArrayDeque>>(); + while (in.available() > 0) { - Future> entry = cacheLoader.deserialize(in, cfs); + //ksname and cfname are serialized by the serializers in CacheService + //That is delegated there because there are serializer specific conditions + //where a cache key is skipped and not written + String ksname = in.readUTF(); + String cfname = in.readUTF(); + + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, cfname)); + + Future> entryFuture = cacheLoader.deserialize(in, cfs); // Key cache entry can return null, if the SSTable doesn't exist. - if (entry == null) + if (entryFuture == null) continue; - futures.add(entry); + + futures.offer(entryFuture); count++; + + /* + * Kind of unwise to accrue an unbounded number of pending futures + * So now there is this loop to keep a bounded number pending. + */ + do + { + while (futures.peek() != null && futures.peek().isDone()) + { + Future> future = futures.poll(); + Pair entry = future.get(); + if (entry != null && entry.right != null) + put(entry.left, entry.right); + } + + if (futures.size() > 1000) + Thread.yield(); + } while(futures.size() > 1000); } - for (Future> future : futures) + Future> future = null; + while ((future = futures.poll()) != null) { Pair entry = future.get(); if (entry != null && entry.right != null) put(entry.left, entry.right); } } - catch (Exception e) + catch (Throwable t) { - JVMStabilityInspector.inspectThrowable(e); - logger.debug(String.format("harmless error reading saved cache %s", path.getAbsolutePath()), e); + JVMStabilityInspector.inspectThrowable(t); + logger.info(String.format("Harmless error reading saved cache %s", path.getAbsolutePath()), t); } finally { @@ -238,44 +300,33 @@ public class AutoSavingCache extends InstrumentingCache writers = new HashMap<>(); - HashMap streams = new HashMap<>(); - HashMap paths = new HashMap<>(); - + DataOutputStreamPlus writer = null; + File tempCacheFile = tempCacheFile(); try { + try + { + writer = new DataOutputStreamPlus(streamFactory.getOutputStream(tempCacheFile)); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + for (K key : keys) { - UUID cfId = key.getCFId(); - if (!Schema.instance.hasCF(key.getCFId())) - continue; // the table has been dropped. - DataOutputPlus writer = writers.get(cfId); - if (writer == null) - { - File writerPath = tempCacheFile(cfId); - OutputStream stream; - try - { - stream = streamFactory.getOutputStream(writerPath); - writer = new DataOutputStreamPlus(stream); - } - catch (FileNotFoundException e) - { - throw new RuntimeException(e); - } - paths.put(cfId, writerPath); - streams.put(cfId, stream); - writers.put(cfId, writer); - } + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(key.ksAndCFName); + if (cfs == null) + continue; // the table or 2i has been dropped. try { - cacheLoader.serialize(key, writer); + cacheLoader.serialize(key, writer, cfs); } catch (IOException e) { - throw new FSWriteError(e, paths.get(cfId)); + throw new FSWriteError(e, tempCacheFile); } keysWritten++; @@ -283,28 +334,23 @@ public class AutoSavingCache extends InstrumentingCache entry : writers.entrySet()) - { - UUID cfId = entry.getKey(); + File cacheFile = getCachePath(CURRENT_VERSION); - File tmpFile = paths.get(cfId); - File cacheFile = getCachePath(cfId, CURRENT_VERSION); + cacheFile.delete(); // ignore error if it didn't exist - cacheFile.delete(); // ignore error if it didn't exist - if (!tmpFile.renameTo(cacheFile)) - logger.error("Unable to rename {} to {}", tmpFile, cacheFile); - } + if (!tempCacheFile.renameTo(cacheFile)) + logger.error("Unable to rename {} to {}", tempCacheFile, cacheFile); logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } - private File tempCacheFile(UUID cfId) + private File tempCacheFile() { - File path = getCachePath(cfId, CURRENT_VERSION); + File path = getCachePath(CURRENT_VERSION); return FileUtils.createTempFile(path.getName(), null, path.getParentFile()); } @@ -337,7 +383,7 @@ public class AutoSavingCache extends InstrumentingCache { - void serialize(K key, DataOutputPlus out) throws IOException; + void serialize(K key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException; Future> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/CacheKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/CacheKey.java b/src/java/org/apache/cassandra/cache/CacheKey.java index 44fead0..0e82990 100644 --- a/src/java/org/apache/cassandra/cache/CacheKey.java +++ b/src/java/org/apache/cassandra/cache/CacheKey.java @@ -17,12 +17,14 @@ */ package org.apache.cassandra.cache; -import java.util.UUID; +import org.apache.cassandra.utils.Pair; -public interface CacheKey extends IMeasurableMemory +public abstract class CacheKey implements IMeasurableMemory { - /** - * @return The cf id of the cache key. - */ - public UUID getCFId(); + public final Pair ksAndCFName; + + public CacheKey(Pair ksAndCFName) + { + this.ksAndCFName = ksAndCFName; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/CounterCacheKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/CounterCacheKey.java b/src/java/org/apache/cassandra/cache/CounterCacheKey.java index 60247c5..68856eb 100644 --- a/src/java/org/apache/cassandra/cache/CounterCacheKey.java +++ b/src/java/org/apache/cassandra/cache/CounterCacheKey.java @@ -19,36 +19,28 @@ package org.apache.cassandra.cache; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.UUID; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNames; import org.apache.cassandra.utils.*; -public class CounterCacheKey implements CacheKey +public final class CounterCacheKey extends CacheKey { - private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellNames.simpleDense(ByteBuffer.allocate(1)))) - + ObjectSizes.measure(new UUID(0, 0)); + private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellNames.simpleDense(ByteBuffer.allocate(1)))); - public final UUID cfId; public final byte[] partitionKey; public final byte[] cellName; - private CounterCacheKey(UUID cfId, ByteBuffer partitionKey, CellName cellName) + private CounterCacheKey(Pair ksAndCFName, ByteBuffer partitionKey, CellName cellName) { - this.cfId = cfId; + super(ksAndCFName); this.partitionKey = ByteBufferUtil.getArray(partitionKey); this.cellName = ByteBufferUtil.getArray(cellName.toByteBuffer()); } - public static CounterCacheKey create(UUID cfId, ByteBuffer partitionKey, CellName cellName) + public static CounterCacheKey create(Pair ksAndCFName, ByteBuffer partitionKey, CellName cellName) { - return new CounterCacheKey(cfId, partitionKey, cellName); - } - - public UUID getCFId() - { - return cfId; + return new CounterCacheKey(ksAndCFName, partitionKey, cellName); } public long unsharedHeapSize() @@ -62,7 +54,7 @@ public class CounterCacheKey implements CacheKey public String toString() { return String.format("CounterCacheKey(%s, %s, %s)", - cfId, + ksAndCFName, ByteBufferUtil.bytesToHex(ByteBuffer.wrap(partitionKey)), ByteBufferUtil.bytesToHex(ByteBuffer.wrap(cellName))); } @@ -70,7 +62,7 @@ public class CounterCacheKey implements CacheKey @Override public int hashCode() { - return Arrays.deepHashCode(new Object[]{cfId, partitionKey, cellName}); + return Arrays.deepHashCode(new Object[]{ksAndCFName, partitionKey, cellName}); } @Override @@ -84,7 +76,7 @@ public class CounterCacheKey implements CacheKey CounterCacheKey cck = (CounterCacheKey) o; - return cfId.equals(cck.cfId) + return ksAndCFName.equals(cck.ksAndCFName) && Arrays.equals(partitionKey, cck.partitionKey) && Arrays.equals(cellName, cck.cellName); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/KeyCacheKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/KeyCacheKey.java b/src/java/org/apache/cassandra/cache/KeyCacheKey.java index cef37ce..222622c 100644 --- a/src/java/org/apache/cassandra/cache/KeyCacheKey.java +++ b/src/java/org/apache/cassandra/cache/KeyCacheKey.java @@ -19,15 +19,14 @@ package org.apache.cassandra.cache; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.UUID; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.Pair; -public class KeyCacheKey implements CacheKey +public class KeyCacheKey extends CacheKey { - public final UUID cfId; public final Descriptor desc; private static final long EMPTY_SIZE = ObjectSizes.measure(new KeyCacheKey(null, null, ByteBufferUtil.EMPTY_BYTE_BUFFER)); @@ -36,19 +35,15 @@ public class KeyCacheKey implements CacheKey // without extra copies on lookup since client-provided key ByteBuffers will be array-backed already public final byte[] key; - public KeyCacheKey(UUID cfId, Descriptor desc, ByteBuffer key) + public KeyCacheKey(Pair ksAndCFName, Descriptor desc, ByteBuffer key) { - this.cfId = cfId; + + super(ksAndCFName); this.desc = desc; this.key = ByteBufferUtil.getArray(key); assert this.key != null; } - public UUID getCFId() - { - return cfId; - } - public String toString() { return String.format("KeyCacheKey(%s, %s)", desc, ByteBufferUtil.bytesToHex(ByteBuffer.wrap(key))); @@ -67,13 +62,13 @@ public class KeyCacheKey implements CacheKey KeyCacheKey that = (KeyCacheKey) o; - return cfId.equals(that.cfId) && desc.equals(that.desc) && Arrays.equals(key, that.key); + return ksAndCFName.equals(that.ksAndCFName) && desc.equals(that.desc) && Arrays.equals(key, that.key); } @Override public int hashCode() { - int result = cfId.hashCode(); + int result = ksAndCFName.hashCode(); result = 31 * result + desc.hashCode(); result = 31 * result + Arrays.hashCode(key); return result; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/RowCacheKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java b/src/java/org/apache/cassandra/cache/RowCacheKey.java index af2d4d4..c959fd1 100644 --- a/src/java/org/apache/cassandra/cache/RowCacheKey.java +++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java @@ -19,37 +19,30 @@ package org.apache.cassandra.cache; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.UUID; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.Pair; -public class RowCacheKey implements CacheKey, Comparable +public final class RowCacheKey extends CacheKey { - public final UUID cfId; public final byte[] key; private static final long EMPTY_SIZE = ObjectSizes.measure(new RowCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER)); - public RowCacheKey(UUID cfId, DecoratedKey key) + public RowCacheKey(Pair ksAndCFName, DecoratedKey key) { - this(cfId, key.getKey()); + this(ksAndCFName, key.getKey()); } - public RowCacheKey(UUID cfId, ByteBuffer key) + public RowCacheKey(Pair ksAndCFName, ByteBuffer key) { - this.cfId = cfId; + super(ksAndCFName); this.key = ByteBufferUtil.getArray(key); assert this.key != null; } - public UUID getCFId() - { - return cfId; - } - public long unsharedHeapSize() { return EMPTY_SIZE + ObjectSizes.sizeOfArray(key); @@ -63,25 +56,20 @@ public class RowCacheKey implements CacheKey, Comparable RowCacheKey that = (RowCacheKey) o; - return cfId.equals(that.cfId) && Arrays.equals(key, that.key); + return ksAndCFName.equals(that.ksAndCFName) && Arrays.equals(key, that.key); } @Override public int hashCode() { - int result = cfId.hashCode(); + int result = ksAndCFName.hashCode(); result = 31 * result + (key != null ? Arrays.hashCode(key) : 0); return result; } - public int compareTo(RowCacheKey otherKey) - { - return (cfId.compareTo(otherKey.cfId) < 0) ? -1 : ((cfId.equals(otherKey.cfId)) ? FBUtilities.compareUnsigned(key, otherKey.key, 0, 0, key.length, otherKey.key.length) : 1); - } - @Override public String toString() { - return String.format("RowCacheKey(cfId:%s, key:%s)", cfId, Arrays.toString(key)); + return String.format("RowCacheKey(ksAndCFName:%s, key:%s)", ksAndCFName, Arrays.toString(key)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 2c6a30c..2939f09 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -56,6 +56,7 @@ import org.apache.cassandra.thrift.CqlRow; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; import org.github.jamm.Unmetered; @@ -385,6 +386,8 @@ public final class CFMetaData public final UUID cfId; // internal id, never exposed to user public final String ksName; // name of keyspace public final String cfName; // name of this column family + public final Pair ksAndCFName; + public final byte[] ksAndCFBytes; public final ColumnFamilyType cfType; // standard, super public volatile CellNameType comparator; // bytes, long, timeuuid, utf8, etc. @@ -475,6 +478,12 @@ public final class CFMetaData cfId = id; ksName = keyspace; cfName = name; + ksAndCFName = Pair.create(keyspace, name); + byte[] ksBytes = FBUtilities.toWriteUTFBytes(ksName); + byte[] cfBytes = FBUtilities.toWriteUTFBytes(cfName); + ksAndCFBytes = Arrays.copyOf(ksBytes, ksBytes.length + cfBytes.length); + System.arraycopy(cfBytes, 0, ksAndCFBytes, ksBytes.length, cfBytes.length); + cfType = type; comparator = comp; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 3a6a8fd..84381a0 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1437,16 +1437,11 @@ public class DatabaseDescriptor return conf.index_interval; } - public static File getSerializedCachePath(String ksName, String cfName, UUID cfId, CacheService.CacheType cacheType, String version) - { - StringBuilder builder = new StringBuilder(); - builder.append(ksName).append('-'); - builder.append(cfName).append('-'); - if (cfId != null) - builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-'); - builder.append(cacheType); - builder.append((version == null ? "" : "-" + version + ".db")); - return new File(conf.saved_caches_directory, builder.toString()); + public static File getSerializedCachePath(CacheService.CacheType cacheType, String version) + { + String name = cacheType.toString() + + (version == null ? "" : "-" + version + ".db"); + return new File(conf.saved_caches_directory, name); } public static int getDynamicUpdateInterval() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index 8e9802f..fada670 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.*; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.service.MigrationManager; @@ -129,6 +130,53 @@ public class Schema return keyspaceInstances.get(keyspaceName); } + /** + * Retrieve a CFS by name even if that CFS is an index + * + * An index is identified by looking for '.' in the CF name and separating to find the base table + * containing the index + * @param ksNameAndCFName + * @return The named CFS or null if the keyspace, base table, or index don't exist + */ + public ColumnFamilyStore getColumnFamilyStoreIncludingIndexes(Pair ksNameAndCFName) { + String ksName = ksNameAndCFName.left; + String cfName = ksNameAndCFName.right; + Pair baseTable; + + /* + * Split does special case a one character regex, and it looks like it can detect + * if you use two characters to escape '.', but it still allocates a useless array. + */ + int indexOfSeparator = cfName.indexOf('.'); + if (indexOfSeparator > -1) + baseTable = Pair.create(ksName, cfName.substring(0, indexOfSeparator)); + else + baseTable = ksNameAndCFName; + + UUID cfId = cfIdMap.get(baseTable); + if (cfId == null) + return null; + + Keyspace ks = keyspaceInstances.get(ksName); + if (ks == null) + return null; + + ColumnFamilyStore baseCFS = ks.getColumnFamilyStore(cfId); + + //Not an index + if (indexOfSeparator == -1) + return baseCFS; + + if (baseCFS == null) + return null; + + SecondaryIndex index = baseCFS.indexManager.getIndexByName(cfName); + if (index == null) + return null; + + return index.getIndexCfs(); + } + public ColumnFamilyStore getColumnFamilyStoreInstance(UUID cfId) { Pair pair = cfIdMap.inverse().get(cfId); @@ -302,12 +350,12 @@ public class Schema } /** - * @param cfId The identifier of the ColumnFamily to lookup - * @return true if the CF id is a known one, false otherwise. + * @param ksAndCFName The identifier of the ColumnFamily to lookup + * @return true if the KS and CF pair is a known one, false otherwise. */ - public boolean hasCF(UUID cfId) + public boolean hasCF(Pair ksAndCFName) { - return cfIdMap.containsValue(cfId); + return cfIdMap.containsKey(ksAndCFName); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 25b3e57..ffaa276 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -358,8 +358,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean fileIndexGenerator.set(generation); sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2; - CachingOptions caching = metadata.getCaching(); - logger.info("Initializing {}.{}", keyspace.getName(), name); // scan for sstables corresponding to this cf and load them @@ -372,9 +370,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean data.addInitialSSTables(sstables); } - if (caching.keyCache.isEnabled()) - CacheService.instance.keyCache.loadSaved(this); - // compaction strategy should be created after the CFS has been prepared this.compactionStrategyWrapper = new WrappingCompactionStrategy(this); @@ -632,7 +627,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public static void removeUnfinishedCompactionLeftovers(CFMetaData metadata, Map unfinishedCompactions) { Directories directories = new Directories(metadata); - Set allGenerations = new HashSet<>(); for (Descriptor desc : directories.sstableLister().list().keySet()) allGenerations.add(desc.generation); @@ -702,39 +696,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - // must be called after all sstables are loaded since row cache merges all row versions - public void initRowCache() - { - if (!isRowCacheEnabled()) - return; - - long start = System.nanoTime(); - - int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this); - if (cachedRowsRead > 0) - logger.info("Completed loading ({} ms; {} keys) row cache for {}.{}", - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), - cachedRowsRead, - keyspace.getName(), - name); - } - - public void initCounterCache() - { - if (!metadata.isCounter() || CacheService.instance.counterCache.getCapacity() == 0) - return; - - long start = System.nanoTime(); - - int cachedShardsRead = CacheService.instance.counterCache.loadSaved(this); - if (cachedShardsRead > 0) - logger.info("Completed loading ({} ms; {} shards) counter cache for {}.{}", - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), - cachedShardsRead, - keyspace.getName(), - name); - } - /** * See #{@code StorageService.loadNewSSTables(String, String)} for more info * @@ -1246,7 +1207,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (!isRowCacheEnabled()) return; - RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key); + RowCacheKey cacheKey = new RowCacheKey(metadata.ksAndCFName, key); invalidateCachedRow(cacheKey); } @@ -1696,7 +1657,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean assert isRowCacheEnabled() : String.format("Row cache is not enabled on column family [" + name + "]"); - RowCacheKey key = new RowCacheKey(cfId, filter.key); + RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key); // attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862 @@ -2068,7 +2029,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (RowCacheKey key : CacheService.instance.rowCache.getKeySet()) { DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key)); - if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges)) + if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges)) invalidateCachedRow(dk); } @@ -2077,7 +2038,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet()) { DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey)); - if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges)) + if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges)) CacheService.instance.counterCache.remove(key); } } @@ -2527,16 +2488,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (!isRowCacheEnabled()) return null; - IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.cfId, key)); + IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.ksAndCFName, key)); return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily)cached; } private void invalidateCaches() { - CacheService.instance.invalidateKeyCacheForCf(metadata.cfId); - CacheService.instance.invalidateRowCacheForCf(metadata.cfId); + CacheService.instance.invalidateKeyCacheForCf(metadata.ksAndCFName); + CacheService.instance.invalidateRowCacheForCf(metadata.ksAndCFName); if (metadata.isCounter()) - CacheService.instance.invalidateCounterCacheForCf(metadata.cfId); + CacheService.instance.invalidateCounterCacheForCf(metadata.ksAndCFName); } /** @@ -2544,7 +2505,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean */ public boolean containsCachedRow(DecoratedKey key) { - return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key)); + return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.ksAndCFName, key)); } public void invalidateCachedRow(RowCacheKey key) @@ -2558,21 +2519,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (cfId == null) return; // secondary index - invalidateCachedRow(new RowCacheKey(cfId, key)); + invalidateCachedRow(new RowCacheKey(metadata.ksAndCFName, key)); } public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName) { if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled. return null; - return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, cellName)); + return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, cellName)); } public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount) { if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled. return; - CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, cellName), clockAndCount); + CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, cellName), clockAndCount); } public void forceMajorCompaction() throws InterruptedException, ExecutionException @@ -3008,11 +2969,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable(); } - private boolean isRowCacheEnabled() + public boolean isRowCacheEnabled() { return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0; } + public boolean isCounterCacheEnabled() + { + return metadata.isCounter() && CacheService.instance.counterCache.getCapacity() > 0; + } + + public boolean isKeyCacheEnabled() + { + return metadata.getCaching().keyCache.isEnabled() && CacheService.instance.keyCache.getCapacity() > 0; + } + /** * Discard all SSTables that were created before given timestamp. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 4f59c40..03c3d2b 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -120,10 +120,6 @@ public class Keyspace // open and store the keyspace keyspaceInstance = new Keyspace(keyspaceName, loadSSTables); schema.storeKeyspaceInstance(keyspaceInstance); - - // keyspace has to be constructed and in the cache before cacheRow can be called - for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores()) - cfs.initRowCache(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java index 01035c4..77b745c 100644 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@ -142,7 +142,7 @@ public class RowIndexEntry implements IMeasurableMemory skipPromotedIndex(in); } - public static void skipPromotedIndex(DataInput in) throws IOException + private static void skipPromotedIndex(DataInput in) throws IOException { int size = in.readInt(); if (size <= 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index a9ae069..ad3aae8 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -95,7 +95,8 @@ public class SecondaryIndexManager /** * Keeps all secondary index instances, either per-column or per-row */ - private final Set allIndexes; + private final Collection allIndexes; + private final Map indexesByName; /** @@ -107,7 +108,8 @@ public class SecondaryIndexManager { indexesByColumn = new ConcurrentSkipListMap<>(); rowLevelIndexMap = new ConcurrentHashMap<>(); - allIndexes = Collections.newSetFromMap(new ConcurrentHashMap()); + indexesByName = new ConcurrentHashMap(); + allIndexes = indexesByName.values(); this.baseCfs = baseCfs; } @@ -158,7 +160,7 @@ public class SecondaryIndexManager { idxNames = filterByColumn(idxNames); if (idxNames.isEmpty()) - return; + return; logger.info(String.format("Submitting index build of %s for data in %s", idxNames, StringUtils.join(sstables, ", "))); @@ -172,7 +174,7 @@ public class SecondaryIndexManager logger.info("Index build of {} complete", idxNames); } - public boolean indexes(CellName name, Set indexes) + public boolean indexes(CellName name, Collection indexes) { boolean matching = false; for (SecondaryIndex index : indexes) @@ -186,7 +188,7 @@ public class SecondaryIndexManager return matching; } - public Set indexFor(CellName name, Set indexes) + public Set indexFor(CellName name, Collection indexes) { Set matching = null; for (SecondaryIndex index : indexes) @@ -319,7 +321,7 @@ public class SecondaryIndexManager indexesByColumn.put(cdef.name.bytes, index); // Add to all indexes set: - allIndexes.add(index); + indexesByName.put(index.getIndexName(), index); // if we're just linking in the index to indexedColumns on an // already-built index post-restart, we're done @@ -422,11 +424,16 @@ public class SecondaryIndexManager /** * @return all of the secondary indexes without distinction to the (non-)backed by secondary ColumnFamilyStore. */ - public Set getIndexes() + public Collection getIndexes() { return allIndexes; } + public SecondaryIndex getIndexByName(String name) + { + return indexesByName.get(name); + } + /** * @return if there are ANY indexes for this table.. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 86f6a23..0f307b0 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -1513,7 +1513,7 @@ public class SSTableReader extends SSTable implements SelfRefCounted {}", cacheKey, info); keyCache.put(cacheKey, info); } public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats) { - return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats); + return getCachedPosition(new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey()), updateStats); } private RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats) @@ -1596,7 +1596,7 @@ public class SSTableReader extends SSTable implements SelfRefCounted ksAndCFName) { Iterator keyCacheIterator = keyCache.getKeySet().iterator(); while (keyCacheIterator.hasNext()) { KeyCacheKey key = keyCacheIterator.next(); - if (key.cfId.equals(cfId)) + if (key.ksAndCFName.equals(ksAndCFName)) keyCacheIterator.remove(); } } @@ -299,24 +298,24 @@ public class CacheService implements CacheServiceMBean rowCache.clear(); } - public void invalidateRowCacheForCf(UUID cfId) + public void invalidateRowCacheForCf(Pair ksAndCFName) { Iterator rowCacheIterator = rowCache.getKeySet().iterator(); while (rowCacheIterator.hasNext()) { RowCacheKey rowCacheKey = rowCacheIterator.next(); - if (rowCacheKey.cfId.equals(cfId)) + if (rowCacheKey.ksAndCFName.equals(ksAndCFName)) rowCacheIterator.remove(); } } - public void invalidateCounterCacheForCf(UUID cfId) + public void invalidateCounterCacheForCf(Pair ksAndCFName) { Iterator counterCacheIterator = counterCache.getKeySet().iterator(); while (counterCacheIterator.hasNext()) { CounterCacheKey counterCacheKey = counterCacheIterator.next(); - if (counterCacheKey.cfId.equals(cfId)) + if (counterCacheKey.ksAndCFName.equals(ksAndCFName)) counterCacheIterator.remove(); } } @@ -405,16 +404,24 @@ public class CacheService implements CacheServiceMBean public static class CounterCacheSerializer implements CacheSerializer { - public void serialize(CounterCacheKey key, DataOutputPlus out) throws IOException + public void serialize(CounterCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException { + assert(cfs.metadata.isCounter()); + out.write(cfs.metadata.ksAndCFBytes); ByteBufferUtil.writeWithLength(key.partitionKey, out); ByteBufferUtil.writeWithLength(key.cellName, out); } public Future> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException { + //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a + //parameter so they aren't deserialized here, even though they are serialized by this serializer final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in); - final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(ByteBufferUtil.readWithLength(in)); + ByteBuffer cellNameBuffer = ByteBufferUtil.readWithLength(in); + if (cfs == null || !cfs.metadata.isCounter() || !cfs.isCounterCacheEnabled()) + return null; + assert(cfs.metadata.isCounter()); + final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(cellNameBuffer); return StageManager.getStage(Stage.READ).submit(new Callable>() { public Pair call() throws Exception @@ -431,7 +438,7 @@ public class CacheService implements CacheServiceMBean if (cell == null || !cell.isLive(Long.MIN_VALUE)) return null; ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value()); - return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount); + return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, cellName), clockAndCount); } }); } @@ -439,14 +446,22 @@ public class CacheService implements CacheServiceMBean public static class RowCacheSerializer implements CacheSerializer { - public void serialize(RowCacheKey key, DataOutputPlus out) throws IOException + public void serialize(RowCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException { + assert(!cfs.isIndex()); + out.write(cfs.metadata.ksAndCFBytes); ByteBufferUtil.writeWithLength(key.key, out); } public Future> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException { + //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a + //parameter so they aren't deserialized here, even though they are serialized by this serializer final ByteBuffer buffer = ByteBufferUtil.readWithLength(in); + if (cfs == null || !cfs.isRowCacheEnabled()) + return null; + assert(!cfs.isIndex()); + return StageManager.getStage(Stage.READ).submit(new Callable>() { public Pair call() throws Exception @@ -454,7 +469,7 @@ public class CacheService implements CacheServiceMBean DecoratedKey key = cfs.partitioner.decorateKey(buffer); QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE); ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE); - return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data); + return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry) data); } }); } @@ -462,24 +477,23 @@ public class CacheService implements CacheServiceMBean public static class KeyCacheSerializer implements CacheSerializer { - public void serialize(KeyCacheKey key, DataOutputPlus out) throws IOException + public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException { RowIndexEntry entry = CacheService.instance.keyCache.getInternal(key); if (entry == null) return; - CFMetaData cfm = Schema.instance.getCFMetaData(key.cfId); - if (cfm == null) - return; // the table no longer exists. - + out.write(cfs.metadata.ksAndCFBytes); ByteBufferUtil.writeWithLength(key.key, out); out.writeInt(key.desc.generation); out.writeBoolean(true); - cfm.comparator.rowIndexEntrySerializer().serialize(entry, out); + cfs.metadata.comparator.rowIndexEntrySerializer().serialize(entry, out); } public Future> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException { + //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a + //parameter so they aren't deserialized here, even though they are serialized by this serializer int keyLength = input.readInt(); if (keyLength > FBUtilities.MAX_UNSIGNED_SHORT) { @@ -488,15 +502,15 @@ public class CacheService implements CacheServiceMBean } ByteBuffer key = ByteBufferUtil.read(input, keyLength); int generation = input.readInt(); - SSTableReader reader = findDesc(generation, cfs.getSSTables()); input.readBoolean(); // backwards compatibility for "promoted indexes" boolean - if (reader == null) + SSTableReader reader = null; + if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables())) == null) { - RowIndexEntry.Serializer.skipPromotedIndex(input); + RowIndexEntry.Serializer.skip(input); return null; } RowIndexEntry entry = reader.metadata.comparator.rowIndexEntrySerializer().deserialize(input, reader.descriptor.version); - return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry)); + return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry)); } private SSTableReader findDesc(int generation, Collection collection) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index d078203..17553f3 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -25,8 +25,10 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.rmi.registry.LocateRegistry; import java.rmi.server.RMIServerSocketFactory; +import java.util.List; import java.util.*; import java.util.concurrent.TimeUnit; + import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.StandardMBean; @@ -36,6 +38,8 @@ import javax.management.remote.rmi.RMIConnectorServer; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,7 +103,7 @@ public class CassandraDaemon url.append("service:jmx:"); url.append("rmi://localhost/jndi/"); url.append("rmi://localhost:").append(jmxPort).append("/jmxrmi"); - + Map env = new HashMap(); env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, serverFactory); @@ -144,7 +148,7 @@ public class CassandraDaemon */ protected void setup() { - try + try { logger.info("Hostname: {}", InetAddress.getLocalHost().getHostName()); } @@ -330,11 +334,16 @@ public class CassandraDaemon } } - if (CacheService.instance.keyCache.size() > 0) - logger.info("completed pre-loading ({} keys) key cache.", CacheService.instance.keyCache.size()); - if (CacheService.instance.rowCache.size() > 0) - logger.info("completed pre-loading ({} keys) row cache.", CacheService.instance.rowCache.size()); + try + { + loadRowAndKeyCacheAsync().get(); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.warn("Error loading key or row cache", t); + } try { @@ -429,6 +438,22 @@ public class CassandraDaemon completeSetup(); } + /* + * Asynchronously load the row and key cache in one off threads and return a compound future of the result. + * Error handling is pushed into the cache load since cache loads are allowed to fail and are handled by logging. + */ + private ListenableFuture loadRowAndKeyCacheAsync() + { + final ListenableFuture keyCacheLoad = CacheService.instance.keyCache.loadSavedAsync(); + + final ListenableFuture rowCacheLoad = CacheService.instance.rowCache.loadSavedAsync(); + + @SuppressWarnings("unchecked") + ListenableFuture> retval = Futures.successfulAsList(keyCacheLoad, rowCacheLoad); + + return retval; + } + @VisibleForTesting public void completeSetup() { @@ -533,7 +558,7 @@ public class CassandraDaemon logger.error("error registering MBean {}", MBEAN_NAME, e); //Allow the server to start even if the bean can't be registered } - + setup(); if (pidFile != null) @@ -625,15 +650,15 @@ public class CassandraDaemon { instance.activate(); } - + static class NativeAccess implements NativeAccessMBean { public boolean isAvailable() { return CLibrary.jnaAvailable(); } - - public boolean isMemoryLockable() + + public boolean isMemoryLockable() { return CLibrary.jnaMemoryLockable(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index f5950e3..431f163 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -719,10 +719,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE prepareToJoin(); - // Has to be called after the host id has potentially changed in prepareToJoin(). - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - if (cfs.metadata.isCounter()) - cfs.initCounterCache(); + try + { + CacheService.instance.counterCache.loadSavedAsync().get(); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.warn("Error loading counter cache", t); + } if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))) { @@ -2443,8 +2448,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /** * Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified. - * - * + * + * * @param tag * the tag given to the snapshot; may not be null or empty * @param columnFamilyList @@ -3817,7 +3822,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public synchronized void drain() throws IOException, InterruptedException, ExecutionException { inShutdownHook = true; - + ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); if (mutationStage.isTerminated() && counterMutationStage.isTerminated()) @@ -3947,32 +3952,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ public LinkedHashMap effectiveOwnership(String keyspace) throws IllegalStateException { - + if (keyspace != null) { Keyspace keyspaceInstance = Schema.instance.getKeyspaceInstance(keyspace); if(keyspaceInstance == null) throw new IllegalArgumentException("The keyspace " + keyspace + ", does not exist"); - + if(keyspaceInstance.getReplicationStrategy() instanceof LocalStrategy) throw new IllegalStateException("Ownership values for keyspaces with LocalStrategy are meaningless"); } else { List nonSystemKeyspaces = Schema.instance.getNonSystemKeyspaces(); - + //system_traces is a non-system keyspace however it needs to be counted as one for this process int specialTableCount = 0; if (nonSystemKeyspaces.contains("system_traces")) { specialTableCount += 1; } - if (nonSystemKeyspaces.size() > specialTableCount) + if (nonSystemKeyspaces.size() > specialTableCount) throw new IllegalStateException("Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless"); - + keyspace = "system_traces"; } - + TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); Collection> endpointsGroupedByDc = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index f866610..8d8dd22 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -713,4 +713,20 @@ public class FBUtilities digest.update((byte) ((val >>> 8) & 0xFF)); digest.update((byte) ((val >>> 0) & 0xFF)); } + + public static byte[] toWriteUTFBytes(String s) + { + try + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeUTF(s); + dos.flush(); + return baos.toByteArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java index 28afef1..e6ef69e 100644 --- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java +++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java @@ -59,9 +59,8 @@ public class AutoSavingCacheTest extends SchemaLoader Assert.assertEquals(0, keyCache.size()); // then load saved - keyCache.loadSaved(cfs); - Assert.assertEquals(2, keyCache.size()); + keyCache.loadSavedAsync().get(); for (SSTableReader sstable : cfs.getSSTables()) - Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.cfId, sstable.descriptor, ByteBufferUtil.bytes("key1")))); + Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.ksAndCFName, sstable.descriptor, ByteBufferUtil.bytes("key1")))); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java index 71d4f80..63f89a4 100644 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@ -1,4 +1,3 @@ -package org.apache.cassandra.cache; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.cassandra.cache; * under the License. * */ +package org.apache.cassandra.cache; import java.nio.ByteBuffer; @@ -31,6 +31,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.ArrayBackedSortedColumns; import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.utils.Pair; import com.googlecode.concurrentlinkedhashmap.Weighers; @@ -114,21 +115,20 @@ public class CacheProviderTest extends SchemaLoader simpleCase(cf, cache); concurrentCase(cf, cache); } - + @Test public void testKeys() { - UUID cfId = UUID.randomUUID(); - + Pair ksAndCFName = Pair.create(keyspaceName, cfName); byte[] b1 = {1, 2, 3, 4}; - RowCacheKey key1 = new RowCacheKey(cfId, ByteBuffer.wrap(b1)); + RowCacheKey key1 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b1)); byte[] b2 = {1, 2, 3, 4}; - RowCacheKey key2 = new RowCacheKey(cfId, ByteBuffer.wrap(b2)); + RowCacheKey key2 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b2)); assertEquals(key1, key2); assertEquals(key1.hashCode(), key2.hashCode()); - + byte[] b3 = {1, 2, 3, 5}; - RowCacheKey key3 = new RowCacheKey(cfId, ByteBuffer.wrap(b3)); + RowCacheKey key3 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b3)); assertNotSame(key1, key3); assertNotSame(key1.hashCode(), key3.hashCode()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java new file mode 100644 index 0000000..0e879e9 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.ImmutableSet; +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.MetricName; + +import org.apache.cassandra.cache.KeyCacheKey; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.metrics.CacheMetrics; +import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.Pair; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class KeyCacheCqlTest extends CQLTester +{ + + static final String commonColumnsDef = + "part_key_a int," + + "part_key_b text," + + "clust_key_a int," + + "clust_key_b text," + + "clust_key_c frozen>," + // to make it really big + "col_text text," + + "col_int int," + + "col_long bigint,"; + static final String commonColumns = + "part_key_a," + + "part_key_b," + + "clust_key_a," + + "clust_key_b," + + "clust_key_c," + // to make it really big + "col_text," + + "col_int," + + "col_long"; + + @Test + public void test2iKeyCachePaths() throws Throwable + { + String table = createTable("CREATE TABLE %s (" + + commonColumnsDef + + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))"); + createIndex("CREATE INDEX some_index ON %s (col_int)"); + insertData(table, "some_index", true); + clearCache(); + + CacheMetrics metrics = CacheService.instance.keyCache.getMetrics(); + + for (int i = 0; i < 10; i++) + { + UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i); + assertEquals(500, result.size()); + } + + long hits = metrics.hits.count(); + long requests = metrics.requests.count(); + assertEquals(4900, hits); + assertEquals(5250, requests); + + // + + for (int i = 0; i < 10; i++) + { + UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i); + // 100 part-keys * 50 clust-keys + // indexed on part-key % 10 = 10 index partitions + // (50 clust-keys * 100-part-keys / 10 possible index-values) = 500 + assertEquals(500, result.size()); + } + + metrics = CacheService.instance.keyCache.getMetrics(); + hits = metrics.hits.count(); + requests = metrics.requests.count(); + assertEquals(10000, hits); + assertEquals(10500, requests); + + CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get(); + + int beforeSize = CacheService.instance.keyCache.size(); + + CacheService.instance.keyCache.clear(); + + Assert.assertEquals(0, CacheService.instance.keyCache.size()); + + // then load saved + CacheService.instance.keyCache.loadSaved(); + + assertEquals(beforeSize, CacheService.instance.keyCache.size()); + + for (int i = 0; i < 10; i++) + { + UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i); + // 100 part-keys * 50 clust-keys + // indexed on part-key % 10 = 10 index partitions + // (50 clust-keys * 100-part-keys / 10 possible index-values) = 500 + assertEquals(500, result.size()); + } + + //Test Schema.getColumnFamilyStoreIncludingIndexes, several null check paths + //are defensive and unreachable + assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create("foo", "bar"))); + assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(KEYSPACE, "bar"))); + + dropTable("DROP TABLE %s"); + + //Test loading for a dropped 2i/table + CacheService.instance.keyCache.clear(); + + // then load saved + CacheService.instance.keyCache.loadSaved(); + + assertEquals(0, CacheService.instance.keyCache.size()); + } + + @Test + public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable + { + String table = createTable("CREATE TABLE %s (" + + commonColumnsDef + + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))"); + createIndex("CREATE INDEX some_index ON %s (col_int)"); + insertData(table, "some_index", true); + clearCache(); + + CacheMetrics metrics = CacheService.instance.keyCache.getMetrics(); + + for (int i = 0; i < 10; i++) + { + UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i); + assertEquals(500, result.size()); + } + + long hits = metrics.hits.count(); + long requests = metrics.requests.count(); + assertEquals(4900, hits); + assertEquals(5250, requests); + + // + + for (int i = 0; i < 10; i++) + { + UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i); + // 100 part-keys * 50 clust-keys + // indexed on part-key % 10 = 10 index partitions + // (50 clust-keys * 100-part-keys / 10 possible index-values) = 500 + assertEquals(500, result.size()); + } + + metrics = CacheService.instance.keyCache.getMetrics(); + hits = metrics.hits.count(); + requests = metrics.requests.count(); + assertEquals(10000, hits); + assertEquals(10500, requests); + + dropTable("DROP TABLE %s"); + + CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get(); + + CacheService.instance.keyCache.clear(); + + Assert.assertEquals(0, CacheService.instance.keyCache.size()); + + // then load saved + CacheService.instance.keyCache.loadSaved(); + + for (KeyCacheKey key : CacheService.instance.keyCache.getKeySet()) + { + Assert.assertFalse(key.ksAndCFName.left.equals("KEYSPACE")); + Assert.assertFalse(key.ksAndCFName.right.startsWith(table)); + } + } + + // Inserts 100 partitions split over 10 sstables (flush after 10 partitions). + // Clustered tables receive 50 CQL rows per partition. + private void insertData(String table, String index, boolean withClustering) throws Throwable + { + StorageService.instance.disableAutoCompaction(KEYSPACE, table); + Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get(); + Keyspace.open(KEYSPACE).getColumnFamilyStore(table).truncateBlocking(); + if (index != null) + { + StorageService.instance.disableAutoCompaction(KEYSPACE, table + '.' + index); + Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush(); + } + + for (int i = 0; i < 100; i++) + { + int partKeyA = i; + String partKeyB = Integer.toOctalString(i); + for (int c = 0; c < (withClustering ? 50 : 1); c++) + { + int clustKeyA = c; + String clustKeyB = Integer.toOctalString(c); + List clustKeyC = makeList(clustKeyB); + String colText = String.valueOf(i) + '-' + String.valueOf(c); + int colInt = i % 10; + long colLong = c; + execute("INSERT INTO %s (" + commonColumns + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + partKeyA, partKeyB, + clustKeyA, clustKeyB, clustKeyC, + colText, colInt, colLong); + } + + if (i % 10 == 9) + { + Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get(); + if (index != null) + Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush(); + } + } + } + + private static List makeList(String value) + { + List list = new ArrayList<>(50); + for (int i = 0; i < 50; i++) + { + list.add(value + i); + } + return list; + } + + private static void clearCache() + { + for (MetricName name : ImmutableSet.copyOf(Metrics.defaultRegistry().allMetrics().keySet())) + { + Metrics.defaultRegistry().removeMetric(name); + } + CacheService.instance.keyCache.clear(); + CacheMetrics metrics = CacheService.instance.keyCache.getMetrics(); + Assert.assertEquals(0, metrics.entries.value().intValue()); + Assert.assertEquals(0L, metrics.hits.count()); + Assert.assertEquals(0L, metrics.requests.count()); + Assert.assertEquals(0L, metrics.size.value().longValue()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/db/CounterCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java index cb2d97a..20e067c 100644 --- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java +++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java @@ -23,6 +23,7 @@ import org.junit.AfterClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.utils.FBUtilities; @@ -48,6 +49,7 @@ public class CounterCacheTest extends SchemaLoader public void testReadWrite() { ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF); + cfs.truncateBlocking(); CacheService.instance.invalidateCounterCache(); assertEquals(0, CacheService.instance.counterCache.size()); @@ -72,6 +74,7 @@ public class CounterCacheTest extends SchemaLoader public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException { ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF); + cfs.truncateBlocking(); CacheService.instance.invalidateCounterCache(); ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); @@ -86,11 +89,76 @@ public class CounterCacheTest extends SchemaLoader assertEquals(0, CacheService.instance.counterCache.size()); // load from cache and validate - CacheService.instance.counterCache.loadSaved(cfs); + CacheService.instance.counterCache.loadSaved(); assertEquals(4, CacheService.instance.counterCache.size()); assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1))); assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2))); assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1))); assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2))); } + + @Test + public void testDroppedSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException + { + ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF); + cfs.truncateBlocking(); + CacheService.instance.invalidateCounterCache(); + + ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); + cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros())); + cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros())); + new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); + new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply(); + + // flush the counter cache and invalidate + CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get(); + CacheService.instance.invalidateCounterCache(); + assertEquals(0, CacheService.instance.counterCache.size()); + + Keyspace ks = Schema.instance.removeKeyspaceInstance(KS); + + try + { + // load from cache and validate + CacheService.instance.counterCache.loadSaved(); + assertEquals(0, CacheService.instance.counterCache.size()); + } + finally + { + Schema.instance.storeKeyspaceInstance(ks); + } + } + + @Test + public void testDisabledSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException + { + ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF); + cfs.truncateBlocking(); + CacheService.instance.invalidateCounterCache(); + + ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); + cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros())); + cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros())); + new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); + new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply(); + + // flush the counter cache and invalidate + CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get(); + CacheService.instance.invalidateCounterCache(); + assertEquals(0, CacheService.instance.counterCache.size()); + + + CacheService.instance.setCounterCacheCapacityInMB(0); + try + { + // load from cache and validate + CacheService.instance.counterCache.loadSaved(); + assertEquals(0, CacheService.instance.counterCache.size()); + } + finally + { + CacheService.instance.setCounterCacheCapacityInMB(1); + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index 1f7024e..4a4c7d5 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -86,7 +86,7 @@ public class KeyCacheTest extends SchemaLoader CacheService.instance.invalidateKeyCache(); assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY2); - CacheService.instance.keyCache.loadSaved(store); + CacheService.instance.keyCache.loadSaved(); assertKeyCacheSize(savedMap.size(), KEYSPACE1, COLUMN_FAMILY2); // probably it's better to add equals/hashCode to RowIndexEntry...