cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [3/7] cassandra git commit: Safer Resource Management++
Date Wed, 11 Feb 2015 15:23:38 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index ceba89b..77b2d44 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -21,9 +21,10 @@ import java.nio.ByteBuffer;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
 import org.apache.cassandra.utils.obs.IBitSet;
 
-public abstract class BloomFilter implements IFilter
+public abstract class BloomFilter extends WrappedSharedCloseable implements IFilter
 {
     private static final ThreadLocal<long[]> reusableIndexes = new ThreadLocal<long[]>()
     {
@@ -36,12 +37,20 @@ public abstract class BloomFilter implements IFilter
     public final IBitSet bitset;
     public final int hashCount;
 
-    BloomFilter(int hashes, IBitSet bitset)
+    BloomFilter(int hashCount, IBitSet bitset)
     {
-        this.hashCount = hashes;
+        super(bitset);
+        this.hashCount = hashCount;
         this.bitset = bitset;
     }
 
+    BloomFilter(BloomFilter copy)
+    {
+        super(copy);
+        this.hashCount = copy.hashCount;
+        this.bitset = copy.bitset;
+    }
+
     // Murmur is faster than an SHA-based approach and provides as-good collision
     // resistance.  The combinatorial generation approach described in
     // http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf
@@ -110,9 +119,4 @@ public abstract class BloomFilter implements IFilter
     {
         bitset.clear();
     }
-
-    public void close()
-    {
-        bitset.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index 60c0590..bde6333 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -17,10 +17,11 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.Closeable;
 import java.nio.ByteBuffer;
 
-public interface IFilter extends Closeable
+import org.apache.cassandra.utils.concurrent.SharedCloseable;
+
+public interface IFilter extends SharedCloseable
 {
     void add(ByteBuffer key);
 
@@ -32,6 +33,8 @@ public interface IFilter extends Closeable
 
     void close();
 
+    IFilter sharedCopy();
+
     /**
      * Returns the amount of memory in bytes used off heap.
      * @return the amount of memory in bytes used off heap

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
index f7c7632..431ca5b 100644
--- a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
@@ -26,9 +26,14 @@ public class Murmur3BloomFilter extends BloomFilter
 {
     public static final Murmur3BloomFilterSerializer serializer = new Murmur3BloomFilterSerializer();
 
-    public Murmur3BloomFilter(int hashes, IBitSet bs)
+    public Murmur3BloomFilter(int hashCount, IBitSet bs)
     {
-        super(hashes, bs);
+        super(hashCount, bs);
+    }
+
+    protected Murmur3BloomFilter(Murmur3BloomFilter copy)
+    {
+        super(copy);
     }
 
     public long serializedSize()
@@ -36,6 +41,11 @@ public class Murmur3BloomFilter extends BloomFilter
         return serializer.serializedSize(this, TypeSizes.NATIVE);
     }
 
+    public IFilter sharedCopy()
+    {
+        return new Murmur3BloomFilter(this);
+    }
+
     @Override
     public long offHeapSize()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 4afceb0..ad1293b 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -2,24 +2,69 @@ package org.apache.cassandra.utils.concurrent;
 
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
 /**
- * A single managed reference to a RefCounted object
+ * An object that needs ref counting does the following two things:
+ *   - defines a Tidy object that will cleanup once it's gone,
+ *     (this must retain no references to the object we're tracking (only its resources and how to clean up))
+ * Then, one of two options:
+ * 1) Construct a Ref directly pointing to it, and always use this Ref; or
+ * 2)
+ *   - implements RefCounted
+ *   - encapsulates a Ref, we'll call selfRef, to which it proxies all calls to RefCounted behaviours
+ *   - users must ensure no references to the selfRef leak, or are retained outside of a method scope.
+ *     (to ensure the selfRef is collected with the object, so that leaks may be detected and corrected)
+ *
+ * This class' functionality is achieved by what may look at first glance like a complex web of references,
+ * but boils down to:
+ *
+ * Target --> selfRef --> [Ref.State] <--> Ref.GlobalState --> Tidy
+ *                                             ^
+ *                                             |
+ * Ref ----------------------------------------
+ *                                             |
+ * Global -------------------------------------
+ *
+ * So that, if Target is collected, so is its selfRef (provided no reference to the selfRef has leaked).
+ *
+ * Once ref or selfRef are collected, the paired Ref.State's release method is called, which if it had
+ * not already been called will update Ref.GlobalState and log an error.
+ *
+ * Once the Ref.GlobalState has been completely released, the Tidy method is called and it removes the global reference
+ * to itself so it may also be collected.
  */
-public final class Ref
+public final class Ref<T> implements RefCounted<T>, AutoCloseable
 {
     static final Logger logger = LoggerFactory.getLogger(Ref.class);
     static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
 
     final State state;
+    final T referent;
 
-    Ref(RefCountedImpl.GlobalState state, boolean isSharedRef)
+    public Ref(T referent, Tidy tidy)
     {
-        this.state = new State(state, this, RefCountedImpl.referenceQueue, isSharedRef);
+        this.state = new State(new GlobalState(tidy), this, referenceQueue);
+        this.referent = referent;
+    }
+
+    Ref(T referent, GlobalState state)
+    {
+        this.state = new State(state, this, referenceQueue);
+        this.referent = referent;
     }
 
     /**
@@ -32,6 +77,36 @@ public final class Ref
         state.release(false);
     }
 
+    public void ensureReleased()
+    {
+        state.ensureReleased();
+    }
+
+    public void close()
+    {
+        state.ensureReleased();
+    }
+
+    public T get()
+    {
+        state.assertNotReleased();
+        return referent;
+    }
+
+    public Ref<T> tryRef()
+    {
+        return state.globalState.ref() ? new Ref<>(referent, state.globalState) : null;
+    }
+
+    public Ref<T> ref()
+    {
+        Ref<T> ref = tryRef();
+        // TODO: print the last release as well as the release here
+        if (ref == null)
+            state.assertNotReleased();
+        return ref;
+    }
+
     /**
      * A convenience method for reporting:
      * @return the number of currently extant references globally, including the shared reference
@@ -41,25 +116,36 @@ public final class Ref
         return state.globalState.count();
     }
 
-    // similar to RefCountedState, but tracks only the management of each unique ref created to the managed object
+    // similar to Ref.GlobalState, but tracks only the management of each unique ref created to the managed object
     // ensures it is only released once, and that it is always released
     static final class State extends PhantomReference<Ref>
     {
         final Debug debug = DEBUG_ENABLED ? new Debug() : null;
-        final boolean isSharedRef;
-        final RefCountedImpl.GlobalState globalState;
+        final GlobalState globalState;
         private volatile int released;
 
         private static final AtomicIntegerFieldUpdater<State> releasedUpdater = AtomicIntegerFieldUpdater.newUpdater(State.class, "released");
 
-        public State(final RefCountedImpl.GlobalState globalState, Ref reference, ReferenceQueue<? super Ref> q, boolean isSharedRef)
+        public State(final GlobalState globalState, Ref reference, ReferenceQueue<? super Ref> q)
         {
             super(reference, q);
             this.globalState = globalState;
-            this.isSharedRef = isSharedRef;
             globalState.register(this);
         }
 
+        void assertNotReleased()
+        {
+            if (DEBUG_ENABLED && released == 1)
+                debug.log(toString());
+            assert released == 0;
+        }
+
+        void ensureReleased()
+        {
+            if (releasedUpdater.getAndSet(this, 1) == 0)
+                globalState.release(this);
+        }
+
         void release(boolean leak)
         {
             if (!releasedUpdater.compareAndSet(this, 0, 1))
@@ -67,7 +153,7 @@ public final class Ref
                 if (!leak)
                 {
                     String id = this.toString();
-                    logger.error("BAD RELEASE: attempted to release a{} reference ({}) that has already been released", isSharedRef ? " shared" : "", id);
+                    logger.error("BAD RELEASE: attempted to release a reference ({}) that has already been released", id);
                     if (DEBUG_ENABLED)
                         debug.log(id);
                     throw new IllegalStateException("Attempted to release a reference that has already been released");
@@ -78,10 +164,7 @@ public final class Ref
             if (leak)
             {
                 String id = this.toString();
-                if (isSharedRef)
-                    logger.error("LEAK DETECTED: the shared reference ({}) to {} was not released before the object was garbage collected", id, globalState);
-                else
-                    logger.error("LEAK DETECTED: a reference ({}) to {} was not released before the reference was garbage collected", id, globalState);
+                logger.error("LEAK DETECTED: a reference ({}) to {} was not released before the reference was garbage collected", id, globalState);
                 if (DEBUG_ENABLED)
                     debug.log(id);
             }
@@ -129,6 +212,101 @@ public final class Ref
         }
     }
 
-}
+    // the object that manages the actual cleaning up; this does not reference the target object
+    // so that we can detect when references are lost to the resource itself, and still cleanup afterwards
+    // the Tidy object MUST not contain any references to the object we are managing
+    static final class GlobalState
+    {
+        // we need to retain a reference to each of the PhantomReference instances
+        // we are using to track individual refs
+        private final ConcurrentLinkedQueue<State> locallyExtant = new ConcurrentLinkedQueue<>();
+        // the number of live refs
+        private final AtomicInteger counts = new AtomicInteger();
+        // the object to call to cleanup when our refs are all finished with
+        private final Tidy tidy;
+
+        GlobalState(Tidy tidy)
+        {
+            this.tidy = tidy;
+            globallyExtant.add(this);
+        }
 
+        void register(Ref.State ref)
+        {
+            locallyExtant.add(ref);
+        }
 
+        // increment ref count if not already tidied, and return success/failure
+        boolean ref()
+        {
+            while (true)
+            {
+                int cur = counts.get();
+                if (cur < 0)
+                    return false;
+                if (counts.compareAndSet(cur, cur + 1))
+                    return true;
+            }
+        }
+
+        // release a single reference, and cleanup if no more are extant
+        void release(Ref.State ref)
+        {
+            locallyExtant.remove(ref);
+            if (-1 == counts.decrementAndGet())
+            {
+                globallyExtant.remove(this);
+                try
+                {
+                    tidy.tidy();
+                }
+                catch (Throwable t)
+                {
+                    logger.error("Error when closing {}", this, t);
+                    Throwables.propagate(t);
+                }
+            }
+        }
+
+        int count()
+        {
+            return 1 + counts.get();
+        }
+
+        public String toString()
+        {
+            return tidy.getClass() + "@" + System.identityHashCode(tidy) + ":" + tidy.name();
+        }
+    }
+
+    private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<GlobalState, Boolean>());
+    static final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
+    private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("Reference-Reaper"));
+    static
+    {
+        EXEC.execute(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    while (true)
+                    {
+                        Object obj = referenceQueue.remove();
+                        if (obj instanceof Ref.State)
+                        {
+                            ((Ref.State) obj).release(true);
+                        }
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                }
+                finally
+                {
+                    EXEC.execute(this);
+                }
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java b/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
index 7ad51ad..e68c7bd 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
@@ -36,59 +36,29 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 
 /**
- * An object that needs ref counting does the following:
+ * An object that needs ref counting does the following two things:
  *   - defines a Tidy object that will cleanup once it's gone,
  *     (this must retain no references to the object we're tracking (only its resources and how to clean up))
+ * Then, one of two options:
+ * 1) Construct a Ref directly pointing to it, and always use this Ref; or
+ * 2)
  *   - implements RefCounted
- *   - encapsulates a RefCounted.Impl, to which it proxies all calls to RefCounted behaviours
- *   - ensures no external access to the encapsulated Impl, and permits no references to it to leak
- *   - users must ensure no references to the sharedRef leak, or are retained outside of a method scope either.
- *     (to ensure the sharedRef is collected with the object, so that leaks may be detected and corrected)
- *
- * This class' functionality is achieved by what may look at first glance like a complex web of references,
- * but boils down to:
- *
- * Target --> Impl --> sharedRef --> [RefState] <--> RefCountedState --> Tidy
- *                                        ^                ^
- *                                        |                |
- * Ref -----------------------------------                 |
- *                                                         |
- * Global -------------------------------------------------
- *
- * So that, if Target is collected, Impl is collected and, hence, so is sharedRef.
- *
- * Once ref or sharedRef are collected, the paired RefState's release method is called, which if it had
- * not already been called will update RefCountedState and log an error.
- *
- * Once the RefCountedState has been completely released, the Tidy method is called and it removes the global reference
- * to itself so it may also be collected.
+ *   - encapsulates a Ref, we'll call selfRef, to which it proxies all calls to RefCounted behaviours
+ *   - users must ensure no references to the selfRef leak, or are retained outside of a method scope.
+ *     (to ensure the selfRef is collected with the object, so that leaks may be detected and corrected)
  */
-public interface RefCounted
+public interface RefCounted<T>
 {
-
     /**
      * @return the a new Ref() to the managed object, incrementing its refcount, or null if it is already released
      */
-    public Ref tryRef();
+    public Ref<T> tryRef();
 
-    /**
-     * @return the shared Ref that is created at instantiation of the RefCounted instance.
-     * Once released, if no other refs are extant the object will be tidied; references to
-     * this object should never be retained outside of a method's scope
-     */
-    public Ref sharedRef();
+    public Ref<T> ref();
 
     public static interface Tidy
     {
-        void tidy();
+        void tidy() throws Exception;
         String name();
     }
-
-    public static class Impl
-    {
-        public static RefCounted get(Tidy tidy)
-        {
-            return new RefCountedImpl(tidy);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java b/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java
deleted file mode 100644
index 0de6f40..0000000
--- a/src/java/org/apache/cassandra/utils/concurrent/RefCountedImpl.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package org.apache.cassandra.utils.concurrent;
-
-import java.lang.ref.ReferenceQueue;
-import java.util.Collections;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-
-// default implementation; can be hidden and proxied (like we do for SSTableReader)
-final class RefCountedImpl implements RefCounted
-{
-    private final Ref sharedRef;
-    private final GlobalState state;
-
-    public RefCountedImpl(Tidy tidy)
-    {
-        this.state = new GlobalState(tidy);
-        sharedRef = new Ref(this.state, true);
-        globallyExtant.add(this.state);
-    }
-
-    /**
-     * see {@link RefCounted#tryRef()}
-     */
-    public Ref tryRef()
-    {
-        return state.ref() ? new Ref(state, false) : null;
-    }
-
-    /**
-     * see {@link RefCounted#sharedRef()}
-     */
-    public Ref sharedRef()
-    {
-        return sharedRef;
-    }
-
-    // the object that manages the actual cleaning up; this does not reference the RefCounted.Impl
-    // so that we can detect when references are lost to the resource itself, and still cleanup afterwards
-    // the Tidy object MUST not contain any references to the object we are managing
-    static final class GlobalState
-    {
-        // we need to retain a reference to each of the PhantomReference instances
-        // we are using to track individual refs
-        private final ConcurrentLinkedQueue<Ref.State> locallyExtant = new ConcurrentLinkedQueue<>();
-        // the number of live refs
-        private final AtomicInteger counts = new AtomicInteger();
-        // the object to call to cleanup when our refs are all finished with
-        private final Tidy tidy;
-
-        GlobalState(Tidy tidy)
-        {
-            this.tidy = tidy;
-        }
-
-        void register(Ref.State ref)
-        {
-            locallyExtant.add(ref);
-        }
-
-        // increment ref count if not already tidied, and return success/failure
-        boolean ref()
-        {
-            while (true)
-            {
-                int cur = counts.get();
-                if (cur < 0)
-                    return false;
-                if (counts.compareAndSet(cur, cur + 1))
-                    return true;
-            }
-        }
-
-        // release a single reference, and cleanup if no more are extant
-        void release(Ref.State ref)
-        {
-            locallyExtant.remove(ref);
-            if (-1 == counts.decrementAndGet())
-            {
-                globallyExtant.remove(this);
-                tidy.tidy();
-            }
-        }
-
-        int count()
-        {
-            return 1 + counts.get();
-        }
-
-        public String toString()
-        {
-            return tidy.name();
-        }
-    }
-
-    private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<GlobalState, Boolean>());
-    static final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
-    private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("Reference-Reaper"));
-    static
-    {
-        EXEC.execute(new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    while (true)
-                    {
-                        Object obj = referenceQueue.remove();
-                        if (obj instanceof Ref.State)
-                        {
-                            ((Ref.State) obj).release(true);
-                        }
-                    }
-                }
-                catch (InterruptedException e)
-                {
-                }
-                finally
-                {
-                    EXEC.execute(this);
-                }
-            }
-        });
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
index ed5fcfa..3a930d2 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Refs.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -12,16 +12,16 @@ import com.google.common.collect.Iterators;
  *
  * All of the java.util.Collection operations that modify the collection are unsupported.
  */
-public final class Refs<T extends RefCounted> extends AbstractCollection<T> implements AutoCloseable
+public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> implements AutoCloseable
 {
-    private final Map<T, Ref> references;
+    private final Map<T, Ref<T>> references;
 
     public Refs()
     {
         this.references = new HashMap<>();
     }
 
-    public Refs(Map<T, Ref> references)
+    public Refs(Map<T, Ref<T>> references)
     {
         this.references = new HashMap<>(references);
     }
@@ -88,11 +88,11 @@ public final class Refs<T extends RefCounted> extends AbstractCollection<T> impl
      */
     public void release(Collection<T> release)
     {
-        List<Ref> refs = new ArrayList<>();
+        List<Ref<T>> refs = new ArrayList<>();
         List<T> notPresent = null;
         for (T obj : release)
         {
-            Ref ref = references.remove(obj);
+            Ref<T> ref = references.remove(obj);
             if (ref == null)
             {
                 if (notPresent == null)
@@ -132,7 +132,7 @@ public final class Refs<T extends RefCounted> extends AbstractCollection<T> impl
      */
     public boolean tryRef(T t)
     {
-        Ref ref = t.tryRef();
+        Ref<T> ref = t.tryRef();
         if (ref == null)
             return false;
         ref = references.put(t, ref);
@@ -156,8 +156,8 @@ public final class Refs<T extends RefCounted> extends AbstractCollection<T> impl
      */
     public Refs<T> addAll(Refs<T> add)
     {
-        List<Ref> overlap = new ArrayList<>();
-        for (Map.Entry<T, Ref> e : add.references.entrySet())
+        List<Ref<T>> overlap = new ArrayList<>();
+        for (Map.Entry<T, Ref<T>> e : add.references.entrySet())
         {
             if (this.references.containsKey(e.getKey()))
                 overlap.add(e.getValue());
@@ -172,12 +172,12 @@ public final class Refs<T extends RefCounted> extends AbstractCollection<T> impl
     /**
      * Acquire a reference to all of the provided objects, or none
      */
-    public static <T extends RefCounted> Refs<T> tryRef(Iterable<T> reference)
+    public static <T extends RefCounted<T>> Refs<T> tryRef(Iterable<T> reference)
     {
-        HashMap<T, Ref> refs = new HashMap<>();
+        HashMap<T, Ref<T>> refs = new HashMap<>();
         for (T rc : reference)
         {
-            Ref ref = rc.tryRef();
+            Ref<T> ref = rc.tryRef();
             if (ref == null)
             {
                 release(refs.values());
@@ -188,7 +188,7 @@ public final class Refs<T extends RefCounted> extends AbstractCollection<T> impl
         return new Refs<T>(refs);
     }
 
-    public static <T extends RefCounted> Refs<T> ref(Iterable<T> reference)
+    public static <T extends RefCounted<T>> Refs<T> ref(Iterable<T> reference)
     {
         Refs<T> refs = tryRef(reference);
         if (refs != null)
@@ -196,7 +196,7 @@ public final class Refs<T extends RefCounted> extends AbstractCollection<T> impl
         throw new IllegalStateException();
     }
 
-    private static void release(Iterable<Ref> refs)
+    private static void release(Iterable<? extends Ref<?>> refs)
     {
         Throwable fail = null;
         for (Ref ref : refs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index b23f1c6..02efa65 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -172,4 +172,9 @@ public class OffHeapBitSet implements IBitSet
         }
         return (int) ((h >> 32) ^ h) + 0x98761234;
     }
+
+    public String toString()
+    {
+        return "[OffHeapBitSet]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 2732be5..ec8ad8a 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -1753,9 +1753,10 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         sstables = dir.sstableLister().list();
         assertEquals(2, sstables.size());
 
+        SSTableReader sstable2 = SSTableReader.open(sstable1.descriptor);
         UUID compactionTaskID = SystemKeyspace.startCompaction(
                 Keyspace.open(ks).getColumnFamilyStore(cf),
-                Collections.singleton(SSTableReader.open(sstable1.descriptor)));
+                Collections.singleton(sstable2));
 
         Map<Integer, UUID> unfinishedCompaction = new HashMap<>();
         unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID);
@@ -1768,6 +1769,8 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
         assertTrue(unfinished.isEmpty());
+        sstable1.selfRef().release();
+        sstable2.selfRef().release();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index d9442c7..7756abe 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -96,7 +96,7 @@ public class AntiCompactionTest extends SchemaLoader
         for (SSTableReader sstable : store.getSSTables())
         {
             assertFalse(sstable.isMarkedCompacted());
-            assertEquals(1, sstable.sharedRef().globalCount());
+            assertEquals(1, sstable.selfRef().globalCount());
         }
         assertEquals(0, store.getDataTracker().getCompacting().size());
         assertEquals(repairedKeys, 4);
@@ -157,7 +157,7 @@ public class AntiCompactionTest extends SchemaLoader
         CompactionManager.instance.performAnticompaction(store, ranges, refs, 1);
         assertThat(store.getSSTables().size(), is(1));
         assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
-        assertThat(Iterables.get(store.getSSTables(), 0).sharedRef().globalCount(), is(1));
+        assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1));
         assertThat(store.getDataTracker().getCompacting().size(), is(0));
     }
 
@@ -174,7 +174,7 @@ public class AntiCompactionTest extends SchemaLoader
 
         assertThat(store.getSSTables().size(), is(1));
         assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
-        assertThat(Iterables.get(store.getSSTables(), 0).sharedRef().globalCount(), is(1));
+        assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1));
         assertThat(store.getDataTracker().getCompacting().size(), is(0));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index d9bf017..87b284e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -173,9 +173,9 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
         List<SSTableReader> interestingBucket = mostInterestingBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32);
         assertTrue("nothing should be returned when all buckets are below the min threshold", interestingBucket.isEmpty());
 
-        sstrs.get(0).readMeter = new RestorableMeter(100.0, 100.0);
-        sstrs.get(1).readMeter = new RestorableMeter(200.0, 200.0);
-        sstrs.get(2).readMeter = new RestorableMeter(300.0, 300.0);
+        sstrs.get(0).overrideReadMeter(new RestorableMeter(100.0, 100.0));
+        sstrs.get(1).overrideReadMeter(new RestorableMeter(200.0, 200.0));
+        sstrs.get(2).overrideReadMeter(new RestorableMeter(300.0, 300.0));
 
         long estimatedKeys = sstrs.get(0).estimatedKeys();
 
@@ -215,43 +215,43 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
         List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
 
         for (SSTableReader sstr : sstrs)
-            sstr.readMeter = null;
+            sstr.overrideReadMeter(null);
         filtered = filterColdSSTables(sstrs, 0.05, 0);
         assertEquals("when there are no read meters, no sstables should be filtered", sstrs.size(), filtered.size());
 
         for (SSTableReader sstr : sstrs)
-            sstr.readMeter = new RestorableMeter(0.0, 0.0);
+            sstr.overrideReadMeter(new RestorableMeter(0.0, 0.0));
         filtered = filterColdSSTables(sstrs, 0.05, 0);
         assertEquals("when all read meters are zero, no sstables should be filtered", sstrs.size(), filtered.size());
 
         // leave all read rates at 0 besides one
-        sstrs.get(0).readMeter = new RestorableMeter(1000.0, 1000.0);
+        sstrs.get(0).overrideReadMeter(new RestorableMeter(1000.0, 1000.0));
         filtered = filterColdSSTables(sstrs, 0.05, 0);
         assertEquals("there should only be one hot sstable", 1, filtered.size());
-        assertEquals(1000.0, filtered.get(0).readMeter.twoHourRate(), 0.5);
+        assertEquals(1000.0, filtered.get(0).getReadMeter().twoHourRate(), 0.5);
 
         // the total read rate is 100, and we'll set a threshold of 2.5%, so two of the sstables with read
         // rate 1.0 should be ignored, but not the third
         for (SSTableReader sstr : sstrs)
-            sstr.readMeter = new RestorableMeter(0.0, 0.0);
-        sstrs.get(0).readMeter = new RestorableMeter(97.0, 97.0);
-        sstrs.get(1).readMeter = new RestorableMeter(1.0, 1.0);
-        sstrs.get(2).readMeter = new RestorableMeter(1.0, 1.0);
-        sstrs.get(3).readMeter = new RestorableMeter(1.0, 1.0);
+            sstr.overrideReadMeter(new RestorableMeter(0.0, 0.0));
+        sstrs.get(0).overrideReadMeter(new RestorableMeter(97.0, 97.0));
+        sstrs.get(1).overrideReadMeter(new RestorableMeter(1.0, 1.0));
+        sstrs.get(2).overrideReadMeter(new RestorableMeter(1.0, 1.0));
+        sstrs.get(3).overrideReadMeter(new RestorableMeter(1.0, 1.0));
 
         filtered = filterColdSSTables(sstrs, 0.025, 0);
         assertEquals(2, filtered.size());
-        assertEquals(98.0, filtered.get(0).readMeter.twoHourRate() + filtered.get(1).readMeter.twoHourRate(), 0.5);
+        assertEquals(98.0, filtered.get(0).getReadMeter().twoHourRate() + filtered.get(1).getReadMeter().twoHourRate(), 0.5);
 
         // make sure a threshold of 0.0 doesn't result in any sstables being filtered
         for (SSTableReader sstr : sstrs)
-            sstr.readMeter = new RestorableMeter(1.0, 1.0);
+            sstr.overrideReadMeter(new RestorableMeter(1.0, 1.0));
         filtered = filterColdSSTables(sstrs, 0.0, 0);
         assertEquals(sstrs.size(), filtered.size());
 
         // just for fun, set a threshold where all sstables are considered cold
         for (SSTableReader sstr : sstrs)
-            sstr.readMeter = new RestorableMeter(1.0, 1.0);
+            sstr.overrideReadMeter(new RestorableMeter(1.0, 1.0));
         filtered = filterColdSSTables(sstrs, 1.0, 0);
         assertTrue(filtered.isEmpty());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 0a2b5a6..0bb9d5f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -90,7 +90,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
     private static List<SSTableReader> resetSummaries(List<SSTableReader> sstables, long originalOffHeapSize) throws IOException
     {
         for (SSTableReader sstable : sstables)
-            sstable.readMeter = new RestorableMeter(100.0, 100.0);
+            sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
 
         sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, originalOffHeapSize * sstables.size());
         for (SSTableReader sstable : sstables)
@@ -117,7 +117,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
     {
         public int compare(SSTableReader o1, SSTableReader o2)
         {
-            return Double.compare(o1.readMeter.fifteenMinuteRate(), o2.readMeter.fifteenMinuteRate());
+            return Double.compare(o1.getReadMeter().fifteenMinuteRate(), o2.getReadMeter().fifteenMinuteRate());
         }
     };
 
@@ -172,7 +172,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
 
         List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
         for (SSTableReader sstable : sstables)
-            sstable.readMeter = new RestorableMeter(100.0, 100.0);
+            sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
 
         for (SSTableReader sstable : sstables)
             assertEquals(cfs.metadata.getMinIndexInterval(), sstable.getEffectiveIndexInterval(), 0.001);
@@ -244,7 +244,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
 
         List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
         for (SSTableReader sstable : sstables)
-            sstable.readMeter = new RestorableMeter(100.0, 100.0);
+            sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
 
         IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, sstables, 1);
         sstables = new ArrayList<>(cfs.getSSTables());
@@ -286,7 +286,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
 
         List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
         for (SSTableReader sstable : sstables)
-            sstable.readMeter = new RestorableMeter(100.0, 100.0);
+            sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
 
         long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
 
@@ -325,8 +325,8 @@ public class IndexSummaryManagerTest extends SchemaLoader
 
         // make two of the four sstables cold, only leave enough space for three full index summaries,
         // so the two cold sstables should get downsampled to be half of their original size
-        sstables.get(0).readMeter = new RestorableMeter(50.0, 50.0);
-        sstables.get(1).readMeter = new RestorableMeter(50.0, 50.0);
+        sstables.get(0).overrideReadMeter(new RestorableMeter(50.0, 50.0));
+        sstables.get(1).overrideReadMeter(new RestorableMeter(50.0, 50.0));
         sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3));
         Collections.sort(sstables, hotnessComparator);
         assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
@@ -338,8 +338,8 @@ public class IndexSummaryManagerTest extends SchemaLoader
         // small increases or decreases in the read rate don't result in downsampling or upsampling
         double lowerRate = 50.0 * (DOWNSAMPLE_THESHOLD + (DOWNSAMPLE_THESHOLD * 0.10));
         double higherRate = 50.0 * (UPSAMPLE_THRESHOLD - (UPSAMPLE_THRESHOLD * 0.10));
-        sstables.get(0).readMeter = new RestorableMeter(lowerRate, lowerRate);
-        sstables.get(1).readMeter = new RestorableMeter(higherRate, higherRate);
+        sstables.get(0).overrideReadMeter(new RestorableMeter(lowerRate, lowerRate));
+        sstables.get(1).overrideReadMeter(new RestorableMeter(higherRate, higherRate));
         sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3));
         Collections.sort(sstables, hotnessComparator);
         assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
@@ -350,10 +350,10 @@ public class IndexSummaryManagerTest extends SchemaLoader
 
         // reset, and then this time, leave enough space for one of the cold sstables to not get downsampled
         sstables = resetSummaries(sstables, singleSummaryOffHeapSpace);
-        sstables.get(0).readMeter = new RestorableMeter(1.0, 1.0);
-        sstables.get(1).readMeter = new RestorableMeter(2.0, 2.0);
-        sstables.get(2).readMeter = new RestorableMeter(1000.0, 1000.0);
-        sstables.get(3).readMeter = new RestorableMeter(1000.0, 1000.0);
+        sstables.get(0).overrideReadMeter(new RestorableMeter(1.0, 1.0));
+        sstables.get(1).overrideReadMeter(new RestorableMeter(2.0, 2.0));
+        sstables.get(2).overrideReadMeter(new RestorableMeter(1000.0, 1000.0));
+        sstables.get(3).overrideReadMeter(new RestorableMeter(1000.0, 1000.0));
 
         sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3) + 50);
         Collections.sort(sstables, hotnessComparator);
