cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xe...@apache.org
Subject [1/2] Add multiple memory allocation options for memtables patch by benedict and xedin; reviewed by marcuse and xedin for CASSANDRA-6689
Date Tue, 25 Mar 2014 21:43:42 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 48847b5c0 -> 1a3b5dbcb


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java
deleted file mode 100644
index 4396caf..0000000
--- a/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils.memory;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The SlabAllocator is a bump-the-pointer allocator that allocates
- * large (2MB by default) regions and then doles them out to threads that request
- * slices into the array.
- * <p/>
- * The purpose of this class is to combat heap fragmentation in long lived
- * objects: by ensuring that all allocations with similar lifetimes
- * only to large regions of contiguous memory, we ensure that large blocks
- * get freed up at the same time.
- * <p/>
- * Otherwise, variable length byte arrays allocated end up
- * interleaved throughout the heap, and the old generation gets progressively
- * more fragmented until a stop-the-world compacting collection occurs.
- */
-public class HeapSlabAllocator extends PoolAllocator
-{
-    private static final Logger logger = LoggerFactory.getLogger(HeapSlabAllocator.class);
-
-    private final static int REGION_SIZE = 1024 * 1024;
-    private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
-
-    // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
-    private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
-
-    private final AtomicReference<Region> currentRegion = new AtomicReference<Region>();
-    private final AtomicInteger regionCount = new AtomicInteger(0);
-    private AtomicLong unslabbed = new AtomicLong(0);
-
-    HeapSlabAllocator(Pool pool)
-    {
-        super(pool);
-    }
-
-    public ByteBuffer allocate(int size)
-    {
-        return allocate(size, null);
-    }
-
-    public ByteBuffer allocate(int size, OpOrder.Group opGroup)
-    {
-        assert size >= 0;
-        if (size == 0)
-            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-        markAllocated(size, opGroup);
-        // satisfy large allocations directly from JVM since they don't cause fragmentation
-        // as badly, and fill up our regions quickly
-        if (size > MAX_CLONED_SIZE)
-        {
-            unslabbed.addAndGet(size);
-            return ByteBuffer.allocate(size);
-        }
-
-        while (true)
-        {
-            Region region = getRegion();
-
-            // Try to allocate from this region
-            ByteBuffer cloned = region.allocate(size);
-            if (cloned != null)
-                return cloned;
-
-            // not enough space!
-            currentRegion.compareAndSet(region, null);
-        }
-    }
-
-    public void free(ByteBuffer name)
-    {
-        // have to assume we cannot free the memory here, and just reclaim it all when we flush
-    }
-
-    /**
-     * Get the current region, or, if there is no current region, allocate a new one
-     */
-    private Region getRegion()
-    {
-        while (true)
-        {
-            // Try to get the region
-            Region region = currentRegion.get();
-            if (region != null)
-                return region;
-
-            // No current region, so we want to allocate one. We race
-            // against other allocators to CAS in a Region, and if we fail we stash the region for re-use
-            region = RACE_ALLOCATED.poll();
-            if (region == null)
-                region = new Region(REGION_SIZE);
-            if (currentRegion.compareAndSet(null, region))
-            {
-                regionCount.incrementAndGet();
-                logger.trace("{} regions now allocated in {}", regionCount, this);
-                return region;
-            }
-
-            // someone else won race - that's fine, we'll try to grab theirs
-            // in the next iteration of the loop.
-            RACE_ALLOCATED.add(region);
-        }
-    }
-
-    /**
-     * A region of memory out of which allocations are sliced.
-     *
-     * This serves two purposes:
-     *  - to provide a step between initialization and allocation, so that racing to CAS a
-     *    new region in is harmless
-     *  - encapsulates the allocation offset
-     */
-    private static class Region
-    {
-        /**
-         * Actual underlying data
-         */
-        private ByteBuffer data;
-
-        /**
-         * Offset for the next allocation, or the sentinel value -1
-         * which implies that the region is still uninitialized.
-         */
-        private AtomicInteger nextFreeOffset = new AtomicInteger(0);
-
-        /**
-         * Total number of allocations satisfied from this buffer
-         */
-        private AtomicInteger allocCount = new AtomicInteger();
-
-        /**
-         * Create an uninitialized region. Note that memory is not allocated yet, so
-         * this is cheap.
-         *
-         * @param size in bytes
-         */
-        private Region(int size)
-        {
-            data = ByteBuffer.allocate(size);
-        }
-
-        /**
-         * Try to allocate <code>size</code> bytes from the region.
-         *
-         * @return the successful allocation, or null to indicate not-enough-space
-         */
-        public ByteBuffer allocate(int size)
-        {
-            while (true)
-            {
-                int oldOffset = nextFreeOffset.get();
-
-                if (oldOffset + size > data.capacity()) // capacity == remaining
-                    return null;
-
-                // Try to atomically claim this region
-                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size))
-                {
-                    // we got the alloc
-                    allocCount.incrementAndGet();
-                    return (ByteBuffer) data.duplicate().position(oldOffset).limit(oldOffset + size);
-                }
-                // we raced and lost alloc, try again
-            }
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Region@" + System.identityHashCode(this) +
-                   " allocs=" + allocCount.get() + "waste=" +
-                   (data.capacity() - nextFreeOffset.get());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java b/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java
deleted file mode 100644
index 0fceeef..0000000
--- a/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils.memory;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-public class HeapSlabPool extends Pool
-{
-    public HeapSlabPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner)
-    {
-        super(maxOnHeapMemory, cleanupThreshold, cleaner);
-    }
-
-    public HeapSlabAllocator newAllocator(OpOrder writes)
-    {
-        return new HeapSlabAllocator(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/Pool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/Pool.java b/src/java/org/apache/cassandra/utils/memory/Pool.java
index e22b636..aa5e05c 100644
--- a/src/java/org/apache/cassandra/utils/memory/Pool.java
+++ b/src/java/org/apache/cassandra/utils/memory/Pool.java
@@ -18,140 +18,182 @@
  */
 package org.apache.cassandra.utils.memory;
 
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 
 /**
  * Represents an amount of memory used for a given purpose, that can be allocated to specific tasks through
- * child AbstractAllocator objects. AbstractAllocator and MemoryTracker correspond approximately to PoolAllocator and Pool,
- * respectively, with the MemoryTracker bookkeeping the total shared use of resources, and the AbstractAllocator the amount
- * checked out and in use by a specific PoolAllocator.
- *
- * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners,
- * and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources,
- * but only needs to allocate if there are none already available. This distinction is not always meaningful.
+ * child PoolAllocator objects.
  */
 public abstract class Pool
 {
-    // total memory/resource permitted to allocate
-    public final long limit;
+    final PoolCleanerThread<?> cleaner;
 
-    // ratio of used to spare (both excluding 'reclaiming') at which to trigger a clean
-    public final float cleanThreshold;
-
-    // total bytes allocated and reclaiming
-    private AtomicLong allocated = new AtomicLong();
-    private AtomicLong reclaiming = new AtomicLong();
+    // the total memory used by this pool
+    public final SubPool onHeap;
+    public final SubPool offHeap;
 
     final WaitQueue hasRoom = new WaitQueue();
 
-    // a cache of the calculation determining at what allocation threshold we should next clean, and the cleaner we trigger
-    private volatile long nextClean;
-    private final PoolCleanerThread<?> cleanerThread;
-
-    public Pool(long limit, float cleanThreshold, Runnable cleaner)
+    Pool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner)
     {
-        this.limit = limit;
-        this.cleanThreshold = cleanThreshold;
-        updateNextClean();
-        cleanerThread = cleaner == null ? null : new PoolCleanerThread<>(this, cleaner);
-        if (cleanerThread != null)
-            cleanerThread.start();
+        this.onHeap = getSubPool(maxOnHeapMemory, cleanThreshold);
+        this.offHeap = getSubPool(maxOffHeapMemory, cleanThreshold);
+        this.cleaner = getCleaner(cleaner);
+        if (this.cleaner != null)
+            this.cleaner.start();
     }
 
-    /** Methods for tracking and triggering a clean **/
-
-    boolean needsCleaning()
+    SubPool getSubPool(long limit, float cleanThreshold)
     {
-        return used() >= nextClean && updateNextClean() && cleanerThread != null;
+        return new SubPool(limit, cleanThreshold);
     }
 
-    void maybeClean()
+    PoolCleanerThread<?> getCleaner(Runnable cleaner)
     {
-        if (needsCleaning())
-            cleanerThread.trigger();
+        return cleaner == null ? null : new PoolCleanerThread<>(this, cleaner);
     }
 
-    private boolean updateNextClean()
+    public abstract boolean needToCopyOnHeap();
+    public abstract PoolAllocator newAllocator();
+
+    /**
+     * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners,
+     * and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources,
+     * but only needs to allocate if there are none already available. This distinction is not always meaningful.
+     */
+    public class SubPool
     {
-        long reclaiming = this.reclaiming.get();
-        return used() >= (nextClean = reclaiming
-                + (long) (this.limit * cleanThreshold));
-    }
 
-    /** Methods to allocate space **/
+        // total memory/resource permitted to allocate
+        public final long limit;
 
-    boolean tryAllocate(int size)
-    {
-        while (true)
+        // ratio of used to spare (both excluding 'reclaiming') at which to trigger a clean
+        public final float cleanThreshold;
+
+        // total bytes allocated and reclaiming
+        volatile long allocated;
+        volatile long reclaiming;
+
+        // a cache of the calculation determining at what allocation threshold we should next clean
+        volatile long nextClean;
+
+        public SubPool(long limit, float cleanThreshold)
         {
-            long cur;
-            if ((cur = allocated.get()) + size > limit)
-                return false;
-            if (allocated.compareAndSet(cur, cur + size))
+            this.limit = limit;
+            this.cleanThreshold = cleanThreshold;
+        }
+
+        /** Methods for tracking and triggering a clean **/
+
+        boolean needsCleaning()
+        {
+            // use strictly-greater-than so we don't clean when limit is 0
+            return used() > nextClean && updateNextClean();
+        }
+
+        void maybeClean()
+        {
+            if (needsCleaning() && cleaner != null)
+                cleaner.trigger();
+        }
+
+        private boolean updateNextClean()
+        {
+            while (true)
             {
-                maybeClean();
-                return true;
+                long current = nextClean;
+                long reclaiming = this.reclaiming;
+                long next =  reclaiming + (long) (this.limit * cleanThreshold);
+                if (current == next || nextCleanUpdater.compareAndSet(this, current, next))
+                    return used() > next;
             }
         }
-    }
 
-    /**
-     * apply the size adjustment to allocated, bypassing any limits or constraints. If this reduces the
-     * allocated total, we will signal waiters
-     */
-    void adjustAllocated(long size)
-    {
-        if (size == 0)
-            return;
-        while (true)
+        /** Methods to allocate space **/
+
+        boolean tryAllocate(long size)
         {
-            long cur = allocated.get();
-            if (allocated.compareAndSet(cur, cur + size))
+            while (true)
             {
-                if (size > 0)
-                {
-                    maybeClean();
-                }
+                long cur;
+                if ((cur = allocated) + size > limit)
+                    return false;
+                if (allocatedUpdater.compareAndSet(this, cur, cur + size))
+                    return true;
+            }
+        }
+
+        /**
+         * apply the size adjustment to allocated, bypassing any limits or constraints. If this reduces the
+         * allocated total, we will signal waiters
+         */
+        void adjustAllocated(long size)
+        {
+            if (size == 0)
                 return;
+            while (true)
+            {
+                long cur = allocated;
+                if (allocatedUpdater.compareAndSet(this, cur, cur + size))
+                    return;
             }
         }
-    }
 
-    void release(long size)
-    {
-        adjustAllocated(-size);
-        hasRoom.signalAll();
-    }
+        // 'acquires' an amount of memory, and maybe also marks it allocated. This method is meant to be overridden
+        // by implementations with a separate concept of acquired/allocated. As this method stands, an acquire
+        // without an allocate is a no-op (acquisition is achieved through allocation), however a release (where size < 0)
+        // is always processed and accounted for in allocated.
+        void adjustAcquired(long size, boolean alsoAllocated)
+        {
+            if (size > 0 || alsoAllocated)
+            {
+                if (alsoAllocated)
+                    adjustAllocated(size);
+                maybeClean();
+            }
+            else if (size < 0)
+            {
+                adjustAllocated(size);
+                hasRoom.signalAll();
+            }
+        }
 
-    // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans
-    void adjustReclaiming(long reclaiming)
-    {
-        if (reclaiming == 0)
-            return;
-        this.reclaiming.addAndGet(reclaiming);
-        if (reclaiming < 0 && updateNextClean() && cleanerThread != null)
-            cleanerThread.trigger();
-    }
+        // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans
+        void adjustReclaiming(long reclaiming)
+        {
+            if (reclaiming == 0)
+                return;
+            reclaimingUpdater.addAndGet(this, reclaiming);
+            if (reclaiming < 0 && updateNextClean() && cleaner != null)
+                cleaner.trigger();
+        }
 
-    public long allocated()
-    {
-        return allocated.get();
-    }
+        public long allocated()
+        {
+            return allocated;
+        }
 
-    public long used()
-    {
-        return allocated.get();
-    }
+        public long used()
+        {
+            return allocated;
+        }
 
-    public long reclaiming()
-    {
-        return reclaiming.get();
+        public PoolAllocator.SubAllocator newAllocator()
+        {
+            return new PoolAllocator.SubAllocator(this);
+        }
+
+        public WaitQueue hasRoom()
+        {
+            return hasRoom;
+        }
     }
 
-    public abstract PoolAllocator newAllocator(OpOrder writes);
-}
+    private static final AtomicLongFieldUpdater<SubPool> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "reclaiming");
+    private static final AtomicLongFieldUpdater<SubPool> allocatedUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "allocated");
+    private static final AtomicLongFieldUpdater<SubPool> nextCleanUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "nextClean");
 
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
index 289d0ac..aa374fe 100644
--- a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
@@ -18,50 +18,64 @@
 package org.apache.cassandra.utils.memory;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
