cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1158927 [1/2] - in /cassandra/trunk: ./ conf/ lib/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/db/context/ src/java/org/apache/cassandra/db/index/keys/ src/java/org/apache/ca...
Date Wed, 17 Aug 2011 21:52:43 GMT
Author: jbellis
Date: Wed Aug 17 21:52:41 2011
New Revision: 1158927

URL: http://svn.apache.org/viewvc?rev=1158927&view=rev
Log:
Arena allocation for memtables
patch by jbellis and stuhood for CASSANDRA-2252

Added:
    cassandra/trunk/lib/jamm-0.2.4.jar
    cassandra/trunk/src/java/org/apache/cassandra/utils/Allocator.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/HeapAllocator.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/SlabAllocator.java
Removed:
    cassandra/trunk/lib/jamm-0.2.2.jar
Modified:
    cassandra/trunk/build.xml
    cassandra/trunk/conf/cassandra-env.sh
    cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ISortedColumns.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
    cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java
    cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
    cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
    cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/TimeSortTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java

Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Wed Aug 17 21:52:41 2011
@@ -361,7 +361,7 @@ url=${svn.entry.url}?pathrev=${svn.entry
           </dependency>
           <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/>
           <dependency groupId="com.github.stephenc.high-scale-lib" artifactId="high-scale-lib" version="1.1.2"/>
-          <dependency groupId="com.github.stephenc" artifactId="jamm" version="0.2.2"/>
+          <dependency groupId="com.github.stephenc" artifactId="jamm" version="0.2.4"/>
           <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.6"/>
           <dependency groupId="org.apache.cassandra.deps" artifactId="avro" version="1.4.0-cassandra-1">
             <exclusion groupId="org.jboss.netty" artifactId="netty"/>
@@ -948,7 +948,7 @@ url=${svn.entry.url}?pathrev=${svn.entry
         <jvmarg value="-Dstorage-config=${test.conf}"/>
         <jvmarg value="-Daccess.properties=${test.conf}/access.properties"/>
         <jvmarg value="-Dlog4j.configuration=log4j-junit.properties" />
-        <jvmarg value="-javaagent:${basedir}/lib/jamm-0.2.2.jar" />
+        <jvmarg value="-javaagent:${basedir}/lib/jamm-0.2.4.jar" />
         <jvmarg value="-ea"/>
         <optjvmargs/>
         <classpath>

Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Wed Aug 17 21:52:41 2011
@@ -100,7 +100,7 @@ JVM_OPTS="$JVM_OPTS -ea"
 check_openjdk=`"${JAVA:-java}" -version 2>&1 | awk '{if (NR == 2) {print $1}}'`
 if [ "$check_openjdk" != "OpenJDK" ]
 then
-    JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.2.jar"
+    JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.4.jar"
 fi
 
 # enable thread priorities, primarily so we can give periodic tasks

Added: cassandra/trunk/lib/jamm-0.2.4.jar
URL: http://svn.apache.org/viewvc/cassandra/trunk/lib/jamm-0.2.4.jar?rev=1158927&view=auto
==============================================================================
Files cassandra/trunk/lib/jamm-0.2.4.jar (added) and cassandra/trunk/lib/jamm-0.2.4.jar Wed Aug 17 21:52:41 2011 differ

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java Wed Aug 17 21:52:41 2011
@@ -35,7 +35,9 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.IIterableColumns;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HeapAllocator;
 
 public abstract class AbstractColumnContainer implements IColumnContainer, IIterableColumns
 {
@@ -105,15 +107,20 @@ public abstract class AbstractColumnCont
     /**
      * We need to go through each column in the column container and resolve it before adding
      */
-    public void addAll(AbstractColumnContainer cc)
+    public void addAll(AbstractColumnContainer cc, Allocator allocator)
     {
-        columns.addAll(cc.columns);
+        columns.addAll(cc.columns, allocator);
         delete(cc);
     }
 
     public void addColumn(IColumn column)
     {
-        columns.addColumn(column);
+        addColumn(column, HeapAllocator.instance);
+    }
+
+    public void addColumn(IColumn column, Allocator allocator)
+    {
+        columns.addColumn(column, allocator);
     }
 
     public IColumn getColumn(ByteBuffer name)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java Wed Aug 17 21:52:41 2011
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Allocator;
 
 /**
  * A ISortedColumns backed by an ArrayList.
@@ -100,7 +101,7 @@ public class ArrayBackedSortedColumns ex
      * without knowing about (we can revisit that decision later if we have
      * use cases where most insert are in sorted order but a few are not).
      */
-    public void addColumn(IColumn column)
+    public void addColumn(IColumn column, Allocator allocator)
     {
         if (isEmpty())
         {
@@ -122,13 +123,13 @@ public class ArrayBackedSortedColumns ex
         else if (c == 0)
         {
             // Resolve against last
-            resolveAgainst(size() - 1, column);
+            resolveAgainst(size() - 1, column, allocator);
         }
         else
         {
             int pos = binarySearch(column.name());
             if (pos >= 0)
-                resolveAgainst(pos, column);
+                resolveAgainst(pos, column, allocator);
             else
                 add(-pos-1, column);
         }
@@ -138,19 +139,19 @@ public class ArrayBackedSortedColumns ex
      * Resolve against element at position i.
      * Assume that i is a valid position.
      */
-    private void resolveAgainst(int i, IColumn column)
+    private void resolveAgainst(int i, IColumn column, Allocator allocator)
     {
         IColumn oldColumn = get(i);
         if (oldColumn instanceof SuperColumn)
         {
             // Delegated to SuperColumn
             assert column instanceof SuperColumn;
-            ((SuperColumn) oldColumn).putColumn((SuperColumn)column);
+            ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
         }
         else
         {
             // calculate reconciled col from old (existing) col and new col
-            IColumn reconciledColumn = column.reconcile(oldColumn);
+            IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
             set(i, reconciledColumn);
         }
     }
@@ -186,7 +187,7 @@ public class ArrayBackedSortedColumns ex
         return -mid - (result < 0 ? 1 : 2);
     }
 
-    public void addAll(ISortedColumns cm)
+    public void addAll(ISortedColumns cm, Allocator allocator)
     {
         if (cm.isEmpty())
             return;
@@ -214,7 +215,7 @@ public class ArrayBackedSortedColumns ex
             else // c == 0
             {
                 add(copy[idx]);
-                resolveAgainst(size() - 1, otherColumn);
+                resolveAgainst(size() - 1, otherColumn, allocator);
                 idx++;
                 otherColumn = other.hasNext() ? other.next() : null;
             }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Wed Aug 17 21:52:41 2011
@@ -30,8 +30,9 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
-
+import org.apache.cassandra.utils.HeapAllocator;
 
 /**
  * Column is immutable, which prevents all kinds of confusion in a multithreaded environment.
@@ -147,6 +148,11 @@ public class Column implements IColumn
 
     public void addColumn(IColumn column)
     {
+        addColumn(null, null);
+    }
+
+    public void addColumn(IColumn column, Allocator allocator)
+    {
         throw new UnsupportedOperationException("This operation is not supported for simple columns.");
     }
 
@@ -184,6 +190,11 @@ public class Column implements IColumn
 
     public IColumn reconcile(IColumn column)
     {
+        return reconcile(column, HeapAllocator.instance);
+    }
+
+    public IColumn reconcile(IColumn column, Allocator allocator)
+    {
         // tombstones take precedence.  (if both are tombstones, then it doesn't matter which one we use.)
         if (isMarkedForDelete())
             return timestamp() < column.timestamp() ? column : this;
@@ -225,9 +236,14 @@ public class Column implements IColumn
 
     public IColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new Column(cfs.internOrCopy(name), ByteBufferUtil.clone(value), timestamp);
+        return localCopy(cfs, HeapAllocator.instance);
     }
     
+    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    {
+        return new Column(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp);
+    }
+
     public String getString(AbstractType comparator)
     {
         StringBuilder sb = new StringBuilder();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Wed Aug 17 21:52:41 2011
@@ -31,7 +31,9 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.IColumnSerializer;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HeapAllocator;
 
 public class ColumnFamily extends AbstractColumnContainer
 {
@@ -121,6 +123,10 @@ public class ColumnFamily extends Abstra
         return cfm;
     }
 
+    /**
+     * FIXME: shouldn't need to hold a reference to a serializer; worse, for super cfs,
+     * it will be a _unique_ serializer object per row
+     */
     public IColumnSerializer getColumnSerializer()
     {
         return cfm.getColumnSerializer();
@@ -302,10 +308,15 @@ public class ColumnFamily extends Abstra
 
     public void resolve(ColumnFamily cf)
     {
+        resolve(cf, HeapAllocator.instance);
+    }
+
+    public void resolve(ColumnFamily cf, Allocator allocator)
+    {
         // Row _does_ allow null CF objects :(  seems a necessary evil for efficiency
         if (cf == null)
             return;
-        addAll(cf);
+        addAll(cf, allocator);
     }
 
     public long serializedSize()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Wed Aug 17 21:52:41 2011
@@ -110,10 +110,10 @@ public class ColumnFamilySerializer impl
 
     public ColumnFamily deserialize(DataInput dis) throws IOException
     {
-        return deserialize(dis, false, false, ThreadSafeSortedColumns.FACTORY);
+        return deserialize(dis, false, ThreadSafeSortedColumns.FACTORY);
     }
 
-    public ColumnFamily deserialize(DataInput dis, boolean intern, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
+    public ColumnFamily deserialize(DataInput dis, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
     {
         if (!dis.readBoolean())
             return null;
@@ -124,23 +124,22 @@ public class ColumnFamilySerializer impl
             throw new UnserializableColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
         ColumnFamily cf = ColumnFamily.create(cfId, factory);
         deserializeFromSSTableNoColumns(cf, dis);
-        deserializeColumns(dis, cf, intern, fromRemote);
+        deserializeColumns(dis, cf, fromRemote);
         return cf;
     }
 
-    public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean intern, boolean fromRemote) throws IOException
+    public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean fromRemote) throws IOException
     {
-        int count = dis.readInt();
-        deserializeColumns(dis, cf, count, intern, fromRemote);
+        int size = dis.readInt();
+        deserializeColumns(dis, cf, size, fromRemote);
     }
 
     /* column count is already read from DataInput */
-    public void deserializeColumns(DataInput dis, ColumnFamily cf, int count, boolean intern, boolean fromRemote) throws IOException
+    public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, boolean fromRemote) throws IOException
     {
-        ColumnFamilyStore interner = intern ? Table.open(CFMetaData.getCF(cf.id()).left).getColumnFamilyStore(cf.id()) : null;
-        for (int i = 0; i < count; ++i)
+        for (int i = 0; i < size; ++i)
         {
-            IColumn column = cf.getColumnSerializer().deserialize(dis, interner, fromRemote, (int) (System.currentTimeMillis() / 1000));
+            IColumn column = cf.getColumnSerializer().deserialize(dis, fromRemote, (int) (System.currentTimeMillis() / 1000));
             cf.addColumn(column);
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Aug 17 21:52:41 2011
@@ -749,7 +749,7 @@ public class ColumnFamilyStore implement
         {
             ColumnFamily cachedRow = getRawCachedRow(key);
             if (cachedRow != null)
-                cachedRow.addAll(columnFamily);
+                cachedRow.addAll(columnFamily, HeapAllocator.instance);
         }
     }
 
@@ -1211,7 +1211,7 @@ public class ColumnFamilyStore implement
                     {
                         ColumnFamily cf = cached.cloneMeShallow();
                         if (sc != null)
-                            cf.addColumn(sc);
+                            cf.addColumn(sc, HeapAllocator.instance);
                         return removeDeleted(cf, gcBefore);
                     }
                 }
@@ -1919,10 +1919,10 @@ public class ColumnFamilyStore implement
         return internedName;
     }
 
-    public ByteBuffer internOrCopy(ByteBuffer name)
+    public ByteBuffer internOrCopy(ByteBuffer name, Allocator allocator)
     {
         if (internedNames.size() >= INTERN_CUTOFF)
-            return ByteBufferUtil.clone(name);
+            return allocator.clone(name);
 
         return intern(name);
     }
@@ -1930,7 +1930,7 @@ public class ColumnFamilyStore implement
     public ByteBuffer maybeIntern(ByteBuffer name)
     {
         if (internedNames.size() >= INTERN_CUTOFF)
-            return name;
+            return null;
 
         return intern(name);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Wed Aug 17 21:52:41 2011
@@ -69,7 +69,7 @@ public class ColumnSerializer implements
 
     public Column deserialize(DataInput dis) throws IOException
     {
-        return deserialize(dis, null, false);
+        return deserialize(dis, false);
     }
 
     /*
@@ -77,18 +77,16 @@ public class ColumnSerializer implements
      * deserialize comes from a remote host. If it does, then we must clear
      * the delta.
      */
-    public Column deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote) throws IOException
+    public Column deserialize(DataInput dis, boolean fromRemote) throws IOException
     {
-        return deserialize(dis, interner, fromRemote, (int) (System.currentTimeMillis() / 1000));
+        return deserialize(dis, fromRemote, (int) (System.currentTimeMillis() / 1000));
     }
 
-    public Column deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote, int expireBefore) throws IOException
+    public Column deserialize(DataInput dis, boolean fromRemote, int expireBefore) throws IOException
     {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         if (name.remaining() <= 0)
             throw new CorruptColumnException("invalid column name length " + name.remaining());
-        if (interner != null)
-            name = interner.maybeIntern(name);
 
         int b = dis.readUnsignedByte();
         if ((b & COUNTER_MASK) != 0)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java Wed Aug 17 21:52:41 2011
@@ -31,7 +31,9 @@ import org.apache.cassandra.db.context.I
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
 import org.apache.cassandra.utils.NodeId;
 
 /**
@@ -47,12 +49,12 @@ public class CounterColumn extends Colum
 
     public CounterColumn(ByteBuffer name, long value, long timestamp)
     {
-        this(name, contextManager.create(value), timestamp);
+        this(name, contextManager.create(value, HeapAllocator.instance), timestamp);
     }
 
     public CounterColumn(ByteBuffer name, long value, long timestamp, long timestampOfLastDelete)
     {
-        this(name, contextManager.create(value), timestamp, timestampOfLastDelete);
+        this(name, contextManager.create(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
     }
 
     public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp)
@@ -135,7 +137,7 @@ public class CounterColumn extends Colum
     }
 
     @Override
-    public IColumn reconcile(IColumn column)
+    public IColumn reconcile(IColumn column, Allocator allocator)
     {
         assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type.";
 
@@ -162,7 +164,7 @@ public class CounterColumn extends Colum
         // live + live: merge clocks; update value
         return new CounterColumn(
             name(),
-            contextManager.merge(value(), column.value()),
+            contextManager.merge(value(), column.value(), allocator),
             Math.max(timestamp(), column.timestamp()),
             Math.max(timestampOfLastDelete(), ((CounterColumn)column).timestampOfLastDelete()));
     }
@@ -185,7 +187,13 @@ public class CounterColumn extends Colum
     @Override
     public IColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new CounterColumn(cfs.internOrCopy(name), ByteBufferUtil.clone(value), timestamp, timestampOfLastDelete);
+        return new CounterColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp, timestampOfLastDelete);
+    }
+
+    @Override
+    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    {
+        return new CounterColumn(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Wed Aug 17 21:52:41 2011
@@ -40,7 +40,9 @@ import org.apache.cassandra.io.ICompactS
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 
@@ -144,7 +146,7 @@ public class CounterMutation implements 
                 localMutation.add(merger);
                 localMutation.apply();
 
-                cf.addAll(merger);
+                cf.addAll(merger, HeapAllocator.instance);
             }
         }
         return row;
@@ -216,8 +218,7 @@ public class CounterMutation implements 
 
     public void apply() throws IOException
     {
-        // We need to transform all CounterUpdateColumn to CounterColumn and we need to deepCopy. Both are done 
-        // below since CUC.asCounterColumn() does a deep copy.
+        // transform all CounterUpdateColumn to CounterColumn: accomplished by localCopy
         RowMutation rm = new RowMutation(rowMutation.getTable(), ByteBufferUtil.clone(rowMutation.key()));
         Table table = Table.open(rm.getTable());
 
@@ -227,7 +228,7 @@ public class CounterMutation implements 
             ColumnFamilyStore cfs = table.getColumnFamilyStore(cf.id());
             for (IColumn column : cf_)
             {
-                cf.addColumn(column.localCopy(cfs));
+                cf.addColumn(column.localCopy(cfs), HeapAllocator.instance);
             }
             rm.add(cf);
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterUpdateColumn.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterUpdateColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterUpdateColumn.java Wed Aug 17 21:52:41 2011
@@ -21,7 +21,9 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
 
 /**
  * A counter update while it hasn't been applied yet by the leader replica.
@@ -55,7 +57,7 @@ public class CounterUpdateColumn extends
     }
 
     @Override
-    public IColumn reconcile(IColumn column)
+    public IColumn reconcile(IColumn column, Allocator allocator)
     {
         // The only time this could happen is if a batchAdd ships two
         // increment for the same column. Hence we simply sums the delta.
@@ -80,8 +82,17 @@ public class CounterUpdateColumn extends
     @Override
     public CounterColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new CounterColumn(cfs.internOrCopy(name),
-                                 CounterContext.instance().create(delta()),
+        return new CounterColumn(cfs.internOrCopy(name, HeapAllocator.instance),
+                                 CounterContext.instance().create(delta(), HeapAllocator.instance),
+                                 timestamp(),
+                                 Long.MIN_VALUE);
+    }
+
+    @Override
+    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    {
+        return new CounterColumn(cfs.internOrCopy(name, allocator),
+                                 CounterContext.instance().create(delta(), allocator),
                                  timestamp(),
                                  Long.MIN_VALUE);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java Wed Aug 17 21:52:41 2011
@@ -22,7 +22,9 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
 
 public class DeletedColumn extends Column
 {    
@@ -55,17 +57,23 @@ public class DeletedColumn extends Colum
     }
 
     @Override
-    public IColumn reconcile(IColumn column)
+    public IColumn reconcile(IColumn column, Allocator allocator)
     {
         if (column instanceof DeletedColumn)
-            return super.reconcile(column);
-        return column.reconcile(this);
+            return super.reconcile(column, allocator);
+        return column.reconcile(this, allocator);
     }
     
     @Override
     public IColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new DeletedColumn(cfs.internOrCopy(name), ByteBufferUtil.clone(value), timestamp);
+        return new DeletedColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
+    }
+
+    @Override
+    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    {
+        return new DeletedColumn(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp);
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java Wed Aug 17 21:52:41 2011
@@ -26,7 +26,9 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
 
 /**
  * Alternative to Column that have an expiring time.
@@ -117,7 +119,16 @@ public class ExpiringColumn extends Colu
     @Override
     public IColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new ExpiringColumn(cfs.internOrCopy(name), ByteBufferUtil.clone(value), timestamp, timeToLive, localExpirationTime);
+        return new ExpiringColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp, timeToLive, localExpirationTime);
+    }
+
+    @Override
+    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    {
+        ByteBuffer clonedName = cfs.maybeIntern(name);
+        if (clonedName == null)
+            clonedName = allocator.clone(name);
+        return new ExpiringColumn(clonedName, allocator.clone(value), timestamp, timeToLive, localExpirationTime);
     }
     
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Wed Aug 17 21:52:41 2011
@@ -25,8 +25,10 @@ import java.util.Collection;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.FBUtilities;
 
+/** TODO: rename */
 public interface IColumn
 {
     public static final int MAX_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
@@ -43,18 +45,26 @@ public interface IColumn
     public Collection<IColumn> getSubColumns();
     public IColumn getSubColumn(ByteBuffer columnName);
     public void addColumn(IColumn column);
+    public void addColumn(IColumn column, Allocator allocator);
     public IColumn diff(IColumn column);
     public IColumn reconcile(IColumn column);
+    public IColumn reconcile(IColumn column, Allocator allocator);
     public void updateDigest(MessageDigest digest);
     public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
     public String getString(AbstractType comparator);
     public void validateFields(CFMetaData metadata) throws MarshalException;
 
-    /** clones the column, interning column names and making copies of other underlying byte buffers
-     * @param cfs*/
+    /** clones the column for the row cache, interning column names and making copies of other underlying byte buffers */
     IColumn localCopy(ColumnFamilyStore cfs);
 
     /**
+     * clones the column for the memtable, interning column names and making copies of other underlying byte buffers.
+     * Unlike the other localCopy, this uses Allocator to allocate values in contiguous memory regions,
+     * which helps avoid heap fragmentation.
+     */
+    IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator);
+
+    /**
      * For a simple column, live == !isMarkedForDelete.
      * For a supercolumn, live means it has at least one subcolumn whose timestamp is greater than the
      * supercolumn deleted-at time.

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java Wed Aug 17 21:52:41 2011
@@ -24,10 +24,12 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Allocator;
 
 public interface IColumnContainer
 {
     public void addColumn(IColumn column);
+    public void addColumn(IColumn column, Allocator allocator);
     public void remove(ByteBuffer columnName);
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ISortedColumns.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ISortedColumns.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ISortedColumns.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ISortedColumns.java Wed Aug 17 21:52:41 2011
@@ -25,6 +25,7 @@ import java.util.SortedSet;
 
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.IIterableColumns;
+import org.apache.cassandra.utils.Allocator;
 
 /**
  * A sorted map of columns.
@@ -45,7 +46,7 @@ public interface ISortedColumns extends 
      * If a column with the same name is already present in the map, it will
      * be replaced by the newly added column.
      */
-    public void addColumn(IColumn column);
+    public void addColumn(IColumn column, Allocator allocator);
 
     /**
      * Adds all the columns of a given column map to this column map.
@@ -56,7 +57,7 @@ public interface ISortedColumns extends 
      *   </code>
      *  but is potentially faster.
      */
-    public void addAll(ISortedColumns cm);
+    public void addAll(ISortedColumns cm, Allocator allocator);
 
     /**
      * Replace oldColumn if present by newColumn.

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Aug 17 21:52:41 2011
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.filter.Sl
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.utils.SlabAllocator;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.github.jamm.MemoryMeter;
 
@@ -54,7 +55,7 @@ public class Memtable
     private static final double MIN_SANE_LIVE_RATIO = 1.0;
     // max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than 64 something is probably broken.
     private static final double MAX_SANE_LIVE_RATIO = 64.0;
-    private static final MemoryMeter meter = new MemoryMeter();
+    private static final MemoryMeter meter = new MemoryMeter().omitSharedBufferOverhead();
 
     // we're careful to only allow one count to run at a time because counting is slow
     // (can be minutes, for a large memtable and a busy server), so we could keep memtables
@@ -69,6 +70,8 @@ public class Memtable
         }
     };
 
+    volatile static Memtable activelyMeasuring;
+
     private volatile boolean isFrozen;
     private final AtomicLong currentThroughput = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
@@ -78,7 +81,7 @@ public class Memtable
 
     private final long THRESHOLD;
     private final long THRESHOLD_COUNT;
-    volatile static Memtable activelyMeasuring;
+    private SlabAllocator allocator = new SlabAllocator();
 
     public Memtable(ColumnFamilyStore cfs)
     {
@@ -89,8 +92,9 @@ public class Memtable
 
     public long getLiveSize()
     {
-        // 25% fudge factor
-        return (long) (currentThroughput.get() * cfs.liveRatio * 1.25);
+        // 25% fudge factor on the base throughput * liveRatio calculation.  (Based on observed
+        // pre-slabbing behavior -- not sure what accounts for this. May have changed with introduction of slabbing.)
+        return (long) (currentThroughput.get() * cfs.liveRatio * 1.25) + allocator.size();
     }
 
     public long getSerializedSize()
@@ -192,11 +196,25 @@ public class Memtable
                                     ? cf.isMarkedForDelete() ? 1 : 0
                                     : cf.getColumnCount());
 
-        ColumnFamily oldCf = columnFamilies.putIfAbsent(key, cf);
-        if (oldCf == null)
-            return;
+        ColumnFamily clonedCf = columnFamilies.get(key);
+        // if the row doesn't exist yet in the memtable, clone cf to our allocator.
+        if (clonedCf == null)
+        {
+            clonedCf = cf.cloneMeShallow();
+            for (IColumn column : cf.getSortedColumns())
+                clonedCf.addColumn(column.localCopy(cfs, allocator));
+            clonedCf = columnFamilies.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), clonedCf);
+            if (clonedCf == null)
+                return;
+            // else there was a race and the other thread won.  fall through to updating his CF object
+        }
 
-        oldCf.resolve(cf);
+        // we duplicate the functionality of CF.resolve here to avoid having to either pass the Memtable in for
+        // the cloning operation, or cloning the CF container as well as the Columns.  fortunately, resolve
+        // is really quite simple:
+        clonedCf.delete(cf);
+        for (IColumn column : cf.getSortedColumns())
+            clonedCf.addColumn(column.localCopy(cfs, allocator), allocator);
     }
 
     // for debugging

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Wed Aug 17 21:52:41 2011
@@ -66,7 +66,7 @@ public class Row
         public Row deserialize(DataInputStream dis, int version, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
         {
             return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(dis)),
-                           ColumnFamily.serializer().deserialize(dis, false, fromRemote, factory));
+                           ColumnFamily.serializer().deserialize(dis, fromRemote, factory));
         }
 
         public Row deserialize(DataInputStream dis, int version) throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Aug 17 21:52:41 2011
@@ -373,23 +373,6 @@ public class RowMutation implements IMut
         return rm;
     }
 
-    public RowMutation localCopy()
-    {
-        RowMutation rm = new RowMutation(table_, ByteBufferUtil.clone(key_));
-
-        Table table = Table.open(table_);
-        for (Map.Entry<Integer, ColumnFamily> entry : modifications_.entrySet())
-        {
-            ColumnFamily cf = entry.getValue().cloneMeShallow();
-            ColumnFamilyStore cfs = table.getColumnFamilyStore(cf.id());
-            for (IColumn col : entry.getValue())
-                cf.addColumn(col.localCopy(cfs));
-            rm.modifications_.put(entry.getKey(), cf);
-        }
-
-        return rm;
-    }
-
     public static class RowMutationSerializer implements ICompactSerializer<RowMutation>
     {
         public void serialize(RowMutation rm, DataOutputStream dos, int version) throws IOException
@@ -419,7 +402,7 @@ public class RowMutation implements IMut
             for (int i = 0; i < size; ++i)
             {
                 Integer cfid = Integer.valueOf(dis.readInt());
-                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, true, fromRemote, ThreadSafeSortedColumns.FACTORY);
+                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, fromRemote, ThreadSafeSortedColumns.FACTORY);
                 modifications.put(cfid, cf);
             }
             return new RowMutation(table, key, modifications);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Wed Aug 17 21:52:41 2011
@@ -25,9 +25,6 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -35,10 +32,11 @@ import org.apache.cassandra.db.marshal.M
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.ColumnSortedMap;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
-
 public class SuperColumn extends AbstractColumnContainer implements IColumn
 {
     private static NonBlockingHashMap<Comparator, SuperColumnSerializer> serializers = new NonBlockingHashMap<Comparator, SuperColumnSerializer>();
@@ -160,21 +158,21 @@ public class SuperColumn extends Abstrac
     }
 
     @Override
-    public void addColumn(IColumn column)
+    public void addColumn(IColumn column, Allocator allocator)
     {
         assert column instanceof Column : "A super column can only contain simple columns";
-        super.addColumn((Column)column);
+        super.addColumn(column, allocator);
     }
 
     /*
      * Go through each sub column if it exists then as it to resolve itself
      * if the column does not exist then create it.
      */
-    void putColumn(SuperColumn column)
+    void putColumn(SuperColumn column, Allocator allocator)
     {
         for (IColumn subColumn : column.getSubColumns())
         {
-        	addColumn(subColumn);
+        	addColumn(subColumn, allocator);
         }
         delete(column);
     }
@@ -255,23 +253,22 @@ public class SuperColumn extends Abstrac
         return mostRecentLiveChangeAt() > getMarkedForDeleteAt();
     }
 
-    public IColumn shallowCopy()
+    public IColumn localCopy(ColumnFamilyStore cfs)
     {
-        SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name()), this.getComparator());
-        // since deletion info is immutable, aliasing it is fine
-        sc.deletionInfo.set(deletionInfo.get());
-        return sc;
+        return localCopy(cfs, HeapAllocator.instance);
     }
-    
-    public IColumn localCopy(ColumnFamilyStore cfs)
+
+    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         // we don't try to intern supercolumn names, because if we're using Cassandra correctly it's almost
         // certainly just going to pollute our interning map with unique, dynamic values
-        SuperColumn sc = (SuperColumn)shallowCopy();
+        SuperColumn sc = new SuperColumn(allocator.clone(name), this.getComparator());
+        // since deletion info is immutable, aliasing it is fine
+        sc.deletionInfo.set(deletionInfo.get());
 
         for(IColumn c : columns)
         {
-            sc.addColumn(c.localCopy(cfs));
+            sc.addColumn(c.localCopy(cfs, allocator));
         }
 
         return sc;
@@ -279,6 +276,11 @@ public class SuperColumn extends Abstrac
 
     public IColumn reconcile(IColumn c)
     {
+        return reconcile(null, null);
+    }
+
+    public IColumn reconcile(IColumn c, Allocator allocator)
+    {
         throw new UnsupportedOperationException("This operation is unsupported on super columns.");
     }
 
@@ -335,20 +337,15 @@ class SuperColumnSerializer implements I
 
     public IColumn deserialize(DataInput dis) throws IOException
     {
-        return deserialize(dis, null, false);
-    }
-
-    public IColumn deserialize(DataInput dis, ColumnFamilyStore interner) throws IOException
-    {
-        return deserialize(dis, interner, false);
+        return deserialize(dis, false);
     }
 
-    public IColumn deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote) throws IOException
+    public IColumn deserialize(DataInput dis, boolean fromRemote) throws IOException
     {
-        return deserialize(dis, interner, fromRemote, (int)(System.currentTimeMillis() / 1000));
+        return deserialize(dis, fromRemote, (int)(System.currentTimeMillis() / 1000));
     }
 