@@ -372,10 +372,10 @@ public class IndexSummaryManagerTest extends SchemaLoader
         // coldest sstables will get downsampled to 4/128 of their size, leaving us with 1 and 92/128th index
         // summaries worth of space.  The hottest sstable should get a full index summary, and the one in the middle
         // should get the remainder.
-        sstables.get(0).readMeter = new RestorableMeter(0.0, 0.0);
-        sstables.get(1).readMeter = new RestorableMeter(0.0, 0.0);
-        sstables.get(2).readMeter = new RestorableMeter(92, 92);
-        sstables.get(3).readMeter = new RestorableMeter(128.0, 128.0);
+        sstables.get(0).overrideReadMeter(new RestorableMeter(0.0, 0.0));
+        sstables.get(1).overrideReadMeter(new RestorableMeter(0.0, 0.0));
+        sstables.get(2).overrideReadMeter(new RestorableMeter(92, 92));
+        sstables.get(3).overrideReadMeter(new RestorableMeter(128.0, 128.0));
         sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL))));
         Collections.sort(sstables, hotnessComparator);
         assertEquals(1, sstables.get(0).getIndexSummarySize());  // at the min sampling level
@@ -421,10 +421,13 @@ public class IndexSummaryManagerTest extends SchemaLoader
         SSTableReader sstable = original;
         for (int samplingLevel = 1; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++)
         {
+            SSTableReader prev = sstable;
             sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, samplingLevel);
             assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel());
             int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getMinIndexInterval() * BASE_SAMPLING_LEVEL);
             assertEquals(expectedSize, sstable.getIndexSummarySize(), 1);
