cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1089078 - in /cassandra/trunk: interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/avro/ src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/ src/java/org/apache/ca...
Date Tue, 05 Apr 2011 15:22:58 GMT
Author: jbellis
Date: Tue Apr  5 15:22:57 2011
New Revision: 1089078

URL: http://svn.apache.org/viewvc?rev=1089078&view=rev
Log:
introduce ICache, InstrumentingCache, IRowCacheProvider
patch by jbellis, Jon Hermes, and Vijay for CASSANDRA-1969

Added:
    cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java
      - copied, changed from r1088806, cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCacheMBean.java
      - copied, changed from r1088806, cassandra/trunk/src/java/org/apache/cassandra/cache/JMXInstrumentedCacheMBean.java
    cassandra/trunk/test/unit/org/apache/cassandra/cache/
    cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
Removed:
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/RowPredicate.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/AbstractCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/IAggregatableCacheProvider.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/JMXInstrumentedCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/JMXInstrumentedCacheMBean.java
Modified:
    cassandra/trunk/interface/cassandra.thrift
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
    cassandra/trunk/src/avro/internode.genavro
    cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingKeyCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingRowCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java

Modified: cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Tue Apr  5 15:22:57 2011
@@ -403,6 +403,7 @@ struct CfDef {
     24: optional bool replicate_on_write=0,
     25: optional double merge_shards_chance,
     26: optional string key_validation_class,
+    27: optional string row_cache_provider="org.apache.cassandra.cache.ConcurrentLinkedHashCacheProvider",
 }
 
 /* describes a keyspace. */

Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java Tue Apr  5 15:22:57 2011
@@ -74,6 +74,7 @@ public class CfDef implements TBase<CfDe
   private static final TField REPLICATE_ON_WRITE_FIELD_DESC = new TField("replicate_on_write", TType.BOOL, (short)24);
   private static final TField MERGE_SHARDS_CHANCE_FIELD_DESC = new TField("merge_shards_chance", TType.DOUBLE, (short)25);
   private static final TField KEY_VALIDATION_CLASS_FIELD_DESC = new TField("key_validation_class", TType.STRING, (short)26);
+  private static final TField ROW_CACHE_PROVIDER_FIELD_DESC = new TField("row_cache_provider", TType.STRING, (short)27);
 
   public String keyspace;
   public String name;
@@ -98,6 +99,7 @@ public class CfDef implements TBase<CfDe
   public boolean replicate_on_write;
   public double merge_shards_chance;
   public String key_validation_class;
+  public String row_cache_provider;
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements TFieldIdEnum {
@@ -123,7 +125,8 @@ public class CfDef implements TBase<CfDe
     MEMTABLE_OPERATIONS_IN_MILLIONS((short)23, "memtable_operations_in_millions"),
     REPLICATE_ON_WRITE((short)24, "replicate_on_write"),
     MERGE_SHARDS_CHANCE((short)25, "merge_shards_chance"),
-    KEY_VALIDATION_CLASS((short)26, "key_validation_class");
+    KEY_VALIDATION_CLASS((short)26, "key_validation_class"),
+    ROW_CACHE_PROVIDER((short)27, "row_cache_provider");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -184,6 +187,8 @@ public class CfDef implements TBase<CfDe
           return MERGE_SHARDS_CHANCE;
         case 26: // KEY_VALIDATION_CLASS
           return KEY_VALIDATION_CLASS;
+        case 27: // ROW_CACHE_PROVIDER
+          return ROW_CACHE_PROVIDER;
         default:
           return null;
       }
@@ -290,6 +295,8 @@ public class CfDef implements TBase<CfDe
         new FieldValueMetaData(TType.DOUBLE)));
     tmpMap.put(_Fields.KEY_VALIDATION_CLASS, new FieldMetaData("key_validation_class", TFieldRequirementType.OPTIONAL, 
         new FieldValueMetaData(TType.STRING)));
+    tmpMap.put(_Fields.ROW_CACHE_PROVIDER, new FieldMetaData("row_cache_provider", TFieldRequirementType.OPTIONAL, 
+        new FieldValueMetaData(TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     FieldMetaData.addStructMetaDataMap(CfDef.class, metaDataMap);
   }
@@ -307,6 +314,8 @@ public class CfDef implements TBase<CfDe
 
     this.replicate_on_write = false;
 
+    this.row_cache_provider = "org.apache.cassandra.cache.ConcurrentLinkedHashCacheProvider";
+
   }
 
   public CfDef(
@@ -369,6 +378,9 @@ public class CfDef implements TBase<CfDe
     if (other.isSetKey_validation_class()) {
       this.key_validation_class = other.key_validation_class;
     }
+    if (other.isSetRow_cache_provider()) {
+      this.row_cache_provider = other.row_cache_provider;
+    }
   }
 
   public CfDef deepCopy() {
@@ -416,6 +428,8 @@ public class CfDef implements TBase<CfDe
     setMerge_shards_chanceIsSet(false);
     this.merge_shards_chance = 0.0;
     this.key_validation_class = null;
+    this.row_cache_provider = "org.apache.cassandra.cache.ConcurrentLinkedHashCacheProvider";
+
   }
 
   public String getKeyspace() {
@@ -971,6 +985,30 @@ public class CfDef implements TBase<CfDe
     }
   }
 
+  public String getRow_cache_provider() {
+    return this.row_cache_provider;
+  }
+
+  public CfDef setRow_cache_provider(String row_cache_provider) {
+    this.row_cache_provider = row_cache_provider;
+    return this;
+  }
+
+  public void unsetRow_cache_provider() {
+    this.row_cache_provider = null;
+  }
+
+  /** Returns true if field row_cache_provider is set (has been asigned a value) and false otherwise */
+  public boolean isSetRow_cache_provider() {
+    return this.row_cache_provider != null;
+  }
+
+  public void setRow_cache_providerIsSet(boolean value) {
+    if (!value) {
+      this.row_cache_provider = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case KEYSPACE:
@@ -1157,6 +1195,14 @@ public class CfDef implements TBase<CfDe
       }
       break;
 
+    case ROW_CACHE_PROVIDER:
+      if (value == null) {
+        unsetRow_cache_provider();
+      } else {
+        setRow_cache_provider((String)value);
+      }
+      break;
+
     }
   }
 