-    public IColumn deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote, int expireBefore) throws IOException
+    public IColumn deserialize(DataInput dis, boolean fromRemote, int expireBefore) throws IOException
     {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         int localDeleteTime = dis.readInt();
@@ -361,7 +358,7 @@ class SuperColumnSerializer implements I
         /* read the number of columns */
         int size = dis.readInt();
         ColumnSerializer serializer = Column.serializer();
-        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, interner, size, fromRemote, expireBefore);
+        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, fromRemote, expireBefore);
         SuperColumn superColumn = new SuperColumn(name, ThreadSafeSortedColumns.factory().fromSorted(preSortedMap, false));
         if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java Wed Aug 17 21:52:41 2011
@@ -18,15 +18,14 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
-import java.util.Comparator;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Allocator;
 
 public class ThreadSafeSortedColumns extends ConcurrentSkipListMap<ByteBuffer, IColumn> implements ISortedColumns
 {
@@ -72,7 +71,7 @@ public class ThreadSafeSortedColumns ext
      * If we find an old column that has the same name
      * the ask it to resolve itself else add the new column
     */
-    public void addColumn(IColumn column)
+    public void addColumn(IColumn column, Allocator allocator)
     {
         ByteBuffer name = column.name();
         IColumn oldColumn;
@@ -81,13 +80,13 @@ public class ThreadSafeSortedColumns ext
             if (oldColumn instanceof SuperColumn)
             {
                 assert column instanceof SuperColumn;
-                ((SuperColumn) oldColumn).putColumn((SuperColumn)column);
+                ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
                 break;  // Delegated to SuperColumn
             }
             else
             {
                 // calculate reconciled col from old (existing) col and new col
-                IColumn reconciledColumn = column.reconcile(oldColumn);
+                IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
                 if (replace(name, oldColumn, reconciledColumn))
                     break;
 
@@ -100,10 +99,10 @@ public class ThreadSafeSortedColumns ext
     /**
      * We need to go through each column in the column container and resolve it before adding
      */
-    public void addAll(ISortedColumns cm)
+    public void addAll(ISortedColumns cm, Allocator allocator)
     {
         for (IColumn column : cm.getSortedColumns())
-            addColumn(column);
+            addColumn(column, allocator);
     }
 
     public boolean replace(IColumn oldColumn, IColumn newColumn)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java Wed Aug 17 21:52:41 2011
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.CounterCo
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.HeapAllocator;
 
 /**
  * PrecompactedRow merges its rows in its constructor in memory.
@@ -96,7 +97,7 @@ public class PrecompactedRow extends Abs
             }
             else
             {
-                cf.addAll(thisCF);
+                cf.addAll(thisCF, HeapAllocator.instance);
             }
         }
         return cf;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java Wed Aug 17 21:52:41 2011
@@ -25,6 +25,8 @@ import org.apache.log4j.Logger;
 
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.HeapAllocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.NodeId;
 
@@ -85,19 +87,21 @@ public class CounterContext implements I
     }
 
     /**
-     * Creates an initial counter context with an initial value for the local node with.
+     * Creates an initial counter context with an initial value for the local node.
+     *
      *
      * @param value the value for this initial update
      *
+     * @param allocator the allocator used to allocate the buffer for the new context
      * @return an empty counter context.
      */
-    public ByteBuffer create(long value)
+    public ByteBuffer create(long value, Allocator allocator)
     {
-        ByteBuffer context = ByteBuffer.allocate(HEADER_SIZE_LENGTH + HEADER_ELT_LENGTH + STEP_LENGTH);
+        ByteBuffer context = allocator.allocate(HEADER_SIZE_LENGTH + HEADER_ELT_LENGTH + STEP_LENGTH);
         // The first (and only) elt is a delta
-        context.putShort(0, (short)1);
-        context.putShort(HEADER_SIZE_LENGTH, (short)0);
-        writeElementAtOffset(context, HEADER_SIZE_LENGTH + HEADER_ELT_LENGTH, NodeId.getLocalId(), 1L, value);
+        context.putShort(context.position(), (short)1);
+        context.putShort(context.position() + HEADER_SIZE_LENGTH, (short)0);
+        writeElementAtOffset(context, context.position() + HEADER_SIZE_LENGTH + HEADER_ELT_LENGTH, NodeId.getLocalId(), 1L, value);
         return context;
     }
 
@@ -105,21 +109,23 @@ public class CounterContext implements I
     public ByteBuffer create(NodeId id, long clock, long value, boolean isDelta)
     {
         ByteBuffer context = ByteBuffer.allocate(HEADER_SIZE_LENGTH + (isDelta ? HEADER_ELT_LENGTH : 0) + STEP_LENGTH);
-        context.putShort(0, (short)(isDelta ? 1 : 0));
+        context.putShort(context.position(), (short)(isDelta ? 1 : 0));
         if (isDelta)
         {
-            context.putShort(HEADER_SIZE_LENGTH, (short)0);
+            context.putShort(context.position() + HEADER_SIZE_LENGTH, (short)0);
         }
-        writeElementAtOffset(context, HEADER_SIZE_LENGTH + (isDelta ? HEADER_ELT_LENGTH : 0), id, clock, value);
+        writeElementAtOffset(context, context.position() + HEADER_SIZE_LENGTH + (isDelta ? HEADER_ELT_LENGTH : 0), id, clock, value);
         return context;
     }
 
-    // write a tuple (node id, clock, count) at offset
+    // write a tuple (node id, clock, count) at an absolute (bytebuffer-wise) offset
     private static void writeElementAtOffset(ByteBuffer context, int offset, NodeId id, long clock, long count)
     {
-        ByteBufferUtil.arrayCopy(id.bytes(), id.bytes().position(), context, offset, NodeId.LENGTH);
-        context.putLong(offset + NodeId.LENGTH, clock);
-        context.putLong(offset + NodeId.LENGTH + CLOCK_LENGTH, count);
+        context = context.duplicate();
+        context.position(offset);
+        context.put(id.bytes().duplicate());
+        context.putLong(clock);
+        context.putLong(count);
     }
 
     private static int headerLength(ByteBuffer context)
@@ -276,8 +282,9 @@ public class CounterContext implements I
      *
      * @param left counter context.
      * @param right counter context.
+     * @param allocator An allocator for the merged value.
      */
-    public ByteBuffer merge(ByteBuffer left, ByteBuffer right)
+    public ByteBuffer merge(ByteBuffer left, ByteBuffer right, Allocator allocator)
     {
         ContextState leftState = new ContextState(left, headerLength(left));
         ContextState rightState = new ContextState(right, headerLength(right));
@@ -316,7 +323,7 @@ public class CounterContext implements I
         mergedBodyLength += leftState.remainingBodyLength() + rightState.remainingBodyLength();
 
         // Do the actual merge
-        ByteBuffer merged = ByteBuffer.allocate(mergedHeaderLength + mergedBodyLength);
+        ByteBuffer merged = allocator.allocate(mergedHeaderLength + mergedBodyLength);
         merged.putShort(merged.position(), (short) ((mergedHeaderLength - HEADER_SIZE_LENGTH) / HEADER_ELT_LENGTH));
         ContextState mergedState = new ContextState(merged, mergedHeaderLength);
         leftState.reset();
@@ -550,7 +557,7 @@ public class CounterContext implements I
                     // Found someone to merge it to
                     int nbDelta = foundState.isDelta() ? 1 : 0;
                     nbDelta += state.isDelta() ? 1 : 0;
-                    ContextState merger = ContextState.allocate(2, nbDelta);
+                    ContextState merger = ContextState.allocate(2, nbDelta, HeapAllocator.instance);
 
                     long fclock = foundState.getClock();
                     long fcount = foundState.getCount();
@@ -644,7 +651,7 @@ public class CounterContext implements I
             }
             state.moveToNext();
         }
-        return toAddBack == 0 ? cleanedContext : merge(cleanedContext, create(toAddBack));
+        return toAddBack == 0 ? cleanedContext : merge(cleanedContext, create(toAddBack, HeapAllocator.instance), HeapAllocator.instance);
     }
 
     /**
@@ -793,10 +800,15 @@ public class CounterContext implements I
          */
         public static ContextState allocate(int elementCount, int deltaCount)
         {
+            return allocate(elementCount, deltaCount, HeapAllocator.instance);
+        }
+
+        public static ContextState allocate(int elementCount, int deltaCount, Allocator allocator)
+        {
             assert deltaCount <= elementCount;
             int hlength = HEADER_SIZE_LENGTH + deltaCount * HEADER_ELT_LENGTH;
-            ByteBuffer context = ByteBuffer.allocate(hlength + elementCount * STEP_LENGTH);
-            context.putShort(0, (short)deltaCount);
+            ByteBuffer context = allocator.allocate(hlength + elementCount * STEP_LENGTH);
+            context.putShort(context.position(), (short)deltaCount);
             return new ContextState(context, hlength);
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java Wed Aug 17 21:52:41 2011
@@ -19,6 +19,8 @@ package org.apache.cassandra.db.context;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.utils.Allocator;
+
 /**
  * An opaque commutative context.
  *
@@ -53,12 +55,16 @@ public interface IContext
     /**
      * Return a context w/ an aggregated count for each node id.
      *
+     * @param allocator
+     *            An allocator for the merged value.
      * @param left
      *            context.
      * @param right
      *            context.
+     * @param allocator
+     *            an allocator to allocate the new context from.
      */
-    public ByteBuffer merge(ByteBuffer left, ByteBuffer right);
+    public ByteBuffer merge(ByteBuffer left, ByteBuffer right, Allocator allocator);
 
     /**
      * Human-readable String from context.

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java Wed Aug 17 21:52:41 2011
@@ -44,6 +44,8 @@ import org.apache.cassandra.thrift.Index
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -248,7 +250,7 @@ public class KeysSearcher extends Second
                         assert !extraFilter.columns.isEmpty();
                         ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, path, extraFilter));
                         if (cf != null)
-                            data.addAll(cf);
+                            data.addAll(cf, HeapAllocator.instance);
                     }
 
                 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java Wed Aug 17 21:52:41 2011
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
 
 public class CounterColumnType extends AbstractCommutativeType
 {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java Wed Aug 17 21:52:41 2011
@@ -23,10 +23,9 @@ package org.apache.cassandra.io;
 import java.io.DataInput;
 import java.io.IOException;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.IColumn;
 
 public interface IColumnSerializer extends ICompactSerializer2<IColumn>
 {
-    public IColumn deserialize(DataInput in, ColumnFamilyStore interner, boolean fromRemote, int expireBefore) throws IOException;
+    public IColumn deserialize(DataInput in, boolean fromRemote, int expireBefore) throws IOException;
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Aug 17 21:52:41 2011
@@ -175,7 +175,7 @@ public class SSTableIdentityIterator imp
     {
         try
         {
-            IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, null, fromRemote, expireBefore);
+            IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, fromRemote, expireBefore);
             if (validateColumns)
                 column.validateFields(columnFamily.metadata());
             return column;
@@ -230,7 +230,7 @@ public class SSTableIdentityIterator imp
         assert inputWithTracker.getBytesRead() == headerSize();
         ColumnFamily cf = columnFamily.cloneMeShallow();
         // since we already read column count, just pass that value and continue deserialization
-        ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, false, fromRemote);
+        ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, fromRemote);
         if (validateColumns)
         {
             try

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java Wed Aug 17 21:52:41 2011
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.HeapAllocator;
 
 /**
  * A SSTable writer that doesn't assume rows are in sorted order.
@@ -74,7 +75,7 @@ public class SSTableSimpleUnsortedWriter
         // Note that if the row was existing already, our size estimation will be slightly off
         // since we'll be counting the key multiple times.
         if (previous != null)
-            columnFamily.addAll(previous);
+            columnFamily.addAll(previous, HeapAllocator.instance);
 
         if (currentSize > bufferSize)
             sync();

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Wed Aug 17 21:52:41 2011
@@ -43,15 +43,13 @@ public class ColumnSortedMap implements 
     private final DataInput dis;
     private final Comparator<ByteBuffer> comparator;
     private final int length;
-    private final ColumnFamilyStore interner;
     private final boolean fromRemote;
     private final int expireBefore;
 
-    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote, int expireBefore)
+    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
     {
         this.comparator = comparator;
         this.serializer = serializer;
-        this.interner = interner;
         this.dis = dis;
         this.length = length;
         this.fromRemote = fromRemote;
@@ -145,7 +143,7 @@ public class ColumnSortedMap implements 
 
     public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
     {
-        return new ColumnSet(serializer, dis, interner, length, fromRemote, expireBefore);
+        return new ColumnSet(serializer, dis, length, fromRemote, expireBefore);
     }
 }
 
@@ -154,15 +152,13 @@ class ColumnSet implements Set<Map.Entry
     private final ColumnSerializer serializer;
     private final DataInput dis;
     private final int length;
-    private final ColumnFamilyStore interner;
     private boolean fromRemote;
     private final int expireBefore;
 
-    public ColumnSet(ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote, int expireBefore)
+    public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
     {
         this.serializer = serializer;
         this.dis = dis;
-        this.interner = interner;
         this.length = length;
         this.fromRemote = fromRemote;
         this.expireBefore = expireBefore;
@@ -185,7 +181,7 @@ class ColumnSet implements Set<Map.Entry
 
     public Iterator<Entry<ByteBuffer, IColumn>> iterator()
     {
-        return new ColumnIterator(serializer, dis, interner, length, fromRemote, expireBefore);
+        return new ColumnIterator(serializer, dis, length, fromRemote, expireBefore);
     }
 
     public Object[] toArray()
@@ -240,14 +236,12 @@ class ColumnIterator implements Iterator
     private final int length;
     private final boolean fromRemote;
     private int count = 0;
-    private ColumnFamilyStore interner;
     private final int expireBefore;
 
-    public ColumnIterator(ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote, int expireBefore)
+    public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
     {
         this.dis = dis;
         this.serializer = serializer;
-        this.interner = interner;
         this.length = length;
         this.fromRemote = fromRemote;
         this.expireBefore = expireBefore;
@@ -258,7 +252,7 @@ class ColumnIterator implements Iterator
         try
         {
             count++;
-            return serializer.deserialize(dis, interner, fromRemote, expireBefore);
+            return serializer.deserialize(dis, fromRemote, expireBefore);
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Aug 17 21:52:41 2011
@@ -359,7 +359,7 @@ public class StorageProxy implements Sto
         {
             public void runMayThrow() throws IOException
             {
-                rm.localCopy().apply();
+                rm.apply();
                 responseHandler.response(null);
             }
         };

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Aug 17 21:52:41 2011
@@ -148,7 +148,7 @@ public class IncomingStreamReader
                         // restore ColumnFamily
                         cf = ColumnFamily.create(cfs.metadata);
                         ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
-                        ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
+                        ColumnFamily.serializer().deserializeColumns(in, cf, true);
 
                         // write key and cf
                         writer.append(key, cf);

Added: cassandra/trunk/src/java/org/apache/cassandra/utils/Allocator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/Allocator.java?rev=1158927&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/Allocator.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/Allocator.java Wed Aug 17 21:52:41 2011
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+
+public abstract class Allocator
+{
+    /**
+     * Copy the remaining bytes of the given buffer into a newly allocated slice.
+     */
+    public ByteBuffer clone(ByteBuffer buffer)
+    {
+        assert buffer != null;
+        ByteBuffer cloned = allocate(buffer.remaining());
+
+        cloned.mark();
+        cloned.put(buffer.duplicate());
+        cloned.reset();
+        return cloned;
+    }
+
+    public abstract ByteBuffer allocate(int size);
+} 

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1158927&r1=1158926&r2=1158927&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Wed Aug 17 21:52:41 2011
@@ -169,11 +169,11 @@ public class ByteBufferUtil
 
         if (buffer.hasArray())
         {
-            int start = buffer.position();
-            if (buffer.arrayOffset() == 0 && start == 0 && length == buffer.array().length)
+            int boff = buffer.arrayOffset() + buffer.position();
+            if (boff == 0 && length == buffer.array().length)
                 return buffer.array();
             else
-                return Arrays.copyOfRange(buffer.array(), start + buffer.arrayOffset(), start + length + buffer.arrayOffset());
+                return Arrays.copyOfRange(buffer.array(), boff, boff + length);
         }
         // else, DirectByteBuffer.get() is the fastest route
         byte[] bytes = new byte[length];
@@ -297,9 +297,8 @@ public class ByteBufferUtil
                 throw new IndexOutOfBoundsException();
 
             for (int i = 0; i < length; i++)
-            {
+                // TODO: ByteBuffer.put is polymorphic, and might be slow here
                 dst.put(dstPos++, src.get(srcPos++));
-            }
         }
     }
 