+            if (prev != original)
+                prev.selfRef().release();
         }
 
         // don't leave replaced SSTRs around to break other tests

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 8bef669..19ba274 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -118,7 +118,7 @@ public class LegacySSTableTest extends SchemaLoader
         ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
         ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken()));
         ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
-        details.add(new StreamSession.SSTableStreamingSections(sstable, sstable.tryRef(),
+        details.add(new StreamSession.SSTableStreamingSections(sstable, sstable.ref(),
                                                                sstable.getPositionsForRanges(ranges),
                                                                sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
         new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
@@ -138,6 +138,7 @@ public class LegacySSTableTest extends SchemaLoader
             assert cf.deletionInfo().equals(DeletionInfo.live());
             assert iter.next().name().toByteBuffer().equals(key);
         }
+        sstable.selfRef().release();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 51588f2..9607673 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -203,15 +203,15 @@ public class SSTableReaderTest extends SchemaLoader
         store.forceBlockingFlush();
 
         SSTableReader sstable = store.getSSTables().iterator().next();
-        assertEquals(0, sstable.readMeter.count());
+        assertEquals(0, sstable.getReadMeter().count());
 
         DecoratedKey key = sstable.partitioner.decorateKey(ByteBufferUtil.bytes("4"));
         store.getColumnFamily(key, Composites.EMPTY, Composites.EMPTY, false, 100, 100);
-        assertEquals(1, sstable.readMeter.count());
+        assertEquals(1, sstable.getReadMeter().count());
         store.getColumnFamily(key, cellname("0"), cellname("0"), false, 100, 100);
-        assertEquals(2, sstable.readMeter.count());
+        assertEquals(2, sstable.getReadMeter().count());
         store.getColumnFamily(Util.namesQueryFilter(store, key, cellname("0")));
-        assertEquals(3, sstable.readMeter.count());
+        assertEquals(3, sstable.getReadMeter().count());
     }
 
     @Test
@@ -334,6 +334,7 @@ public class SSTableReaderTest extends SchemaLoader
         Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.getKey()), target.getIndexSummaryKey(0));
         assert target.first.equals(firstKey);
         assert target.last.equals(lastKey);