-public abstract class PoolAllocator<P extends Pool> extends AbstractAllocator
+public abstract class PoolAllocator extends AbstractAllocator
 {
-    public final P pool;
+
+    private final SubAllocator onHeap;
+    private final SubAllocator offHeap;
     volatile LifeCycle state = LifeCycle.LIVE;
 
     static enum LifeCycle
     {
         LIVE, DISCARDING, DISCARDED;
-        LifeCycle transition(LifeCycle target)
+        LifeCycle transition(LifeCycle targetState)
         {
-            assert target.ordinal() == ordinal() + 1;
-            return target;
+            switch (targetState)
+            {
+                case DISCARDING:
+                    assert this == LifeCycle.LIVE;
+                    return LifeCycle.DISCARDING;
+                case DISCARDED:
+                    assert this == LifeCycle.DISCARDING;
+                    return LifeCycle.DISCARDED;
+            }
+            throw new IllegalStateException();
         }
     }
 
-    // the amount of memory/resource owned by this object
-    private AtomicLong owns = new AtomicLong();
-    // the amount of memory we are reporting to collect; this may be inaccurate, but is close
-    // and is used only to ensure that once we have reclaimed we mark the tracker with the same amount
-    private AtomicLong reclaiming = new AtomicLong();
+    PoolAllocator(SubAllocator onHeap, SubAllocator offHeap)
+    {
+        this.onHeap = onHeap;
+        this.offHeap = offHeap;
+    }
+
+    public SubAllocator onHeap()
+    {
+        return onHeap;
+    }
 
-    PoolAllocator(P pool)
+    public SubAllocator offHeap()
     {
-        this.pool = pool;
+        return offHeap;
     }
 
     /**
-     * Mark this allocator as reclaiming; this will mark the memory it owns as reclaiming, so remove it from
-     * any calculation deciding if further cleaning/reclamation is necessary.
+     * Mark this allocator reclaiming; this will permit any outstanding allocations to temporarily
+     * overshoot the maximum memory limit so that flushing can begin immediately
      */
     public void setDiscarding()
     {
         state = state.transition(LifeCycle.DISCARDING);
         // mark the memory owned by this allocator as reclaiming
-        long prev = reclaiming.get();
-        long cur = owns.get();
-        reclaiming.set(cur);
-        pool.adjustReclaiming(cur - prev);
+        onHeap.markAllReclaiming();
+        offHeap.markAllReclaiming();
     }
 
     /**
@@ -72,60 +86,8 @@ public abstract class PoolAllocator<P extends Pool> extends AbstractAllocator
     {
         state = state.transition(LifeCycle.DISCARDED);
         // release any memory owned by this allocator; automatically signals waiters
-        pool.release(owns.getAndSet(0));
-        pool.adjustReclaiming(-reclaiming.get());
-    }
-
-    public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup);
-
-    /** Mark the BB as unused, permitting it to be reclaimed */
-    public abstract void free(ByteBuffer name);
-
-    // mark ourselves as owning memory from the tracker.  meant to be called by subclass
-    // allocate method that actually allocates and returns a ByteBuffer
-    protected void markAllocated(int size, OpOrder.Group opGroup)
-    {
-        while (true)
-        {
-            if (pool.tryAllocate(size))
-            {
-                acquired(size);
-                return;
-            }
-            WaitQueue.Signal signal = opGroup.isBlockingSignal(pool.hasRoom.register());
-            boolean allocated = pool.tryAllocate(size);
-            if (allocated || opGroup.isBlocking())
-            {
-                signal.cancel();
-                if (allocated) // if we allocated, take ownership
-                    acquired(size);
-                else // otherwise we're blocking so we're permitted to overshoot our constraints, to just allocate without blocking
-                    allocated(size);
-                return;
-            }
-            else
-                signal.awaitUninterruptibly();
-        }
-    }
-
-    // retroactively mark (by-passes any constraints) an amount allocated in the tracker, and owned by us.
-    private void allocated(int size)
-    {
-        pool.adjustAllocated(size);
-        owns.addAndGet(size);
-    }
-
-    // retroactively mark (by-passes any constraints) an amount owned by us
-    private void acquired(int size)
-    {
-        owns.addAndGet(size);
-    }
-
-    // release an amount of memory from our ownership, and deallocate it in the tracker
-    void release(int size)
-    {
-        pool.release(size);
-        owns.addAndGet(-size);
+        onHeap.releaseAll();
+        offHeap.releaseAll();
     }
 
     public boolean isLive()
@@ -133,6 +95,9 @@ public abstract class PoolAllocator<P extends Pool> extends AbstractAllocator
         return state == LifeCycle.LIVE;
     }
 