@@ -1231,6 +1277,9 @@ public class CfDef implements TBase<CfDe
     case KEY_VALIDATION_CLASS:
       return getKey_validation_class();
 
+    case ROW_CACHE_PROVIDER:
+      return getRow_cache_provider();
+
     }
     throw new IllegalStateException();
   }
@@ -1288,6 +1337,8 @@ public class CfDef implements TBase<CfDe
       return isSetMerge_shards_chance();
     case KEY_VALIDATION_CLASS:
       return isSetKey_validation_class();
+    case ROW_CACHE_PROVIDER:
+      return isSetRow_cache_provider();
     }
     throw new IllegalStateException();
   }
@@ -1512,6 +1563,15 @@ public class CfDef implements TBase<CfDe
         return false;
     }
 
+    boolean this_present_row_cache_provider = true && this.isSetRow_cache_provider();
+    boolean that_present_row_cache_provider = true && that.isSetRow_cache_provider();
+    if (this_present_row_cache_provider || that_present_row_cache_provider) {
+      if (!(this_present_row_cache_provider && that_present_row_cache_provider))
+        return false;
+      if (!this.row_cache_provider.equals(that.row_cache_provider))
+        return false;
+    }
+
     return true;
   }
 
@@ -1634,6 +1694,11 @@ public class CfDef implements TBase<CfDe
     if (present_key_validation_class)
       builder.append(key_validation_class);
 
+    boolean present_row_cache_provider = true && (isSetRow_cache_provider());
+    builder.append(present_row_cache_provider);
+    if (present_row_cache_provider)
+      builder.append(row_cache_provider);
+
     return builder.toHashCode();
   }
 
@@ -1875,6 +1940,16 @@ public class CfDef implements TBase<CfDe
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetRow_cache_provider()).compareTo(typedOther.isSetRow_cache_provider());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetRow_cache_provider()) {
+      lastComparison = TBaseHelper.compareTo(this.row_cache_provider, typedOther.row_cache_provider);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -2078,6 +2153,13 @@ public class CfDef implements TBase<CfDe
             TProtocolUtil.skip(iprot, field.type);
           }
           break;
+        case 27: // ROW_CACHE_PROVIDER
+          if (field.type == TType.STRING) {
+            this.row_cache_provider = iprot.readString();
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
         default:
           TProtocolUtil.skip(iprot, field.type);
       }
@@ -2229,6 +2311,13 @@ public class CfDef implements TBase<CfDe
         oprot.writeFieldEnd();
       }
     }
+    if (this.row_cache_provider != null) {
+      if (isSetRow_cache_provider()) {
+        oprot.writeFieldBegin(ROW_CACHE_PROVIDER_FIELD_DESC);
+        oprot.writeString(this.row_cache_provider);
+        oprot.writeFieldEnd();
+      }
+    }
     oprot.writeFieldStop();
     oprot.writeStructEnd();
   }
@@ -2407,6 +2496,16 @@ public class CfDef implements TBase<CfDe
       }
       first = false;
     }
+    if (isSetRow_cache_provider()) {
+      if (!first) sb.append(", ");
+      sb.append("row_cache_provider:");
+      if (this.row_cache_provider == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.row_cache_provider);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }

Modified: cassandra/trunk/src/avro/internode.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/avro/internode.genavro?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/avro/internode.genavro (original)
+++ cassandra/trunk/src/avro/internode.genavro Tue Apr  5 15:22:57 2011
@@ -63,6 +63,7 @@ protocol InterNode {
         union { null, double} merge_shards_chance = null;
         union { int, null } id;
         union { array<ColumnDef>, null } column_metadata;
+        union { string, null } row_cache_provider = "org.apache.cassandra.cache.ConcurrentLinkedHashCacheProvider";
     }
 
     @aliases(["org.apache.cassandra.config.avro.KsDef"])

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java Tue Apr  5 15:22:57 2011
@@ -42,7 +42,7 @@ import org.apache.cassandra.utils.ByteBu
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-public abstract class AutoSavingCache<K, V> extends JMXInstrumentedCache<K, V>
+public abstract class AutoSavingCache<K, V> extends InstrumentingCache<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
 
@@ -51,9 +51,9 @@ public abstract class AutoSavingCache<K,
     protected volatile ScheduledFuture<?> saveTask;
     protected final ColumnFamilyStore.CacheType cacheType;
     