+        target.selfRef().release();
     }
 
     @Test
@@ -360,6 +361,7 @@ public class SSTableReaderTest extends SchemaLoader
 
         SSTableReader reopened = SSTableReader.open(sstable.descriptor);
         assert reopened.first.getToken() instanceof LocalToken;
+        reopened.selfRef().release();
     }
 
     /** see CASSANDRA-5407 */
@@ -417,6 +419,7 @@ public class SSTableReaderTest extends SchemaLoader
         SSTableReader bulkLoaded = SSTableReader.openForBatch(sstable.descriptor, components, store.metadata, sstable.partitioner);
         sections = bulkLoaded.getPositionsForRanges(ranges);
         assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading";
+        bulkLoaded.selfRef().release();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index acf8c90..2e11624 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -19,11 +19,7 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 import com.google.common.collect.Sets;
 import org.junit.Test;
@@ -200,10 +196,11 @@ public class SSTableRewriterTest extends SchemaLoader
         assertTrue(s != s2);
         assertFileCounts(dir.list(), 2, 3);
         s.markObsolete();
-        s.sharedRef().release();
+        s.selfRef().release();
+        s2.selfRef().release();
         Thread.sleep(1000);
         assertFileCounts(dir.list(), 0, 3);
-        writer.abort(false);
+        writer.abort();
         Thread.sleep(1000);
         int datafiles = assertFileCounts(dir.list(), 0, 0);
         assertEquals(datafiles, 0);