Added: cassandra/trunk/src/java/org/apache/cassandra/utils/HeapAllocator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/HeapAllocator.java?rev=1158927&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/HeapAllocator.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/HeapAllocator.java Wed Aug 17 21:52:41 2011
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+
+public final class HeapAllocator extends Allocator
+{
+    public static final HeapAllocator instance = new HeapAllocator();
+
+    private HeapAllocator() {}
+
+    public ByteBuffer allocate(int size)
+    {
+        return ByteBuffer.allocate(size);
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/utils/SlabAllocator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/SlabAllocator.java?rev=1158927&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/SlabAllocator.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/SlabAllocator.java Wed Aug 17 21:52:41 2011
@@ -0,0 +1,232 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+/**
+ * The SlabAllocator is a bump-the-pointer allocator that allocates
+ * large (2MB by default) regions and then doles them out to threads that request
+ * slices into the array.
+ * <p/>
+ * The purpose of this class is to combat heap fragmentation in long lived
+ * objects: by ensuring that all allocations with similar lifetimes
+ * go only to large regions of contiguous memory, we ensure that large blocks
+ * get freed up at the same time.
+ * <p/>
+ * Otherwise, variable length byte arrays allocated end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ */
+public class SlabAllocator extends Allocator
+{
+    private final static int REGION_SIZE = 2 * 1024 * 1024;
+    private final static int MAX_CLONED_SIZE = 256 * 1024; // bigger than this don't go in the region
+
+    private final AtomicReference<Region> currentRegion = new AtomicReference<Region>();
+    private final Collection<Region> filledRegions = new LinkedBlockingQueue<Region>();
+
+    /** @return Total number of bytes allocated by this allocator. */
+    public long size()
+    {
+        Iterable<Region> regions = filledRegions;
+        if (currentRegion.get() != null)
+            regions = Iterables.concat(regions, Collections.<Region>singleton(currentRegion.get()));
+
+        long total = 0;
+        for (Region region : regions)
+            total += region.size;
+        return total;
+    }
+
+    public ByteBuffer allocate(int size)
+    {
+        assert size >= 0;
+        if (size == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        // satisfy large allocations directly from JVM since they don't cause fragmentation
+        // as badly, and fill up our regions quickly
+        if (size > MAX_CLONED_SIZE)
+            return ByteBuffer.allocate(size);
+
+        while (true)
+        {
+            Region region = getRegion();
+
+            // Try to allocate from this region
+            ByteBuffer cloned = region.allocate(size);
+            if (cloned != null)
+                return cloned;
+
+            // not enough space!
+            tryRetireRegion(region);
+        }
+    }
+    
+    /**
+     * Try to retire the current region if it is still <code>region</code>.
+     * Postcondition is that currentRegion.get() != region
+     */
+    private void tryRetireRegion(Region region)
+    {
+        if (currentRegion.compareAndSet(region, null))
+        {
+            filledRegions.add(region);
+        }
+    }
+
+    /**
+     * Get the current region, or, if there is no current region, allocate a new one
+     */
+    private Region getRegion()
+    {
+        while (true)
+        {
+            // Try to get the region
+            Region region = currentRegion.get();
+            if (region != null)
+                return region;
+
+            // No current region, so we want to allocate one. We race
+            // against other allocators to CAS in an uninitialized region
+            // (which is cheap to allocate)
+            region = new Region(REGION_SIZE);
+            if (currentRegion.compareAndSet(null, region))
+            {
+                // we won race - now we need to actually do the expensive allocation step
+                region.init();
+                return region;
+            }
+            // someone else won race - that's fine, we'll try to grab theirs
+            // in the next iteration of the loop.
+        }
+    }
+
+    /**
+     * A region of memory out of which allocations are sliced.
+     *
+     * This serves two purposes:
+     *  - to provide a step between initialization and allocation, so that racing to CAS a
+     *    new region in is harmless
+     *  - encapsulates the allocation offset
+     */
+    private static class Region
+    {
+        /**
+         * Actual underlying data
+         */
+        private ByteBuffer data;
+
+        private static final int UNINITIALIZED = -1;
+        /**
+         * Offset for the next allocation, or the sentinel value -1
+         * which implies that the region is still uninitialized.
+         */
+        private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+
+        /**
+         * Total number of allocations satisfied from this buffer
+         */
+        private AtomicInteger allocCount = new AtomicInteger();
+
+        /**
+         * Size of region in bytes
+         */
+        private final int size;
+
+        /**
+         * Create an uninitialized region. Note that memory is not allocated yet, so
+         * this is cheap.
+         *
+         * @param size in bytes
+         */
+        private Region(int size)
+        {
+            this.size = size;
+        }
+
+        /**
+         * Actually claim the memory for this region. This should only be called from
+         * the thread that constructed the region. It is thread-safe against other
+         * threads calling allocate(), which spin-wait until the allocation is complete.
+         */
+        public void init()
+        {
+            assert nextFreeOffset.get() == UNINITIALIZED;
+            data = ByteBuffer.allocate(size);
+            assert data.remaining() == data.capacity();
+            // Mark that it's ready for use
+            boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+            // We should always succeed the above CAS since only one thread calls init()!
+            Preconditions.checkState(initted, "Multiple threads tried to init same region");
+        }
+
+        /**
+         * Try to allocate <code>size</code> bytes from the region.
+         *
+         * @return the successful allocation, or null to indicate not-enough-space
+         */
+        public ByteBuffer allocate(int size)
+        {
+            while (true)
+            {
+                int oldOffset = nextFreeOffset.get();
+                if (oldOffset == UNINITIALIZED)
+                {
+                    // The region doesn't have its data allocated yet.
+                    // Since we found this in currentRegion, we know that whoever
+                    // CAS-ed it there is allocating it right now. So spin-loop
+                    // shouldn't spin long!
+                    Thread.yield();
+                    continue;
+                }
+
+                if (oldOffset + size > data.capacity()) // capacity == remaining
+                    return null;
+
+                // Try to atomically claim this region
+                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size))
+                {
+                    // we got the alloc
+                    allocCount.incrementAndGet();
+                    return (ByteBuffer) data.duplicate().position(oldOffset).limit(oldOffset + size);
+                }
+                // we raced and lost alloc, try again
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Region@" + System.identityHashCode(this) +
+                   " allocs=" + allocCount.get() + "waste=" +
+                   (data.capacity() - nextFreeOffset.get());
+        }
+    }
+}



Mime
View raw message