cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [3/6] git commit: Fix race between writes and read for cache
Date Tue, 28 Feb 2012 17:59:04 GMT
Fix race between writes and read for cache

patch by jbellis and slebresne; reviewed by jbellis and slebresne for CASSANDRA-3862


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9270f4e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9270f4e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9270f4e

Branch: refs/heads/cassandra-1.1
Commit: c9270f4e3ae5f94d46070f1c7e585c90bc68df7c
Parents: aa75168
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Tue Feb 28 18:53:32 2012 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Tue Feb 28 18:53:32 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/cache/ConcurrentLinkedHashCache.java |   10 ++
 .../cache/ConcurrentLinkedHashCacheProvider.java   |   13 +-
 src/java/org/apache/cassandra/cache/ICache.java    |    4 +
 .../org/apache/cassandra/cache/IRowCacheEntry.java |    5 +
 .../apache/cassandra/cache/IRowCacheProvider.java  |    2 +-
 .../apache/cassandra/cache/InstrumentingCache.java |   10 ++
 .../apache/cassandra/cache/RowCacheSentinel.java   |   45 ++++++
 .../apache/cassandra/cache/SerializingCache.java   |   35 ++++-
 .../cassandra/cache/SerializingCacheProvider.java  |   47 ++++++-
 src/java/org/apache/cassandra/db/ColumnFamily.java |    9 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  119 +++++++++++----
 .../apache/cassandra/db/RowIteratorFactory.java    |    2 +
 .../db/compaction/CompactionIterable.java          |    2 +-
 .../db/compaction/ParallelCompactionIterable.java  |    2 +-
 .../org/apache/cassandra/service/CacheService.java |    8 +-
 .../cassandra/streaming/IncomingStreamReader.java  |    3 +-
 .../org/apache/cassandra/utils/StatusLogger.java   |    3 +-
 .../unit/org/apache/cassandra/db/RowCacheTest.java |    4 +-
 19 files changed, 264 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 41316eb..b5b79a9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@
  * ignore deprecated KsDef/CfDef/ColumnDef fields in native schema (CASSANDRA-3963)
  * CLI to report when unsupported column_metadata pair was given (CASSANDRA-3959)
  * reincarnate removed and deprecated KsDef/CfDef attributes (CASSANDRA-3953)
+ * Fix race between writes and read for cache (CASSANDRA-3862)
 Merged from 1.0:
  * remove the wait on hint future during write (CASSANDRA-3870)
  * (cqlsh) ignore missing CfDef opts (CASSANDRA-3933)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
index 8f4d2f0..a1cf4ea 100644
--- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
+++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
@@ -117,6 +117,16 @@ public class ConcurrentLinkedHashCache<K, V> implements ICache<K, V>
         map.put(key, value);
     }
 