@@ -705,7 +702,7 @@ public class SSTableRewriterTest extends SchemaLoader
         for (SSTableReader sstable : cfs.getSSTables())
         {
             assertFalse(sstable.isMarkedCompacted());
-            assertEquals(1, sstable.sharedRef().globalCount());
+            assertEquals(1, sstable.selfRef().globalCount());
             liveDescriptors.add(sstable.descriptor.generation);
         }
         for (File dir : cfs.directories.getCFDirectories())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index f84ae11..1447b29 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -64,7 +64,7 @@ public class StreamTransferTaskTest extends SchemaLoader
         {
             List<Range<Token>> ranges = new ArrayList<>();
             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
-            task.addTransferFile(sstable, sstable.sharedRef(), 1, sstable.getPositionsForRanges(ranges), 0);
+            task.addTransferFile(sstable, sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges), 0);
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index c3f3419..c918d6a 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -189,6 +189,7 @@ public class SSTableExportTest extends SchemaLoader
         qf = Util.namesQueryFilter(cfs, Util.dk("rowExclude"), "name");
         cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
         assert cf == null;
+        reader.selfRef().release();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableImportTest.java b/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
index b0f4c0a..a5f05f8 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
@@ -71,6 +71,7 @@ public class SSTableImportTest extends SchemaLoader
         assert expCol.value().equals(hexToBytes("76616c4143"));
         assert expCol instanceof ExpiringCell;
         assert ((ExpiringCell)expCol).getTimeToLive() == 42 && expCol.getLocalDeletionTime() == 2000000000;