+    public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup);
+    public abstract void free(ByteBuffer name);
+
     /**
      * Allocate a slice of the given length.
      */
@@ -154,21 +119,107 @@ public abstract class PoolAllocator<P extends Pool> extends AbstractAllocator
         return new ContextAllocator(opGroup, this);
     }
 
-    @Override
-    public long owns()
+    /** Tracks the memory owned and being reclaimed by one allocator, accounted against its parent Pool.SubPool */
+    public static final class SubAllocator
     {
-        return owns.get();
-    }
+        // the tracker we are owning memory from
+        private final Pool.SubPool parent;
 
-    @Override
-    public float ownershipRatio()
-    {
-        return owns.get() / (float) pool.limit;
-    }
+        // the amount of memory/resource owned by this object
+        private volatile long owns;
+        // the amount of memory we are reporting to collect; this may be inaccurate, but is close
+        // and is used only to ensure that once we have reclaimed we mark the tracker with the same amount
+        private volatile long reclaiming;
 
-    @Override
-    public long reclaiming()
-    {
-        return reclaiming.get();
+        SubAllocator(Pool.SubPool parent)
+        {
+            this.parent = parent;
+        }
+
+        // should only be called once we know we will never allocate to the object again.
+        // currently no corroboration/enforcement of this is performed.
+        void releaseAll()
+        {
+            parent.adjustAcquired(-ownsUpdater.getAndSet(this, 0), false);
+            parent.adjustReclaiming(-reclaimingUpdater.getAndSet(this, 0));
+        }
+
+        // allocate memory in the tracker, and mark ourselves as owning it
+        public void allocate(long size, OpOrder.Group opGroup)
+        {
+            while (true)
+            {
+                if (parent.tryAllocate(size))
+                {
+                    acquired(size);
+                    return;
+                }
+                WaitQueue.Signal signal = opGroup.isBlockingSignal(parent.hasRoom().register());
+                boolean allocated = parent.tryAllocate(size);
+                if (allocated || opGroup.isBlocking())
+                {
+                    signal.cancel();
+                    if (allocated) // if we allocated, take ownership
+                        acquired(size);
+                    else // otherwise we're blocking so we're permitted to overshoot our constraints, to just allocate without blocking
+                        allocated(size);
+                    return;
+                }
+                else
+                    signal.awaitUninterruptibly();
+            }
+        }
+
+        // retroactively mark an amount allocated and acquired in the tracker, and owned by us
+        void allocated(long size)
+        {
+            parent.adjustAcquired(size, true);
+            ownsUpdater.addAndGet(this, size);
+        }
+
+        // retroactively mark an amount acquired in the tracker, and owned by us
+        void acquired(long size)
+        {
+            parent.adjustAcquired(size, false);
+            ownsUpdater.addAndGet(this, size);
+        }
+
+        void release(long size)
+        {
+            parent.adjustAcquired(-size, false);
+            ownsUpdater.addAndGet(this, -size);
+        }
+
+        // mark everything we currently own as reclaiming, both here and in our parent
+        void markAllReclaiming()
+        {
+            while (true)
+            {
+                long cur = owns;
+                long prev = reclaiming;
+                if (reclaimingUpdater.compareAndSet(this, prev, cur))
+                {
+                    parent.adjustReclaiming(cur - prev);
+                    return;
+                }
+            }
+        }
+
+        public long owns()
+        {
+            return owns;
+        }
+
+        public float ownershipRatio()
+        {
+            float r = owns / (float) parent.limit;
+            if (Float.isNaN(r))
+                return 0;
+            return r;
+        }
+
+        private static final AtomicLongFieldUpdater<SubAllocator> ownsUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "owns");
+        private static final AtomicLongFieldUpdater<SubAllocator> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "reclaiming");
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
index 24f71d2..68b0c20 100644
--- a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
+++ b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.utils.memory;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 /**
- * A thread that reclaims memor from a Pool on demand.  The actual reclaiming work is delegated to the
+ * A thread that reclaims memory from a Pool on demand.  The actual reclaiming work is delegated to the
  * cleaner Runnable, e.g., FlushLargestColumnFamily
  */
 class PoolCleanerThread<P extends Pool> extends Thread