+    public boolean putIfAbsent(K key, V value)
+    {
+        return map.putIfAbsent(key, value) == null;
+    }
+
+    public boolean replace(K key, V old, V value)
+    {
+        return map.replace(key, old, value);
+    }
+
     public void remove(K key)
     {
         map.remove(key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
index 851d4c5..71babd6 100644
--- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.cache;
  *
  */
 
-import org.apache.cassandra.db.ColumnFamily;
-
 import com.googlecode.concurrentlinkedhashmap.Weigher;
 import com.googlecode.concurrentlinkedhashmap.Weighers;
 
@@ -29,21 +27,20 @@ import org.github.jamm.MemoryMeter;
 
 public class ConcurrentLinkedHashCacheProvider implements IRowCacheProvider
 {
-    public ICache<RowCacheKey, ColumnFamily> create(int capacity, boolean useMemoryWeigher)
+    public ICache<RowCacheKey, IRowCacheEntry> create(int capacity, boolean useMemoryWeigher)
     {
         return ConcurrentLinkedHashCache.create(capacity, useMemoryWeigher
                                                             ? createMemoryWeigher()
-                                                            : Weighers.<ColumnFamily>singleton());
+                                                            : Weighers.<IRowCacheEntry>singleton());
     }
 
-    private static Weigher<ColumnFamily> createMemoryWeigher()
+    private static Weigher<IRowCacheEntry> createMemoryWeigher()
     {
-        return new Weigher<ColumnFamily>()
+        return new Weigher<IRowCacheEntry>()
         {
             final MemoryMeter meter = new MemoryMeter();
 
-            @Override
-            public int weightOf(ColumnFamily value)
+            public int weightOf(IRowCacheEntry value)
             {
                 return (int) Math.min(meter.measure(value), Integer.MAX_VALUE);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/ICache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ICache.java b/src/java/org/apache/cassandra/cache/ICache.java
index 48e045c..5f8e00b 100644
--- a/src/java/org/apache/cassandra/cache/ICache.java
+++ b/src/java/org/apache/cassandra/cache/ICache.java
@@ -36,6 +36,10 @@ public interface ICache<K, V>
 
     public void put(K key, V value);
 
+    public boolean putIfAbsent(K key, V value);
+
+    public boolean replace(K key, V old, V value);
+
     public V get(K key);
 
     public void remove(K key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/IRowCacheEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/IRowCacheEntry.java b/src/java/org/apache/cassandra/cache/IRowCacheEntry.java
new file mode 100644
index 0000000..7340428
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/IRowCacheEntry.java
@@ -0,0 +1,5 @@
+package org.apache.cassandra.cache;
+
+public interface IRowCacheEntry
+{
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
index 9209ced..9e1eb7c 100644
--- a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
@@ -27,5 +27,5 @@ import org.apache.cassandra.db.ColumnFamily;
  */
 public interface IRowCacheProvider
 {
-    public ICache<RowCacheKey, ColumnFamily> create(int capacity, boolean useMemoryWeigher);
+    public ICache<RowCacheKey, IRowCacheEntry> create(int capacity, boolean useMemoryWeigher);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/InstrumentingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/InstrumentingCache.java b/src/java/org/apache/cassandra/cache/InstrumentingCache.java
index 36630ac..b4d048f 100644
--- a/src/java/org/apache/cassandra/cache/InstrumentingCache.java
+++ b/src/java/org/apache/cassandra/cache/InstrumentingCache.java
@@ -45,6 +45,16 @@ public class InstrumentingCache<K, V>
         map.put(key, value);
     }
 
+    public boolean putIfAbsent(K key, V value)
+    {
+        return map.putIfAbsent(key, value);
+    }
+
+    public boolean replace(K key, V old, V value)
+    {
+        return map.replace(key, old, value);
+    }
+
     public V get(K key)
     {
         V v = map.get(key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/RowCacheSentinel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheSentinel.java b/src/java/org/apache/cassandra/cache/RowCacheSentinel.java
new file mode 100644
index 0000000..381160a
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/RowCacheSentinel.java
@@ -0,0 +1,45 @@
+package org.apache.cassandra.cache;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamily;
+
+/**
+ * A sentinel object for row caches.  See comments to getThroughCache and CASSANDRA-3862.
+ */
+public class RowCacheSentinel implements IRowCacheEntry
+{
+    private static final AtomicLong generator = new AtomicLong();
+
+    final long sentinelId;
+
+    public RowCacheSentinel()
+    {
+        sentinelId = generator.getAndIncrement();
+    }
+
+    RowCacheSentinel(long sentinelId)
+    {
+        this.sentinelId = sentinelId;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof RowCacheSentinel)) return false;
+
+        RowCacheSentinel other = (RowCacheSentinel) o;
+        return this.sentinelId == other.sentinelId;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(sentinelId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index b8844cb..4946fb0 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -76,7 +76,6 @@ public class SerializingCache<K, V> implements ICache<K, V>
     {
         return new Weigher<FreeableMemory>()
         {
-            @Override
             public int weightOf(FreeableMemory value)
             {
                 return (int) Math.min(value.size(), Integer.MAX_VALUE);
@@ -182,6 +181,40 @@ public class SerializingCache<K, V> implements ICache<K, V>
             old.unreference();
     }
 
+    public boolean putIfAbsent(K key, V value)
+    {
+        FreeableMemory mem = serialize(value);
+        if (mem == null)
+            return false; // out of memory.  never mind.
+
+        FreeableMemory old = map.putIfAbsent(key, mem);
+        if (old != null)
+            // the new value was not put, we've uselessly allocated some memory, free it
+            mem.unreference();
+        return old == null;
+    }
+
+    public boolean replace(K key, V oldToReplace, V value)
+    {
+        // if there is no old value in our map, we fail
+        FreeableMemory old = map.get(key);
+        if (old == null)
+            return false;
+
+        // see if the old value matches the one we want to replace
+        FreeableMemory mem = serialize(value);
+        if (mem == null)
+            return false; // out of memory.  never mind.
+        V oldValue = deserialize(old);
+        boolean success = oldValue.equals(oldToReplace) && map.replace(key, old, mem);
+
+        if (success)
+            old.unreference();
+        else
+            mem.unreference();
+        return success;
+    }
+
     public void remove(K key)
     {
         FreeableMemory mem = map.remove(key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index f71684b..3a06d36 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -20,12 +20,55 @@ package org.apache.cassandra.cache;
  *
  */
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.io.ISerializer;
 
 public class SerializingCacheProvider implements IRowCacheProvider
 {
-    public ICache<RowCacheKey, ColumnFamily> create(int capacity, boolean useMemoryWeigher)
+    public ICache<RowCacheKey, IRowCacheEntry> create(int capacity, boolean useMemoryWeigher)
+    {
+        return new SerializingCache<RowCacheKey, IRowCacheEntry>(capacity, useMemoryWeigher, new RowCacheSerializer());
+    }
+
+    private static class RowCacheSerializer implements ISerializer<IRowCacheEntry>
     {
-        return new SerializingCache<RowCacheKey, ColumnFamily>(capacity, useMemoryWeigher, ColumnFamily.serializer());
+        public void serialize(IRowCacheEntry cf, DataOutput out)
+        {
+            assert cf != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
+            try
+            {
+                out.writeBoolean(cf instanceof RowCacheSentinel);
+                if (cf instanceof RowCacheSentinel)
+                    out.writeLong(((RowCacheSentinel) cf).sentinelId);
+                else
+                    ColumnFamily.serializer.serialize((ColumnFamily) cf, out);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        public IRowCacheEntry deserialize(DataInput in) throws IOException
+        {
+            boolean isSentinel = in.readBoolean();
+            if (isSentinel)
+                return new RowCacheSentinel(in.readLong());
+            return ColumnFamily.serializer.deserialize(in);
+        }
+
+        public long serializedSize(IRowCacheEntry cf)
+        {
+            return DBConstants.boolSize
+                   + (cf instanceof RowCacheSentinel
+                      ? DBConstants.intSize + DBConstants.longSize
+                      : ColumnFamily.serializer().serializedSize((ColumnFamily) cf));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 9191df6..740a0a6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -25,6 +25,7 @@ import java.security.MessageDigest;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 
+import org.apache.cassandra.cache.IRowCacheEntry;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.QueryPath;
@@ -36,10 +37,10 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.HeapAllocator;
 
-public class ColumnFamily extends AbstractColumnContainer
+public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEntry
 {
-    /* The column serializer for this Column Family. Create based on config. */
-    private static ColumnFamilySerializer serializer = new ColumnFamilySerializer();
+    public static final ColumnFamilySerializer serializer = new ColumnFamilySerializer();
+
     private final CFMetaData cfm;
 
     public static ColumnFamilySerializer serializer()
@@ -77,7 +78,7 @@ public class ColumnFamily extends AbstractColumnContainer
         return new ColumnFamily(cfm, factory.create(cfm.comparator, reversedInsertOrder));
     }
 
-    private ColumnFamily(CFMetaData cfm, ISortedColumns map)
+    protected ColumnFamily(CFMetaData cfm, ISortedColumns map)
     {
         super(map);
         assert cfm != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 165b150..a8d1fc8 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -18,7 +18,9 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -28,14 +30,17 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import javax.management.*;
 
-import com.google.common.collect.*;
-
-import org.apache.cassandra.io.compress.CompressionParameters;
-import org.apache.cassandra.service.CacheService;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.*;
+import org.apache.cassandra.cache.AutoSavingCache;
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -54,9 +59,11 @@ import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.*;
@@ -386,13 +393,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         long start = System.currentTimeMillis();
 
-        AutoSavingCache<RowCacheKey, ColumnFamily> rowCache = CacheService.instance.rowCache;
+        AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = CacheService.instance.rowCache;
 
         // results are sorted on read (via treeset) because there are few reads and many writes and reads only happen at startup
         int cachedRowsRead = 0;
         for (DecoratedKey key : rowCache.readSaved(table.name, columnFamily))
         {
-            cacheRow(metadata.cfId, key);
+            ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(columnFamily)),
+                                                   Integer.MIN_VALUE,
+                                                   true);
+            CacheService.instance.rowCache.put(new RowCacheKey(metadata.cfId, key), data);
         }
 
         if (cachedRowsRead > 0)
@@ -708,15 +718,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key);
 
+        // always invalidate a copying cache value
         if (CacheService.instance.rowCache.isPutCopying())
         {
             invalidateCachedRow(cacheKey);
+            return;
         }
-        else
+
+        // invalidate a normal cache value if it's a sentinel, so the read will retry (and include the new update)
+        IRowCacheEntry cachedRow = getCachedRowInternal(cacheKey);
+        if (cachedRow != null)
         {
-            ColumnFamily cachedRow = getRawCachedRow(cacheKey);
-            if (cachedRow != null)
-                cachedRow.addAll(columnFamily, HeapAllocator.instance);
+            if (cachedRow instanceof RowCacheSentinel)
+                invalidateCachedRow(cacheKey);
+            else
+                ((ColumnFamily) cachedRow).addAll(columnFamily, HeapAllocator.instance);
         }
     }
 
@@ -1088,30 +1104,52 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return (int) (System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
     }
 
-    public ColumnFamily cacheRow(Integer cfId, DecoratedKey decoratedKey)
+    /**
+     * fetch the row given by filter.key if it is in the cache; if not, read it from disk and cache it
+     * @param cfId the column family to read the row from
+     * @param filter the columns being queried.  Note that we still cache entire rows, but if a row is uncached
+     *               and we race to cache it, only the winner will read the entire row
+     * @return the entire row for filter.key, if present in the cache (or we can cache it), or just the column
+     *         specified by filter otherwise
+     */
+    private ColumnFamily getThroughCache(Integer cfId, QueryFilter filter)
     {
         assert isRowCacheEnabled()
-                : String.format("Row cache is not enabled on column family [" + getColumnFamilyName() + "]");
+               : String.format("Row cache is not enabled on column family [" + getColumnFamilyName() + "]");
 
-        RowCacheKey key = new RowCacheKey(cfId, decoratedKey);
+        RowCacheKey key = new RowCacheKey(cfId, filter.key);
 
-        ColumnFamily cached;
-
-        if ((cached = CacheService.instance.rowCache.get(key)) == null)
+        // attempt a sentinel-read-cache sequence.  if a write invalidates our sentinel, we'll return our
+        // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
+        IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
+        if (cached != null)
         {
-            // We force ThreadSafeSortedColumns because cached row will be accessed concurrently
-            cached = getTopLevelColumns(QueryFilter.getIdentityFilter(decoratedKey, new QueryPath(columnFamily)),
-                                        Integer.MIN_VALUE,
-                                        true);
+            if (cached instanceof RowCacheSentinel)
+            {
+                // Some other read is trying to cache the value, just do a normal non-caching read
+                return getTopLevelColumns(filter, Integer.MIN_VALUE, false);
+            }
+            return (ColumnFamily) cached;
+        }
 
-            if (cached == null)
-                return null;
+        RowCacheSentinel sentinel = new RowCacheSentinel();
+        boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
 
-            // avoid keeping a permanent reference to the original key buffer
-            CacheService.instance.rowCache.put(key, cached);
-        }
+        try
+        {
+            ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, new QueryPath(columnFamily)),
+                                                   Integer.MIN_VALUE,
+                                                   true);
+            if (sentinelSuccess && data != null)
+                CacheService.instance.rowCache.replace(key, sentinel, data);
 
-        return cached;
+            return data;
+        }
+        finally
+        {
+            if (sentinelSuccess && data == null)
+                CacheService.instance.rowCache.remove(key);
+        }
     }
 
     ColumnFamily getColumnFamily(QueryFilter filter, int gcBefore)
@@ -1137,7 +1175,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             if (cfId == null)
                 return null; // secondary index
 
-            ColumnFamily cached = cacheRow(cfId, filter.key);
+            ColumnFamily cached = getThroughCache(cfId, filter);
             if (cached == null)
                 return null;
 
@@ -1484,21 +1522,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return data.getSSTables().size();
     }
 
-    /** raw cached row -- does not fetch the row if it is not present.  not counted in cache statistics.  */
-
+    /**
+     * @return the cached row for @param key if it is already present in the cache.
+     * That is, unlike getThroughCache, it will not readAndCache the row if it is not present, nor
+     * are these calls counted in cache statistics.
+     *
+     * Note that this WILL cause deserialization of a SerializingCache row, so if all you
+     * need to know is whether a row is present or not, use containsCachedRow instead.
+     */
     public ColumnFamily getRawCachedRow(DecoratedKey key)
     {
         if (metadata.cfId == null)
             return null; // secondary index
 
-        return getRawCachedRow(new RowCacheKey(metadata.cfId, key));
+        IRowCacheEntry cached = getCachedRowInternal(new RowCacheKey(metadata.cfId, key));
+        return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily) cached;
     }
 
-    public ColumnFamily getRawCachedRow(RowCacheKey key)
+    private IRowCacheEntry getCachedRowInternal(RowCacheKey key)
     {
         return CacheService.instance.rowCache.getCapacity() == 0 ? null : CacheService.instance.rowCache.getInternal(key);
     }
 
+    /**
+     * @return true if @param key is contained in the row cache
+     */
+    public boolean containsCachedRow(DecoratedKey key)
+    {
+        return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key));
+    }
+
     public void invalidateCachedRow(RowCacheKey key)
     {
         CacheService.instance.rowCache.remove(key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 34fe07a..a31f6ec 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -103,8 +103,10 @@ public class RowIteratorFactory
                 // First check if this row is in the rowCache. If it is we can skip the rest
                 ColumnFamily cached = cfs.getRawCachedRow(key);
                 if (cached == null)
+                {
                     // not cached: collate
                     filter.collateColumns(returnCF, colIters, gcBefore);
+                }
                 else
                 {
                     QueryFilter keyFilter = new QueryFilter(key, filter.path, filter.filter);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 270c3af..2fd0240 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -110,7 +110,7 @@ public class CompactionIterable extends AbstractCompactionIterable
                     // If the raw is cached, we call removeDeleted on it to have/ coherent
query returns. However it would look
                     // like some deleted columns lived longer than gc_grace + compaction.
This can also free up big amount of
                     // memory on long running instances
-                    controller.removeDeletedInCache(compactedRow.key);
+                    controller.invalidateCachedRow(compactedRow.key);
                 }
 
                 return compactedRow;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index dba8f55..9b67676 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -130,7 +130,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                 // If the raw is cached, we call removeDeleted on it to have/ coherent query
returns. However it would look
                 // like some deleted columns lived longer than gc_grace + compaction. This
can also free up big amount of
                 // memory on long running instances
-                controller.removeDeletedInCache(compactedRow.key);
+                controller.invalidateCachedRow(compactedRow.key);
                 return compactedRow;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 8dd6bde..c70c45d 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -62,7 +62,7 @@ public class CacheService implements CacheServiceMBean
     public final static CacheService instance = new CacheService();
 
     public final AutoSavingCache<KeyCacheKey, Long> keyCache;
-    public final AutoSavingCache<RowCacheKey, ColumnFamily> rowCache;
+    public final AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache;
 
     private int rowCacheSavePeriod;
     private int keyCacheSavePeriod;
@@ -116,7 +116,7 @@ public class CacheService implements CacheServiceMBean
     /**
      * @return initialized row cache
      */
-    private AutoSavingCache<RowCacheKey, ColumnFamily> initRowCache()
+    private AutoSavingCache<RowCacheKey, IRowCacheEntry> initRowCache()
     {
         logger.info("Initializing row cache with capacity of {} MBs and provider {}",
                     DatabaseDescriptor.getRowCacheSizeInMB(),
@@ -125,8 +125,8 @@ public class CacheService implements CacheServiceMBean
         int rowCacheInMemoryCapacity = DatabaseDescriptor.getRowCacheSizeInMB() * 1024 *
1024;
 
         // cache object
-        ICache<RowCacheKey, ColumnFamily> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity,
true);
-        AutoSavingCache<RowCacheKey, ColumnFamily> rowCache = new AutoSavingCache<RowCacheKey,
ColumnFamily>(rc, CacheType.ROW_CACHE);
+        ICache<RowCacheKey, IRowCacheEntry> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity,
true);
+        AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<RowCacheKey,
IRowCacheEntry>(rc, CacheType.ROW_CACHE);
 
         int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index f57b400..915d3bc 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -124,8 +124,7 @@ public class IncomingStreamReader
                     key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc,
ByteBufferUtil.readWithShortLength(in));
                     long dataSize = SSTableReader.readRowSize(in, localFile.desc);
 
-                    ColumnFamily cached = cfs.getRawCachedRow(key);
-                    if (cached != null && remoteFile.type == OperationType.AES &&
dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit())
+                    if (cfs.containsCachedRow(key) && remoteFile.type == OperationType.AES
&& dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit())
                     {
                         // need to update row cache
                         // Note: Because we won't just echo the columns, there is no need
to use the PRESERVE_SIZE flag, unlike what appendFromStream does below

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index 9d1ff68..1185315 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -31,6 +31,7 @@ import javax.management.ObjectName;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.cache.AutoSavingCache;
+import org.apache.cassandra.cache.IRowCacheEntry;
 import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.cache.RowCacheKey;
 import org.apache.cassandra.db.ColumnFamily;
@@ -90,7 +91,7 @@ public class StatusLogger
 
         // Global key/row cache information
         AutoSavingCache<KeyCacheKey, Long> keyCache = CacheService.instance.keyCache;
-        AutoSavingCache<RowCacheKey, ColumnFamily> rowCache = CacheService.instance.rowCache;
+        AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = CacheService.instance.rowCache;
 
         int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave();
         int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 9bce4e8..da5cf63 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -66,7 +66,7 @@ public class RowCacheTest extends CleanupHelper
 
             cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER,
false, 1);
             assert CacheService.instance.rowCache.size() == i + 1;
-            assert cachedStore.getRawCachedRow(key) != null; // current key should be stored
in the cache
+            assert cachedStore.containsCachedRow(key); // current key should be stored in
the cache
 
             // checking if column is read correctly after cache
             ColumnFamily cf = cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
@@ -88,7 +88,7 @@ public class RowCacheTest extends CleanupHelper
             QueryPath path = new QueryPath(COLUMN_FAMILY, null, ByteBufferUtil.bytes("col"
+ i));
 
             cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER,
false, 1);
-            assert cachedStore.getRawCachedRow(key) != null; // cache should be populated
with the latest rows read (old ones should be popped)
+            assert cachedStore.containsCachedRow(key); // cache should be populated with
the latest rows read (old ones should be popped)
 
             // checking if column is read correctly after cache
             ColumnFamily cf = cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);


Mime
View raw message