-    public AutoSavingCache(String tableName, String cfName, ColumnFamilyStore.CacheType cacheType, int capacity)
+    public AutoSavingCache(ICache<K, V> cache, String tableName, String cfName, ColumnFamilyStore.CacheType cacheType)
     {
-        super(tableName, cfName + cacheType, capacity);
+        super(cache, tableName, cfName + cacheType);
         this.tableName = tableName;
         this.cfName = cfName;
         this.cacheType = cacheType;
@@ -177,7 +177,7 @@ public abstract class AutoSavingCache<K,
     {
         if (getCapacity() > 0)
         {
-            int newCapacity = (int) (DatabaseDescriptor.getReduceCacheCapacityTo() * getSize());
+            int newCapacity = (int) (DatabaseDescriptor.getReduceCacheCapacityTo() * size());
             logger.warn(String.format("Reducing %s %s capacity from %d to %s to reduce memory pressure",
                                       cfName, cacheType, getCapacity(), newCapacity));
             setCapacity(newCapacity);

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingKeyCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingKeyCache.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingKeyCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingKeyCache.java Tue Apr  5 15:22:57 2011
@@ -31,9 +31,9 @@ import org.apache.cassandra.utils.Pair;
 
 public class AutoSavingKeyCache<K extends Pair<Descriptor, DecoratedKey>, V> extends AutoSavingCache<K, V>
 {
-    public AutoSavingKeyCache(String tableName, String cfName, int capacity)
+    public AutoSavingKeyCache(ICache<K, V> cache, String tableName, String cfName)
     {
-        super(tableName, cfName, ColumnFamilyStore.CacheType.KEY_CACHE_TYPE, capacity);
+        super(cache, tableName, cfName, ColumnFamilyStore.CacheType.KEY_CACHE_TYPE);
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingRowCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingRowCache.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingRowCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingRowCache.java Tue Apr  5 15:22:57 2011
@@ -29,9 +29,9 @@ import org.apache.cassandra.db.Decorated
 
 public class AutoSavingRowCache<K extends DecoratedKey, V> extends AutoSavingCache<K, V>
 {
-    public AutoSavingRowCache(String tableName, String cfName, int capacity)
+    public AutoSavingRowCache(ICache<K, V> cache, String tableName, String cfName)
     {
-        super(tableName, cfName, ColumnFamilyStore.CacheType.ROW_CACHE_TYPE, capacity);
+        super(cache, tableName, cfName, ColumnFamilyStore.CacheType.ROW_CACHE_TYPE);
     }
 
     @Override

Added: cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java?rev=1089078&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java Tue Apr  5 15:22:57 2011
@@ -0,0 +1,82 @@
+package org.apache.cassandra.cache;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.Weighers;
+
+/** Wrapper so CLHM can implement ICache interface.
+ *  (this is what you get for making library classes final.) */
+public class ConcurrentLinkedHashCache<K, V> implements ICache<K, V>
+{
+    public static final int DEFAULT_CONCURENCY_LEVEL = 64;
+    private final ConcurrentLinkedHashMap<K, V> map;
+
+    public ConcurrentLinkedHashCache(ConcurrentLinkedHashMap<K, V> map)
+    {
+        this.map = map;
+    }
+
+    public static <K, V> ConcurrentLinkedHashCache<K, V> create(int capacity)
+    {
+        ConcurrentLinkedHashMap<K, V> map = new ConcurrentLinkedHashMap.Builder<K, V>()
+                                            .weigher(Weighers.<V>singleton())
+                                            .initialCapacity(capacity)
+                                            .maximumWeightedCapacity(capacity)
+                                            .concurrencyLevel(DEFAULT_CONCURENCY_LEVEL)
+                                            .build();
+        return new ConcurrentLinkedHashCache<K, V>(map);
+    }
+
+    public void discard(K key)
+    {
+        remove(key);
+    }
+
+    public int capacity()
+    {
+        return map.capacity();
+    }
+
+    public void setCapacity(int capacity)
+    {
+        map.setCapacity(capacity);
+    }
+
+    public boolean isEmpty()
+    {
+        return map.isEmpty();
+    }
+
+    public int size()
+    {
+        return map.size();
+    }
+
+    public void clear()
+    {
+        map.clear();
+    }
+
+    public V get(K key)
+    {
+        return map.get(key);
+    }
+
+    public void put(K key, V value)
+    {
+        map.put(key, value);
+    }
+
+    public void remove(K key)
+    {
+        map.remove(key);
+    }
+
+    public Set<K> keySet()
+    {
+        return map.keySet();
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java?rev=1089078&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java Tue Apr  5 15:22:57 2011
@@ -0,0 +1,13 @@
+package org.apache.cassandra.cache;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+
+public class ConcurrentLinkedHashCacheProvider implements IRowCacheProvider
+{
+    public ICache<DecoratedKey, ColumnFamily> create(int capacity)
+    {
+        return ConcurrentLinkedHashCache.create(capacity);
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java?rev=1089078&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java Tue Apr  5 15:22:57 2011
@@ -0,0 +1,27 @@
+package org.apache.cassandra.cache;
+
+import java.util.Set;
+
+/**
+ * This is similar to the Map interface, but requires maintaining a given capacity
+ * and does not require put or remove to return values, which lets SerializingCache
+ * be more efficient by avoiding deserialize except on get.
+ */
+public interface ICache<K, V>
+{
+    public int capacity();
+
+    public void setCapacity(int capacity);
+
+    public void put(K key, V value);
+
+    public V get(K key);
+
+    public void remove(K key);
+
+    public int size();
+
+    public void clear();
+
+    public Set<K> keySet();
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/IRowCacheProvider.java?rev=1089078&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/IRowCacheProvider.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/IRowCacheProvider.java Tue Apr  5 15:22:57 2011
@@ -0,0 +1,12 @@
+package org.apache.cassandra.cache;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+
+/**
+ * Provides cache objects with a requested capacity.
+ */
+public interface IRowCacheProvider
+{
+    public ICache<DecoratedKey, ColumnFamily> create(int capacity);
+}

Copied: cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java (from r1088806, cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java?p2=cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java&p1=cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java&r1=1088806&r2=1089078&rev=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java Tue Apr  5 15:22:57 2011
@@ -21,37 +21,41 @@ package org.apache.cassandra.cache;
  */
 
 
-import java.util.Map;
+import java.lang.management.ManagementFactory;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
-import com.googlecode.concurrentlinkedhashmap.Weighers;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
-public class InstrumentedCache<K, V>
+/**
+ * Wraps an ICache in requests + hits tracking.
+ */
+public class InstrumentingCache<K, V> implements InstrumentingCacheMBean
 {
-    public static final int DEFAULT_CONCURENCY_LEVEL = 64;
-
-    private final ConcurrentLinkedHashMap<K, V> map;
     private final AtomicLong requests = new AtomicLong(0);
     private final AtomicLong hits = new AtomicLong(0);
     private final AtomicLong lastRequests = new AtomicLong(0);
     private final AtomicLong lastHits = new AtomicLong(0);
     private volatile boolean capacitySetManually;
+    private final ICache<K, V> map;
 
-    public InstrumentedCache(int capacity)
+    public InstrumentingCache(ICache<K, V> map, String table, String name)
     {
-        this(capacity, DEFAULT_CONCURENCY_LEVEL);
-    }
-
-    public InstrumentedCache(int capacity, int concurency)
-    {
-        map = new ConcurrentLinkedHashMap.Builder<K, V>()
-                .weigher(Weighers.<V>singleton())
-                .initialCapacity(capacity)
-                .maximumWeightedCapacity(capacity)
-                .concurrencyLevel(concurency)
-                .build();
+        this.map = map;
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            ObjectName mbeanName = new ObjectName("org.apache.cassandra.db:type=Caches,keyspace=" + table + ",cache=" + name);
+            // unregister any previous, as this may be a replacement.
+            if (mbs.isRegistered(mbeanName))
+                mbs.unregisterMBean(mbeanName);
+            mbs.registerMBean(this, mbeanName);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public void put(K key, V value)
@@ -87,7 +91,7 @@ public class InstrumentedCache<K, V>
     {
         return capacitySetManually;
     }
-    
+
     public void updateCapacity(int capacity)
     {
         map.setCapacity(capacity);
@@ -99,7 +103,7 @@ public class InstrumentedCache<K, V>
         capacitySetManually = true;
     }
 
-    public int getSize()
+    public int size()
     {
         return map.size();
     }
@@ -140,9 +144,4 @@ public class InstrumentedCache<K, V>
     {
         return map.keySet();
     }
-
-    public Set<Map.Entry<K, V>> getEntrySet()
-    {
-        return map.entrySet();
-    }
 }

Copied: cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCacheMBean.java (from r1088806, cassandra/trunk/src/java/org/apache/cassandra/cache/JMXInstrumentedCacheMBean.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCacheMBean.java?p2=cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCacheMBean.java&p1=cassandra/trunk/src/java/org/apache/cassandra/cache/JMXInstrumentedCacheMBean.java&r1=1088806&r2=1089078&rev=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/JMXInstrumentedCacheMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCacheMBean.java Tue Apr  5 15:22:57 2011
@@ -21,11 +21,11 @@ package org.apache.cassandra.cache;
  */
 
 
-public interface JMXInstrumentedCacheMBean
+public interface InstrumentingCacheMBean
 {
     public int getCapacity();
     public void setCapacity(int capacity);
-    public int getSize();
+    public int size();
 
     /** total request count since cache creation */
     public long getRequests();

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Tue Apr  5 15:22:57 2011
@@ -1105,6 +1105,9 @@ public class CliClient extends CliUserHe
             case REPLICATE_ON_WRITE:
                 cfDef.setReplicate_on_write(Boolean.parseBoolean(mValue));
                 break;
+            case ROW_CACHE_PROVIDER:
+                cfDef.setRow_cache_provider(mValue);
+                break;
             default:
                 //must match one of the above or we'd throw an exception at the valueOf statement above.
                 assert(false);

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java Tue Apr  5 15:22:57 2011
@@ -50,6 +50,7 @@ public class CliUserHelp {
         MIN_COMPACTION_THRESHOLD,
         MAX_COMPACTION_THRESHOLD,
         REPLICATE_ON_WRITE,
+        ROW_CACHE_PROVIDER,
     }
 
     protected EnumMap<ColumnFamilyArgument, String> argumentExplanations = new EnumMap<ColumnFamilyArgument, String>(ColumnFamilyArgument.class)
@@ -70,6 +71,7 @@ public class CliUserHelp {
         put(ColumnFamilyArgument.MIN_COMPACTION_THRESHOLD, "Avoid minor compactions of less than this number of sstable files");
         put(ColumnFamilyArgument.MAX_COMPACTION_THRESHOLD, "Compact no more than this number of sstable files at once");
         put(ColumnFamilyArgument.REPLICATE_ON_WRITE, "Replicate every counter update from the leader to the follower replicas");
+        put(ColumnFamilyArgument.ROW_CACHE_PROVIDER, "Row cache provider, opions are SerializingProvider/ConcurrentLinkedHashCacheProvider");
     }};
     
     protected void printCmdHelp(Tree statement, CliSessionState state)

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Apr  5 15:22:57 2011
@@ -29,6 +29,9 @@ import org.apache.commons.lang.builder.H
 import org.apache.commons.lang.builder.ToStringBuilder;
 
 import org.apache.avro.util.Utf8;
+import org.apache.cassandra.cache.ConcurrentLinkedHashCache;
+import org.apache.cassandra.cache.ConcurrentLinkedHashCacheProvider;
+import org.apache.cassandra.cache.IRowCacheProvider;
 import org.apache.cassandra.db.migration.avro.ColumnDef;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.HintedHandOffManager;
@@ -41,6 +44,7 @@ import org.apache.cassandra.db.marshal.U
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.io.SerDeUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 
@@ -60,7 +64,8 @@ public final class CFMetaData
     public final static int DEFAULT_MEMTABLE_THROUGHPUT_IN_MB = sizeMemtableThroughput();
     public final static double DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS = sizeMemtableOperations(DEFAULT_MEMTABLE_THROUGHPUT_IN_MB);
     public final static double DEFAULT_MERGE_SHARDS_CHANCE = 0.1;
-
+    public final static String DEFAULT_ROW_CACHE_PROVIDER = "org.apache.cassandra.cache.ConcurrentLinkedHashCacheProvider";
+    
     private static final int MIN_CF_ID = 1000;
     private static final AtomicInteger idGen = new AtomicInteger(MIN_CF_ID);
     
@@ -149,6 +154,7 @@ public final class CFMetaData
     private int memtableThroughputInMb;               // default based on heap size
     private double memtableOperationsInMillions;      // default based on throughput
     private double mergeShardsChance;                 // default 0.1, chance [0.0, 1.0] of merging old shards during replication
+    private String rowCacheProvider;
     // NOTE: if you find yourself adding members to this class, make sure you keep the convert methods in lockstep.
 
     private Map<ByteBuffer, ColumnDefinition> column_metadata;
@@ -170,6 +176,7 @@ public final class CFMetaData
     public CFMetaData memOps(double prop) {memtableOperationsInMillions = prop; return this;}
     public CFMetaData mergeShardsChance(double prop) {mergeShardsChance = prop; return this;}
     public CFMetaData columnMetadata(Map<ByteBuffer,ColumnDefinition> prop) {column_metadata = prop; return this;}
+    public CFMetaData rowCacheProvider(String prop) { rowCacheProvider = prop; return this;};
 
     public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType comp, AbstractType subcc)
     {
@@ -216,6 +223,7 @@ public final class CFMetaData
         memtableThroughputInMb       = DEFAULT_MEMTABLE_THROUGHPUT_IN_MB;
         memtableOperationsInMillions = DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS;
         mergeShardsChance            = DEFAULT_MERGE_SHARDS_CHANCE;
+        rowCacheProvider             = DEFAULT_ROW_CACHE_PROVIDER;
 
         // Defaults strange or simple enough to not need a DEFAULT_T for
         defaultValidator = BytesType.instance;
@@ -324,6 +332,7 @@ public final class CFMetaData
                                                     org.apache.cassandra.db.migration.avro.ColumnDef.SCHEMA$);
         for (ColumnDefinition cd : column_metadata.values())
             cf.column_metadata.add(cd.deflate());
+        cf.row_cache_provider = new Utf8(rowCacheProvider);
         return cf;
     }
 
@@ -338,9 +347,9 @@ public final class CFMetaData
         {
             comparator = DatabaseDescriptor.getComparator(cf.comparator_type.toString());
             if (cf.subcomparator_type != null)
-                subcolumnComparator = DatabaseDescriptor.getComparator(cf.subcomparator_type.toString());
-            validator = DatabaseDescriptor.getComparator(cf.default_validation_class.toString());
-            keyValidator = DatabaseDescriptor.getComparator(cf.key_validation_class.toString());
+                subcolumnComparator = DatabaseDescriptor.getComparator(cf.subcomparator_type);
+            validator = DatabaseDescriptor.getComparator(cf.default_validation_class);
+            keyValidator = DatabaseDescriptor.getComparator(cf.key_validation_class);
         }
         catch (Exception ex)
         {
@@ -372,6 +381,7 @@ public final class CFMetaData
         if (cf.memtable_throughput_in_mb != null) { newCFMD.memSize(cf.memtable_throughput_in_mb); }
         if (cf.memtable_operations_in_millions != null) { newCFMD.memOps(cf.memtable_operations_in_millions); }
         if (cf.merge_shards_chance != null) { newCFMD.mergeShardsChance(cf.merge_shards_chance); }
+        if (cf.row_cache_provider != null) { newCFMD.rowCacheProvider(cf.row_cache_provider.toString()); }
 
         return newCFMD.comment(cf.comment.toString())
                       .rowCacheSize(cf.row_cache_size)
@@ -464,6 +474,11 @@ public final class CFMetaData
         return memtableOperationsInMillions;
     }
 
+    public IRowCacheProvider getRowCacheProvider()
+    {
+        return FBUtilities.newCacheProvider(rowCacheProvider);
+    }
+
     public Map<ByteBuffer, ColumnDefinition> getColumn_metadata()
     {
         return Collections.unmodifiableMap(column_metadata);
@@ -577,6 +592,8 @@ public final class CFMetaData
             cf_def.setMemtable_operations_in_millions(CFMetaData.DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS);
         if (!cf_def.isSetMerge_shards_chance())
             cf_def.setMerge_shards_chance(CFMetaData.DEFAULT_MERGE_SHARDS_CHANCE);
+        if (!cf_def.isSetRow_cache_provider())
+            cf_def.setRow_cache_provider(CFMetaData.DEFAULT_ROW_CACHE_PROVIDER);
     }
     
     // merges some final fields from this CFM with modifiable fields from CfDef into a new CFMetaData.
@@ -621,6 +638,7 @@ public final class CFMetaData
         memtableThroughputInMb = cf_def.memtable_throughput_in_mb;
         memtableOperationsInMillions = cf_def.memtable_operations_in_millions;
         mergeShardsChance = cf_def.merge_shards_chance;
+        rowCacheProvider = cf_def.row_cache_provider.toString();
         
         // adjust secondary indexes. figure out who is coming and going.
         Set<ByteBuffer> toRemove = new HashSet<ByteBuffer>();
@@ -741,7 +759,8 @@ public final class CFMetaData
             tcd.validation_class = cd.validator.getClass().getName();
             column_meta.add(tcd);
         }
-        def.column_metadata = column_meta;   
+        def.column_metadata = column_meta; 
+        def.row_cache_provider = cfm.rowCacheProvider;
         return def;
     }
     

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java Tue Apr  5 15:22:57 2011
@@ -54,6 +54,7 @@ public class CreateColumnFamilyStatement
     private static final String KW_MEMTABLESIZEINMB = "memtable_throughput_in_mb";
     private static final String KW_MEMTABLEOPSINMILLIONS = "memtable_operations_in_millions";
     private static final String KW_REPLICATEONWRITE = "replicate_on_write";
+    private static final String KW_ROW_CACHE_PROVIDER = "row_cache_provider";
     
     // Maps CQL short names to the respective Cassandra comparator/validator class names
     private static final Map<String, String> comparators = new HashMap<String, String>();
@@ -84,6 +85,7 @@ public class CreateColumnFamilyStatement
         keywords.add(KW_MEMTABLESIZEINMB);
         keywords.add(KW_MEMTABLEOPSINMILLIONS);
         keywords.add(KW_REPLICATEONWRITE);
+        keywords.add(KW_ROW_CACHE_PROVIDER);
     }
  
     private final String name;
@@ -254,7 +256,8 @@ public class CreateColumnFamilyStatement
                    .memOps(getPropertyDouble(KW_MEMTABLEOPSINMILLIONS, CFMetaData.DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS))
                    .mergeShardsChance(0.0)
                    .columnMetadata(getColumns(comparator))
-                   .keyValidator(DatabaseDescriptor.getComparator(comparators.get(getKeyType())));
+                   .keyValidator(DatabaseDescriptor.getComparator(comparators.get(getKeyType())))
+                   .rowCacheProvider(getPropertyString(KW_ROW_CACHE_PROVIDER, CFMetaData.DEFAULT_ROW_CACHE_PROVIDER));
         }
         catch (ConfigurationException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Tue Apr  5 15:22:57 2011
@@ -120,7 +120,7 @@ public class Column implements IColumn
          * + 4 bytes which basically indicates the size of the byte array
          * + entire byte array.
         */
-        return DBConstants.shortSize_ + name.remaining() + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value.remaining();
+        return DBConstants.shortSize_ + name.remaining() + 1 + DBConstants.tsSize_ + DBConstants.intSize_ + value.remaining();
     }
 
     /*

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Tue Apr  5 15:22:57 2011
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.db;
 
+import static org.apache.cassandra.db.DBConstants.*;
+
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Collection;
@@ -419,4 +421,16 @@ public class ColumnFamily implements ICo
     {
         return columns.values().iterator();
     }
+
+    public long serializedSize()
+    {
+        int size = boolSize_ // bool
+                 + intSize_ // id
+                 + intSize_ // local deletion time
+                 + longSize_ // client deltion time
+                 + intSize_; // column count
+        for (IColumn column : columns.values())
+            size += column.serializedSize();
+        return size;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Apr  5 15:22:57 2011
@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.cache.AutoSavingKeyCache;
 import org.apache.cassandra.cache.AutoSavingRowCache;
+import org.apache.cassandra.cache.ConcurrentLinkedHashCache;
+import org.apache.cassandra.cache.ICache;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -238,8 +240,10 @@ public class ColumnFamilyStore implement
         if (logger.isDebugEnabled())
             logger.debug("Starting CFS {}", columnFamily);
 
-        keyCache = new AutoSavingKeyCache<Pair<Descriptor, DecoratedKey>, Long>(table.name, columnFamilyName, 0);
-        rowCache = new AutoSavingRowCache<DecoratedKey, ColumnFamily>(table.name, columnFamilyName, 0);
+        ICache<Pair<Descriptor, DecoratedKey>, Long> kc = ConcurrentLinkedHashCache.create(0);
+        keyCache = new AutoSavingKeyCache<Pair<Descriptor, DecoratedKey>, Long>(kc, table.name, columnFamilyName);
+        ICache<DecoratedKey, ColumnFamily> rc = metadata.getRowCacheProvider().create(0);        
+        rowCache = new AutoSavingRowCache<DecoratedKey, ColumnFamily>(rc, table.name, columnFamilyName);
 
         // scan for sstables corresponding to this cf and load them
         data = new DataTracker(this);
@@ -495,10 +499,10 @@ public class ColumnFamilyStore implement
         // results are sorted on read (via treeset) because there are few reads and many writes and reads only happen at startup
         for (DecoratedKey key : rowCache.readSaved())
             cacheRow(key);
-        if (rowCache.getSize() > 0)
+        if (rowCache.size() > 0)
             logger.info(String.format("completed loading (%d ms; %d keys) row cache for %s.%s",
                                       System.currentTimeMillis()-start,
-                                      rowCache.getSize(),
+                                      rowCache.size(),
                                       table.name,
                                       columnFamily));
 
@@ -735,6 +739,8 @@ public class ColumnFamilyStore implement
 
     /*
      * Insert/Update the column family for this key. param @ lock - lock that
+     * Caller is responsible for acquiring Table.flusherLock!
+     * param @ lock - lock that needs to be used.
      * needs to be used. param @ key - key for update/insert param @
      * columnFamily - columnFamily changes
      */
@@ -1644,12 +1650,12 @@ public class ColumnFamilyStore implement
 
     public int getRowCacheSize()
     {
-        return rowCache.getSize();
+        return rowCache.size();
     }
 
     public int getKeyCacheSize()
     {
-        return keyCache.getSize();
+        return keyCache.size();
     }
 
     public static Iterable<ColumnFamilyStore> all()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Tue Apr  5 15:22:57 2011
@@ -30,7 +30,7 @@ import com.google.common.collect.Collect
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.InstrumentedCache;
+import org.apache.cassandra.cache.InstrumentingCache;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
@@ -114,7 +114,7 @@ public class SSTableReader extends SSTab
     private IndexSummary indexSummary;
     private Filter bf;
 
-    private InstrumentedCache<Pair<Descriptor,DecoratedKey>, Long> keyCache;
+    private InstrumentingCache<Pair<Descriptor, DecoratedKey>, Long> keyCache;
 
     private BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 
@@ -195,7 +195,7 @@ public class SSTableReader extends SSTab
             logger.debug("INDEX LOAD TIME for " + descriptor + ": " + (System.currentTimeMillis() - start) + " ms.");
 
         if (logger.isDebugEnabled() && sstable.getKeyCache() != null)
-            logger.debug(String.format("key cache contains %s/%s keys", sstable.getKeyCache().getSize(), sstable.getKeyCache().getCapacity()));
+            logger.debug(String.format("key cache contains %s/%s keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity()));
 
         return sstable;
     }
@@ -279,8 +279,8 @@ public class SSTableReader extends SSTab
                                                                       true);
         try
         {
-            if (keyCache != null && keyCache.getCapacity() - keyCache.getSize() < keysToLoadInCache.size())
-                keyCache.updateCapacity(keyCache.getSize() + keysToLoadInCache.size());
+            if (keyCache != null && keyCache.getCapacity() - keyCache.size() < keysToLoadInCache.size())
+                keyCache.updateCapacity(keyCache.size() + keysToLoadInCache.size());
 
             long indexSize = input.length();
             long estimatedKeys = SSTable.estimateRowsFromIndex(input);
@@ -671,7 +671,7 @@ public class SSTableReader extends SSTab
         return bloomFilterTracker.getRecentTruePositiveCount();
     }
 
-    public InstrumentedCache<Pair<Descriptor,DecoratedKey>, Long> getKeyCache()
+    public InstrumentingCache<Pair<Descriptor,DecoratedKey>, Long> getKeyCache()
     {
         return keyCache;
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Apr  5 15:22:57 2011
@@ -950,6 +950,7 @@ public class CassandraServer implements 
         if (cf_def.isSetMemtable_throughput_in_mb()) { newCFMD.memSize(cf_def.memtable_throughput_in_mb); }
         if (cf_def.isSetMemtable_operations_in_millions()) { newCFMD.memOps(cf_def.memtable_operations_in_millions); }
         if (cf_def.isSetMerge_shards_chance()) { newCFMD.mergeShardsChance(cf_def.merge_shards_chance); }
+        if (cf_def.isSetRow_cache_provider()) { newCFMD.rowCacheProvider(cf_def.row_cache_provider); }
 
         return newCFMD.comment(cf_def.comment)
                       .rowCacheSize(cf_def.row_cache_size)

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Tue Apr  5 15:22:57 2011
@@ -35,7 +35,7 @@ import org.apache.cassandra.config.Confi
 
 import org.apache.commons.cli.*;
 
-import org.apache.cassandra.cache.JMXInstrumentedCacheMBean;
+import org.apache.cassandra.cache.InstrumentingCacheMBean;
 import org.apache.cassandra.concurrent.IExecutorMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.CompactionManagerMBean;
@@ -409,11 +409,11 @@ public class NodeCmd
                 outs.println("\t\tWrite Latency: " + String.format("%01.3f", cfstore.getRecentWriteLatencyMicros() / 1000) + " ms.");
                 outs.println("\t\tPending Tasks: " + cfstore.getPendingTasks());
 
-                JMXInstrumentedCacheMBean keyCacheMBean = probe.getKeyCacheMBean(tableName, cfstore.getColumnFamilyName());
+                InstrumentingCacheMBean keyCacheMBean = probe.getKeyCacheMBean(tableName, cfstore.getColumnFamilyName());
                 if (keyCacheMBean.getCapacity() > 0)
                 {
                     outs.println("\t\tKey cache capacity: " + keyCacheMBean.getCapacity());
-                    outs.println("\t\tKey cache size: " + keyCacheMBean.getSize());
+                    outs.println("\t\tKey cache size: " + keyCacheMBean.size());
                     outs.println("\t\tKey cache hit rate: " + keyCacheMBean.getRecentHitRate());
                 }
                 else
@@ -421,11 +421,11 @@ public class NodeCmd
                     outs.println("\t\tKey cache: disabled");
                 }
 
-                JMXInstrumentedCacheMBean rowCacheMBean = probe.getRowCacheMBean(tableName, cfstore.getColumnFamilyName());
+                InstrumentingCacheMBean rowCacheMBean = probe.getRowCacheMBean(tableName, cfstore.getColumnFamilyName());
                 if (rowCacheMBean.getCapacity() > 0)
                 {
                     outs.println("\t\tRow cache capacity: " + rowCacheMBean.getCapacity());
-                    outs.println("\t\tRow cache size: " + rowCacheMBean.getSize());
+                    outs.println("\t\tRow cache size: " + rowCacheMBean.size());
                     outs.println("\t\tRow cache hit rate: " + rowCacheMBean.getRecentHitRate());
                 }
                 else

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Apr  5 15:22:57 2011
@@ -40,7 +40,7 @@ import javax.management.remote.JMXServic
 
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.cache.JMXInstrumentedCacheMBean;
+import org.apache.cassandra.cache.InstrumentingCacheMBean;
 import org.apache.cassandra.concurrent.IExecutorMBean;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
@@ -265,12 +265,12 @@ public class NodeProbe
       return compactionProxy;
     }
 
-    public JMXInstrumentedCacheMBean getKeyCacheMBean(String tableName, String cfName)
+    public InstrumentingCacheMBean getKeyCacheMBean(String tableName, String cfName)
     {
         String keyCachePath = "org.apache.cassandra.db:type=Caches,keyspace=" + tableName + ",cache=" + cfName + "KeyCache";
         try
         {
-            return JMX.newMBeanProxy(mbeanServerConn, new ObjectName(keyCachePath), JMXInstrumentedCacheMBean.class);
+            return JMX.newMBeanProxy(mbeanServerConn, new ObjectName(keyCachePath), InstrumentingCacheMBean.class);
         }
         catch (MalformedObjectNameException e)
         {
@@ -278,12 +278,12 @@ public class NodeProbe
         }
     }
     
-    public JMXInstrumentedCacheMBean getRowCacheMBean(String tableName, String cfName)
+    public InstrumentingCacheMBean getRowCacheMBean(String tableName, String cfName)
     {
         String rowCachePath = "org.apache.cassandra.db:type=Caches,keyspace=" + tableName + ",cache=" + cfName + "RowCache";
         try
         {
-            return JMX.newMBeanProxy(mbeanServerConn, new ObjectName(rowCachePath), JMXInstrumentedCacheMBean.class);
+            return JMX.newMBeanProxy(mbeanServerConn, new ObjectName(rowCachePath), InstrumentingCacheMBean.class);
         }
         catch (MalformedObjectNameException e)
         {
@@ -426,12 +426,12 @@ public class NodeProbe
         try
         {
             String keyCachePath = "org.apache.cassandra.db:type=Caches,keyspace=" + tableName + ",cache=" + cfName + "KeyCache";
-            JMXInstrumentedCacheMBean keyCacheMBean = JMX.newMBeanProxy(mbeanServerConn, new ObjectName(keyCachePath), JMXInstrumentedCacheMBean.class);
+            InstrumentingCacheMBean keyCacheMBean = JMX.newMBeanProxy(mbeanServerConn, new ObjectName(keyCachePath), InstrumentingCacheMBean.class);
             keyCacheMBean.setCapacity(keyCacheCapacity);
 
             String rowCachePath = "org.apache.cassandra.db:type=Caches,keyspace=" + tableName + ",cache=" + cfName + "RowCache";
-            JMXInstrumentedCacheMBean rowCacheMBean = null;
-            rowCacheMBean = JMX.newMBeanProxy(mbeanServerConn, new ObjectName(rowCachePath), JMXInstrumentedCacheMBean.class);
+            InstrumentingCacheMBean rowCacheMBean = null;
+            rowCacheMBean = JMX.newMBeanProxy(mbeanServerConn, new ObjectName(rowCachePath), InstrumentingCacheMBean.class);
             rowCacheMBean.setCapacity(rowCacheCapacity);
         }
         catch (MalformedObjectNameException e)

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Tue Apr  5 15:22:57 2011
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Tue Apr  5 15:22:57 2011
@@ -39,6 +39,7 @@ import org.apache.commons.collections.it
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cache.IRowCacheProvider;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
@@ -628,4 +629,19 @@ public class FBUtilities
 
         return field;
     }
+
+    public static IRowCacheProvider newCacheProvider(String cache_provider)
+    {
+        if (!cache_provider.contains("."))
+            cache_provider = "org.apache.cassandra.cache." + cache_provider;
+        try
+        {
+            return FBUtilities.construct(cache_provider, "row cache provider");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
 }

Added: cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java?rev=1089078&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java Tue Apr  5 15:22:57 2011
@@ -0,0 +1,88 @@
+package org.apache.cassandra.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+
+import static org.apache.cassandra.Util.column;
+import static org.junit.Assert.*;
+
+public class CacheProviderTest extends SchemaLoader
+{
+    String key1 = "key1";
+    String key2 = "key2";
+    String key3 = "key3";
+    String key4 = "key4";
+    String key5 = "key5";
+    private static final int CAPACITY = 4;
+
+    private void simpleCase(ColumnFamily cf, ICache<String, ColumnFamily> cache)
+    {
+        cache.put(key1, cf);
+        assertDigests(cache.get(key1), cf);
+        cache.put(key2, cf);
+        cache.put(key3, cf);
+        cache.put(key4, cf);
+        cache.put(key5, cf);
+        
+        assertEquals(CAPACITY, cache.size());
+    }
+
+    private void assertDigests(ColumnFamily one, ColumnFamily two)
+    {
+        // CF does not implement .equals
+        assert ColumnFamily.digest(one).equals(ColumnFamily.digest(two));
+    }
+
+    // TODO this isn't terribly useful
+    private void concurrentCase(final ColumnFamily cf, final ICache<String, ColumnFamily> cache) throws InterruptedException
+    {
+        Runnable runable = new Runnable()
+        {
+            public void run()
+            {
+                for (int j = 0; j < 10; j++)
+                {
+                    cache.put(key1, cf);
+                    cache.put(key2, cf);
+                    cache.put(key3, cf);
+                    cache.put(key4, cf);
+                    cache.put(key5, cf);
+                }
+            }
+        };
+
+        List<Thread> threads = new ArrayList<Thread>(100);
+        for (int i = 0; i < 100; i++)
+        {
+            Thread thread = new Thread(runable);
+            threads.add(thread);
+            thread.start();
+        }
+        for (Thread thread : threads)
+            thread.join();
+    }
+
+    private ColumnFamily createCF()
+    {
+        ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1");
+        cf.addColumn(column("vijay", "great", 1));
+        cf.addColumn(column("awesome", "vijay", 1));
+        return cf;
+    }
+    
+    @Test
+    public void testHeapCache() throws InterruptedException
+    {
+        ICache<String, ColumnFamily> cache = ConcurrentLinkedHashCache.create(CAPACITY);
+        ColumnFamily cf = createCF();
+        simpleCase(cf, cache);
+        concurrentCase(cf, cache);
+    }
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java?rev=1089078&r1=1089077&r2=1089078&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java Tue Apr  5 15:22:57 2011
@@ -74,9 +74,9 @@ public class KeyCacheTest extends Cleanu
 
         // really? our caches don't implement the map interface? (hence no .addAll)
         Map<Pair<Descriptor, DecoratedKey>, Long> savedMap = new HashMap<Pair<Descriptor, DecoratedKey>, Long>();
-        for (Map.Entry<Pair<Descriptor, DecoratedKey>, Long> entry : store.getKeyCache().getEntrySet())
+        for (Pair<Descriptor, DecoratedKey> k : store.getKeyCache().getKeySet())
         {
-            savedMap.put(entry.getKey(), entry.getValue());
+            savedMap.put(k, store.getKeyCache().get(k));
         }
 
         // force the cache to disk



Mime
View raw message