@@ -44,7 +44,7 @@ class PoolCleanerThread<P extends Pool> extends Thread
 
     boolean needsCleaning()
     {
-        return pool.needsCleaning();
+        return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
     }
 
     // should ONLY be called when we really think it already needs cleaning

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
new file mode 100644
index 0000000..a90357c
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import sun.nio.ch.DirectBuffer;
+
+/**
+ * The SlabAllocator is a bump-the-pointer allocator that allocates
+ * large (1MB, see REGION_SIZE) regions and then doles them out to threads that request
+ * slices into the array.
+ * <p/>
+ * The purpose of this class is to combat heap fragmentation in long lived
+ * objects: by ensuring that all allocations with similar lifetimes
+ * go only to large regions of contiguous memory, we ensure that large blocks
+ * get freed up at the same time.
+ * <p/>
+ * Otherwise, variable length byte arrays allocated end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ */
+public class SlabAllocator extends PoolAllocator
+{
+    private static final Logger logger = LoggerFactory.getLogger(SlabAllocator.class);
+
+    private final static int REGION_SIZE = 1024 * 1024;
+    private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+
+    // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
+    private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+
+    private final AtomicReference<Region> currentRegion = new AtomicReference<Region>();
+    private final AtomicInteger regionCount = new AtomicInteger(0);
+
+    // this queue is used to keep references to off-heap allocated regions so that we can free them when we are discarded
+    private final ConcurrentLinkedQueue<Region> offHeapRegions = new ConcurrentLinkedQueue<>();
+    private AtomicLong unslabbedSize = new AtomicLong(0);
+    private final boolean allocateOnHeapOnly;
+
+    SlabAllocator(SubAllocator onHeap, SubAllocator offHeap, boolean allocateOnHeapOnly)
+    {
+        super(onHeap, offHeap);
+        this.allocateOnHeapOnly = allocateOnHeapOnly;
+    }
+
+    public ByteBuffer allocate(int size)
+    {
+        return allocate(size, null);
+    }
+
+    public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+    {
+        assert size >= 0;
+        if (size == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        (allocateOnHeapOnly ? onHeap() : offHeap()).allocate(size, opGroup);
+        // satisfy large allocations directly from JVM since they don't cause fragmentation
+        // as badly, and fill up our regions quickly
+        if (size > MAX_CLONED_SIZE)
+        {
+            unslabbedSize.addAndGet(size);
+            if (allocateOnHeapOnly)
+                return ByteBuffer.allocate(size);
+            Region region = new Region(ByteBuffer.allocateDirect(size));
+            offHeapRegions.add(region);
+            return region.allocate(size);
+        }
+
+        while (true)
+        {
+            Region region = getRegion();
+
+            // Try to allocate from this region
+            ByteBuffer cloned = region.allocate(size);
+            if (cloned != null)
+                return cloned;
+
+            // not enough space!
+            currentRegion.compareAndSet(region, null);
+        }
+    }
+
+    public void free(ByteBuffer name)
+    {
+        // have to assume we cannot free the memory here, and just reclaim it all when we flush
+    }
+
+    public void setDiscarded()
+    {
+        for (Region region : offHeapRegions)
+            ((DirectBuffer) region.data).cleaner().clean();
+        super.setDiscarded();
+    }
+
+    /**
+     * Get the current region, or, if there is no current region, allocate a new one
+     */
+    private Region getRegion()
+    {
+        while (true)
+        {
+            // Try to get the region
+            Region region = currentRegion.get();
+            if (region != null)
+                return region;
+
+            // No current region, so we want to allocate one. We race
+            // against other allocators to CAS in a Region, and if we fail we stash the region for re-use
+            region = RACE_ALLOCATED.poll();
+            if (region == null)
+                region = new Region(allocateOnHeapOnly ? ByteBuffer.allocate(REGION_SIZE) : ByteBuffer.allocateDirect(REGION_SIZE));
+            if (currentRegion.compareAndSet(null, region))
+            {
+                if (!allocateOnHeapOnly)
+                    offHeapRegions.add(region);
+                regionCount.incrementAndGet();
+                logger.trace("{} regions now allocated in {}", regionCount, this);
+                return region;
+            }
+
+            // someone else won race - that's fine, we'll try to grab theirs
+            // in the next iteration of the loop.
+            RACE_ALLOCATED.add(region);
+        }
+    }
+
+    /**
+     * A region of memory out of which allocations are sliced.
+     *
+     * This serves two purposes:
+     *  - to provide a step between initialization and allocation, so that racing to CAS a
+     *    new region in is harmless
+     *  - encapsulates the allocation offset
+     */
+    private static class Region
+    {
+        /**
+         * Actual underlying data
+         */
+        private ByteBuffer data;
+
+        /**
+         * Offset for the next allocation. It starts at 0: the backing buffer is
+         * passed to the constructor, so there is no uninitialized (-1) state.
+         */
+        private AtomicInteger nextFreeOffset = new AtomicInteger(0);
+
+        /**
+         * Total number of allocations satisfied from this buffer
+         */
+        private AtomicInteger allocCount = new AtomicInteger();
+
+        /**
+         * Create a region over the given buffer. The caller has already allocated
+         * the memory, so the region is ready to serve allocations immediately.
+         *
+         * @param buffer bytes
+         */
+        private Region(ByteBuffer buffer)
+        {
+            data = buffer;
+        }
+
+        /**
+         * Try to allocate <code>size</code> bytes from the region.
+         *
+         * @return the successful allocation, or null to indicate not-enough-space
+         */
+        public ByteBuffer allocate(int size)
+        {
+            while (true)
+            {
+                int oldOffset = nextFreeOffset.get();
+
+                if (oldOffset + size > data.capacity()) // capacity == remaining
+                    return null;
+
+                // Try to atomically claim this region
+                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size))
+                {
+                    // we got the alloc
+                    allocCount.incrementAndGet();
+                    return (ByteBuffer) data.duplicate().position(oldOffset).limit(oldOffset + size);
+                }
+                // we raced and lost alloc, try again
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Region@" + System.identityHashCode(this) +
+                   " allocs=" + allocCount.get() + "waste=" +
+                   (data.capacity() - nextFreeOffset.get());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/SlabPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabPool.java b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
new file mode 100644
index 0000000..7276e57
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+
+public class SlabPool extends Pool
+{
+
+    final boolean allocateOnHeap;
+    public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, Runnable cleaner)
+    {
+        super(maxOnHeapMemory, maxOffHeapMemory, cleanupThreshold, cleaner);
+        this.allocateOnHeap = maxOffHeapMemory == 0;
+    }
+
+    public SlabAllocator newAllocator()
+    {
+        return new SlabAllocator(onHeap.newAllocator(), offHeap.newAllocator(), allocateOnHeap);
+    }
+
+    public boolean needToCopyOnHeap()
+    {
+        return !allocateOnHeap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index a207bc6..37b0b96 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -3,6 +3,7 @@
 # Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
 #
 cluster_name: Test Cluster
+memtable_allocation_type: offheap_buffers
 in_memory_compaction_limit_in_mb: 1
 commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/unit/org/apache/cassandra/db/CollationControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CollationControllerTest.java b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
index b3a9126..fc92aae 100644
--- a/test/unit/org/apache/cassandra/db/CollationControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
@@ -18,7 +18,7 @@
 */
 package org.apache.cassandra.db;
 
-import static org.junit.Assert.assertEquals;
+import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
@@ -26,7 +26,8 @@ import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
 
 public class CollationControllerTest extends SchemaLoader
 {
@@ -67,7 +68,7 @@ public class CollationControllerTest extends SchemaLoader
         // It should only iterate the last flushed sstable, since it probably contains the most recent value for Column1
         QueryFilter filter = Util.namesQueryFilter(cfs, dk, "Column1");
         CollationController controller = new CollationController(cfs, filter, Integer.MIN_VALUE);
-        controller.getTopLevelColumns();
+        controller.getTopLevelColumns(true);
         assertEquals(1, controller.getSstablesIterated());
 
         // SliceQueryFilter goes down another path (through collectAllData())
@@ -75,7 +76,7 @@ public class CollationControllerTest extends SchemaLoader
         // recent than the maxTimestamp of the very first sstable we flushed, we should only read the 2 first sstables.
         filter = QueryFilter.getIdentityFilter(dk, cfs.name, System.currentTimeMillis());
         controller = new CollationController(cfs, filter, Integer.MIN_VALUE);
-        controller.getTopLevelColumns();
+        controller.getTopLevelColumns(true);
         assertEquals(2, controller.getSstablesIterated());
     }
 
@@ -109,10 +110,10 @@ public class CollationControllerTest extends SchemaLoader
 
         filter = QueryFilter.getNamesFilter(dk, cfs.name, FBUtilities.singleton(cellName, cfs.getComparator()), queryAt);
         CollationController controller = new CollationController(cfs, filter, gcBefore);
-        assert ColumnFamilyStore.removeDeleted(controller.getTopLevelColumns(), gcBefore) == null;
+        assert ColumnFamilyStore.removeDeleted(controller.getTopLevelColumns(true), gcBefore) == null;
 
         filter = QueryFilter.getIdentityFilter(dk, cfs.name, queryAt);
         controller = new CollationController(cfs, filter, gcBefore);
-        assert ColumnFamilyStore.removeDeleted(controller.getTopLevelColumns(), gcBefore) == null;
+        assert ColumnFamilyStore.removeDeleted(controller.getTopLevelColumns(true), gcBefore) == null;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 8502dd5..e9fc746 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -19,35 +19,45 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
+import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.IndexType;
-import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.index.PerColumnSecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.PoolAllocator;
 
+import static org.apache.cassandra.Util.dk;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import static org.apache.cassandra.Util.dk;
-
 public class RangeTombstoneTest extends SchemaLoader
 {
     private static final String KSNAME = "Keyspace1";
@@ -571,7 +581,7 @@ public class RangeTombstoneTest extends SchemaLoader
         public void forceBlockingFlush(){}
 
         @Override
-        public AbstractAllocator getOnHeapAllocator()
+        public PoolAllocator getAllocator()
         {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/unit/org/apache/cassandra/db/SecondaryIndexCellSizeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexCellSizeTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexCellSizeTest.java
index 220e3b9..9e911b4 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexCellSizeTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexCellSizeTest.java
@@ -21,16 +21,17 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
 import org.junit.Test;
 
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.index.PerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.PerRowSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.PoolAllocator;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -92,7 +93,7 @@ public class SecondaryIndexCellSizeTest
         {
         }
 
-        public AbstractAllocator getOnHeapAllocator()
+        public PoolAllocator getAllocator()
         {
             return null;
         }
@@ -172,7 +173,7 @@ public class SecondaryIndexCellSizeTest
         }
 
         @Override
-        public AbstractAllocator getOnHeapAllocator()
+        public PoolAllocator getAllocator()
         {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
index 55d5b7c..bc297ab 100644
--- a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
+++ b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
@@ -20,18 +20,25 @@
  */
 package org.apache.cassandra.db.context;
 
-import static org.junit.Assert.*;
-
 import java.nio.ByteBuffer;
 
 import org.junit.Test;
 
+import org.apache.cassandra.Util;
 import org.apache.cassandra.db.ClockAndCount;
 import org.apache.cassandra.db.context.CounterContext.Relationship;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CounterId;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.Pool;
+import org.apache.cassandra.utils.memory.SlabPool;
 
 import static org.apache.cassandra.db.context.CounterContext.ContextState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 public class CounterContextTest
 {
@@ -44,6 +51,16 @@ public class CounterContextTest
     private static final int countLength = 8;
     private static final int stepLength = idLength + clockLength + countLength;
 
+    private static final Pool POOL = new SlabPool(Integer.MAX_VALUE, 0, 1f, null);
+
+    /** Creates a new SlabAllocator, allocates 1 byte from it, and returns the allocator. */
+    private AbstractAllocator bumpedSlab()
+    {
+        AbstractAllocator allocator = POOL.newAllocator();
+        allocator.allocate(1);
+        return allocator;
+    }
+
     @Test
     public void testAllocate()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
index 660a6e0..c57ba05 100644
--- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
@@ -22,19 +22,23 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Set;
 
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
 import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.Util;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.PoolAllocator;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -175,7 +179,7 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
         }
 
         @Override
-        public AbstractAllocator getOnHeapAllocator()
+        public PoolAllocator getAllocator()
         {
             return null;
         }


Mime
View raw message