+        reader.selfRef().release();
     }
 
     private ColumnFamily cloneForAdditions(OnDiskAtomIterator iter)
@@ -105,6 +106,7 @@ public class SSTableImportTest extends SchemaLoader
         assert expCol.value().equals(hexToBytes("76616c4143"));
         assert expCol instanceof ExpiringCell;
         assert ((ExpiringCell) expCol).getTimeToLive() == 42 && expCol.getLocalDeletionTime() == 2000000000;
+        reader.selfRef().release();
     }
 
     @Test
@@ -129,6 +131,7 @@ public class SSTableImportTest extends SchemaLoader
         assert expCol.value().equals(hexToBytes("76616c4143"));
         assert expCol instanceof ExpiringCell;
         assert ((ExpiringCell) expCol).getTimeToLive() == 42 && expCol.getLocalDeletionTime() == 2000000000;
+        reader.selfRef().release();
     }
 
     @Test
@@ -148,6 +151,7 @@ public class SSTableImportTest extends SchemaLoader
         Cell c = cf.getColumn(Util.cellname("colAA"));
         assert c instanceof CounterCell : c;
         assert ((CounterCell) c).total() == 42;
+        reader.selfRef().release();
     }
 
     @Test
@@ -168,6 +172,7 @@ public class SSTableImportTest extends SchemaLoader
         QueryFilter qf2 = QueryFilter.getIdentityFilter(Util.dk("726f7741", BytesType.instance), "AsciiKeys", System.currentTimeMillis());
         OnDiskAtomIterator iter2 = qf2.getSSTableColumnIterator(reader);
         assert !iter2.hasNext(); // "bytes" key does not exist
+        reader.selfRef().release();
     }
 
     @Test
@@ -186,6 +191,7 @@ public class SSTableImportTest extends SchemaLoader
         QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "AsciiKeys", System.currentTimeMillis());
         OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
         assert iter.hasNext(); // "bytes" key exists
+        reader.selfRef().release();
     }
     
     @Test
@@ -207,6 +213,7 @@ public class SSTableImportTest extends SchemaLoader
         assertThat(result.size(), is(2));
         assertThat(result, hasItem(withElements(1, "NY", 1980)));
         assertThat(result, hasItem(withElements(2, "CA", 2014)));
+        reader.selfRef().release();
     }
 
     @Test(expected=AssertionError.class)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
index 4180a8c..aee0880 100644
--- a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
+++ b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
@@ -30,10 +30,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.*;
 
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
@@ -45,7 +42,7 @@ public class BloomFilterTest
 
     public BloomFilterTest()
     {
-        bf = FilterFactory.getFilter(10000L, FilterTestHelper.MAX_FAILURE_RATE, true);
+
     }
 
     public static IFilter testSerialize(IFilter f) throws IOException
@@ -64,9 +61,15 @@ public class BloomFilterTest
 
 
     @Before
-    public void clear()
+    public void setup()
+    {
+        bf = FilterFactory.getFilter(10000L, FilterTestHelper.MAX_FAILURE_RATE, true);
+    }
+
+    @After
+    public void destroy()
     {
-        bf.clear();
+        bf.close();
     }
 
     @Test(expected = UnsupportedOperationException.class)
@@ -114,12 +117,13 @@ public class BloomFilterTest
         FilterTestHelper.testFalsePositives(bf2,
                                             new KeyGenerator.WordGenerator(skipEven, 2),
                                             new KeyGenerator.WordGenerator(1, 2));
+        bf2.close();
     }
 
     @Test
     public void testSerialize() throws IOException
     {
-        BloomFilterTest.testSerialize(bf);
+        BloomFilterTest.testSerialize(bf).close();
     }
 
     public void testManyHashes(Iterator<ByteBuffer> keys)
@@ -137,6 +141,7 @@ public class BloomFilterTest
                 hashes.add(hashIndex);
             }
             collisions += (MAX_HASH_COUNT - hashes.size());
+            bf.close();
         }
         assert collisions <= 100;
     }
@@ -167,6 +172,7 @@ public class BloomFilterTest
         FilterFactory.serialize(filter, out);
         filter.bitset.serialize(out);
         out.close();
+        filter.close();
         
         DataInputStream in = new DataInputStream(new FileInputStream(file));
         BloomFilter filter2 = (BloomFilter) FilterFactory.deserialize(in, true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/utils/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
index 976a3eb..b0a23fd 100644
--- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
@@ -39,6 +39,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         DataOutputStreamAndChannel out = getOutput("utils.BloomFilter.bin");
         FilterFactory.serialize(bf, out);
         out.close();
+        bf.close();
     }
 
     @Test
@@ -48,7 +49,9 @@ public class SerializationsTest extends AbstractSerializationsTester
             testBloomFilterWrite(true);
 
         DataInputStream in = getInput("utils.BloomFilter.bin");
-        assert FilterFactory.deserialize(in, true) != null;
+        IFilter bf = FilterFactory.deserialize(in, true);
+        assert bf != null;
+        bf.close();
         in.close();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61384c57/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java b/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java
index fe22d21..a9247cd 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java
@@ -44,9 +44,9 @@ public class RefCountedTest
     public void testLeak() throws InterruptedException
     {
         Tidier tidier = new Tidier();
-        RefCounted obj = RefCounted.Impl.get(tidier);
+        Ref<?> obj = new Ref(null, tidier);
         obj.tryRef();
-        obj.sharedRef().release();
+        obj.release();
         System.gc();
         System.gc();
         Thread.sleep(1000);
@@ -57,7 +57,7 @@ public class RefCountedTest
     public void testSeriousLeak() throws InterruptedException
     {
         Tidier tidier = new Tidier();
-        RefCounted.Impl.get(tidier);
+        new Ref(null, tidier);
         System.gc();
         System.gc();
         System.gc();
@@ -73,9 +73,9 @@ public class RefCountedTest
         try
         {
             tidier = new Tidier();
-            RefCounted obj = RefCounted.Impl.get(tidier);
-            obj.sharedRef().release();
-            obj.sharedRef().release();
+            Ref<?> obj = new Ref(null, tidier);
+            obj.release();
+            obj.release();
             Assert.assertTrue(false);
         }
         catch (Exception e)


Mime
View raw message