Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B2AFB101E9 for ; Thu, 30 Jan 2014 23:09:33 +0000 (UTC) Received: (qmail 87693 invoked by uid 500); 30 Jan 2014 23:09:29 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 87616 invoked by uid 500); 30 Jan 2014 23:09:28 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 87592 invoked by uid 99); 30 Jan 2014 23:09:26 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Jan 2014 23:09:26 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4285D91694A; Thu, 30 Jan 2014 23:09:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Thu, 30 Jan 2014 23:09:27 -0000 Message-Id: <758b9963116e48339675e0c8a241fcf1@git.apache.org> In-Reply-To: <2bd30dd912ec497a91213ad0f02492a2@git.apache.org> References: <2bd30dd912ec497a91213ad0f02492a2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/6] remove Table.switchlock and introduce o.a.c.utils.memory package patch by Benedict Elliott Smith; reviewed by jbellis for CASSANDRA-5549 http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java b/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java deleted file mode 
100644 index a193c31..0000000 --- a/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.apache.cassandra.utils.btree; - -import com.google.common.base.Function; - -/** - * An interface defining a function to be applied to both the object we are replacing in a BTree and - * the object that is intended to replace it, returning the object to actually replace it. - * - * If this is a new insertion, that is there is no object to replace, the one argument variant of - * the function will be called. - * - * @param - */ -public interface ReplaceFunction extends Function -{ - V apply(V replaced, V update); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java new file mode 100644 index 0000000..cd30492 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java @@ -0,0 +1,30 @@ +package org.apache.cassandra.utils.btree; + +import com.google.common.base.Function; + +/** + * An interface defining a function to be applied to both the object we are replacing in a BTree and + * the object that is intended to replace it, returning the object to actually replace it. 
+ * + * @param + */ +public interface UpdateFunction extends Function +{ + /** + * @param replacing the value in the original tree we have matched + * @param update the value in the updating collection that matched + * @return the value to insert into the new tree + */ + V apply(V replacing, V update); + + /** + * @return true if we should fail the update + */ + boolean abortEarly(); + + /** + * @param heapSize extra heap space allocated (over previous tree) + */ + void allocated(long heapSize); + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java new file mode 100644 index 0000000..44330a0 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java @@ -0,0 +1,411 @@ +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +/** + *

A class for providing synchronization between producers and consumers that do not + * communicate directly with each other, but where the consumers need to process their + * work in contiguous batches. In particular this is useful for both CommitLog and Memtable + * where the producers (writing threads) are modifying a structure that the consumer + * (flush executor) only batch syncs, but needs to know what 'position' the work is at + * for co-ordination with other processes. + * + *

The typical usage is something like: + *

+     public final class ExampleShared
+     {
+        final OpOrder order = new OpOrder();
+        volatile SharedState state;
+
+        static class SharedState
+        {
+            volatile Barrier barrier;
+
+            // ...
+        }
+
+        public void consume()
+        {
+            SharedState state = this.state;
+            state.setReplacement(new SharedState());
+            state.doSomethingToPrepareForBarrier();
+
+            state.barrier = order.newBarrier();
+            // issue() MUST be called after newBarrier(), else barrier.isAfter()
+            // will always return true and barrier.await() will fail
+            state.barrier.issue();
+
+            // wait for all producer work started prior to the barrier to complete
+            state.barrier.await();
+
+            // change the shared state to its replacement, as the current state will no longer be used by producers
+            this.state = state.getReplacement();
+
+            state.doSomethingWithExclusiveAccess();
+        }
+
+        public void produce()
+        {
+            Group opGroup = order.start();
+            try
+            {
+                SharedState s = state;
+                while (s.barrier != null && !s.barrier.isAfter(opGroup))
+                    s = s.getReplacement();
+                s.doProduceWork();
+            }
+            finally
+            {
+                opGroup.finishOne();
+            }
+        }
+    }
+ * 
+ */ +public class OpOrder +{ + /** + * Constant that when an Ordered.running is equal to, indicates the Ordered is complete + */ + private static final int FINISHED = -1; + + /** + * A linked list starting with the most recent Ordered object, i.e. the one we should start new operations from, + * with (prev) links to any incomplete Ordered instances, and (next) links to any potential future Ordered instances. + * Once all operations started against an Ordered instance and its ancestors have been finished the next instance + * will unlink this one + */ + private volatile Group current = new Group(); + + /** + * Start an operation against this OpOrder. + * Once the operation is completed Ordered.finishOne() MUST be called EXACTLY once for this operation. + * + * @return the Ordered instance that manages this OpOrder + */ + public Group start() + { + while (true) + { + Group current = this.current; + if (current.register()) + return current; + } + } + + /** + * Creates a new barrier. The barrier is only a placeholder until barrier.issue() is called on it, + * after which all new operations will start against a new Group that will not be accepted + * by barrier.isAfter(), and barrier.await() will return only once all operations started prior to the issue + * have completed. + * + * @return + */ + public Barrier newBarrier() + { + return new Barrier(); + } + + public Group getCurrent() + { + return current; + } + + /** + * Represents a group of identically ordered operations, i.e. all operations started in the interval between + * two barrier issuances. For each register() call this is returned, finishOne() must be called exactly once. + * It should be treated like taking a lock(). 
+ */ + public static final class Group implements Comparable + { + /** + * In general this class goes through the following stages: + * 1) LIVE: many calls to register() and finishOne() + * 2) FINISHING: a call to expire() (after a barrier issue), means calls to register() will now fail, + * and we are now 'in the past' (new operations will be started against a new Ordered) + * 3) FINISHED: once the last finishOne() is called, this Ordered is done. We call unlink(). + * 4) ZOMBIE: all our operations are finished, but some operations against an earlier Ordered are still + * running, or tidying up, so unlink() fails to remove us + * 5) COMPLETE: all operations started on or before us are FINISHED (and COMPLETE), so we are unlinked + *

+ * Another, parallel state is ISBLOCKING: + *

+ * isBlocking => a barrier that is waiting on us (either directly, or via a future Ordered) is blocking general + * progress. This state is entered by calling Barrier.markBlocking(). If the running operations are blocked + * on a Signal that is also registered with the isBlockingSignal (probably through isSafeBlockingSignal) + * then they will be notified that they are blocking forward progress, and may take action to avoid that. + */ + + private volatile Group prev, next; + private final long id; // monotonically increasing id for compareTo() + private volatile int running = 0; // number of operations currently running. < 0 means we're expired, and the count of tasks still running is -(running + 1) + private volatile boolean isBlocking; // indicates running operations are blocking future barriers + private final WaitQueue isBlockingSignal = new WaitQueue(); // signal to wait on to indicate isBlocking is true + private final WaitQueue waiting = new WaitQueue(); // signal to wait on for completion + + static final AtomicIntegerFieldUpdater runningUpdater = AtomicIntegerFieldUpdater.newUpdater(Group.class, "running"); + + // constructs first instance only + private Group() + { + this.id = 0; + } + + private Group(Group prev) + { + this.id = prev.id + 1; + this.prev = prev; + } + + // prevents any further operations starting against this Ordered instance + // if there are no running operations, calls unlink; otherwise, we let the last op to finishOne call it. + // this means issue() won't have to block for ops to finish. + private void expire() + { + while (true) + { + int current = running; + if (current < 0) + throw new IllegalStateException(); + if (runningUpdater.compareAndSet(this, current, -1 - current)) + { + // if we're already finished (no running ops), unlink ourselves + if (current == 0) + unlink(); + return; + } + } + } + + // attempts to start an operation against this Ordered instance, and returns true if successful. 
+ private boolean register() + { + while (true) + { + int current = running; + if (current < 0) + return false; + if (runningUpdater.compareAndSet(this, current, current + 1)) + return true; + } + } + + /** + * To be called exactly once for each register() call this object is returned for, indicating the operation + * is complete + */ + public void finishOne() + { + while (true) + { + int current = running; + if (current < 0) + { + if (runningUpdater.compareAndSet(this, current, current + 1)) + { + if (current + 1 == FINISHED) + { + // if we're now finished, unlink ourselves + unlink(); + } + return; + } + } + else if (runningUpdater.compareAndSet(this, current, current - 1)) + { + return; + } + } + } + + /** + * called once we know all operations started against this Ordered have completed, + * however we do not know if operations against its ancestors have completed, or + * if its descendants have completed ahead of it, so we attempt to create the longest + * chain from the oldest still linked Ordered. If we can't reach the oldest through + * an unbroken chain of completed Ordered, we abort, and leave the still completing + * ancestor to tidy up. 
+ */ + private void unlink() + { + // walk back in time to find the start of the list + Group start = this; + while (true) + { + Group prev = start.prev; + if (prev == null) + break; + // if we haven't finished this Ordered yet abort and let it clean up when it's done + if (prev.running != FINISHED) + return; + start = prev; + } + + // now walk forwards in time, in case we finished up late + Group end = this.next; + while (end.running == FINISHED) + end = end.next; + + // now walk from first to last, unlinking the prev pointer and waking up any blocking threads + while (start != end) + { + Group next = start.next; + next.prev = null; + start.waiting.signalAll(); + start = next; + } + } + + /** + * @return true if a barrier we are behind is, or may be, blocking general progress, + * so we should try more aggressively to progress + */ + public boolean isBlocking() + { + return isBlocking; + } + + /** + * register to be signalled when a barrier waiting on us is, or maybe, blocking general progress, + * so we should try more aggressively to progress + */ + public WaitQueue.Signal isBlockingSignal() + { + return isBlockingSignal.register(); + } + + /** + * wrap the provided signal to also be signalled if the operation gets marked blocking + */ + public WaitQueue.Signal isBlockingSignal(WaitQueue.Signal signal) + { + return WaitQueue.any(signal, isBlockingSignal()); + } + + public int compareTo(Group that) + { + // we deliberately use subtraction, as opposed to Long.compareTo() as we care about ordering + // not which is the smaller value, so this permits wrapping in the unlikely event we exhaust the long space + long c = this.id - that.id; + if (c > 0) + return 1; + else if (c < 0) + return -1; + else + return 0; + } + } + + /** + * This class represents a synchronisation point providing ordering guarantees on operations started + * against the enclosing OpOrder. 
When issue() is called upon it (may only happen once per Barrier), the + * Barrier atomically partitions new operations from those already running (by expiring the current Group), + * and activates its isAfter() method + * which indicates if an operation was started before or after this partition. It offers methods to + * determine, or block until, all prior operations have finished, and a means to indicate to those operations + * that they are blocking forward progress. See {@link OpOrder} for idiomatic usage. + */ + public final class Barrier + { + // this Barrier was issued after all Group operations started against orderOnOrBefore + private volatile Group orderOnOrBefore; + + /** + * @return true if @param group was started prior to the issuing of the barrier. + * + * (Until issue is called, always returns true, but if you rely on this behavior you are probably + * Doing It Wrong.) + */ + public boolean isAfter(Group group) + { + if (orderOnOrBefore == null) + return true; + // we subtract to permit wrapping round the full range of Long - so we only need to ensure + // there are never Long.MAX_VALUE * 2 total Group objects in existence at any one timem which will + // take care of itself + return orderOnOrBefore.id - group.id >= 0; + } + + /** + * Issues (seals) the barrier, meaning no new operations may be issued against it, and expires the current + * Group. Must be called before await() for isAfter() to be properly synchronised. 
+ */ + public void issue() + { + if (orderOnOrBefore != null) + throw new IllegalStateException("Can only call issue() once on each Barrier"); + + final Group current; + synchronized (OpOrder.this) + { + current = OpOrder.this.current; + orderOnOrBefore = current; + OpOrder.this.current = current.next = new Group(current); + } + current.expire(); + } + + /** + * Mark all prior operations as blocking, potentially signalling them to more aggressively make progress + */ + public void markBlocking() + { + Group current = orderOnOrBefore; + while (current != null) + { + current.isBlocking = true; + current.isBlockingSignal.signalAll(); + current = current.prev; + } + } + + /** + * Register to be signalled once allPriorOpsAreFinished() or allPriorOpsAreFinishedOrSafe() may return true + */ + public WaitQueue.Signal register() + { + return orderOnOrBefore.waiting.register(); + } + + /** + * @return true if all operations started prior to barrier.issue() have completed + */ + public boolean allPriorOpsAreFinished() + { + Group current = orderOnOrBefore; + if (current == null) + throw new IllegalStateException("This barrier needs to have issue() called on it before prior operations can complete"); + if (current.next.prev == null) + return true; + return false; + } + + /** + * wait for all operations started prior to issuing the barrier to complete + */ + public void await() + { + while (!allPriorOpsAreFinished()) + { + WaitQueue.Signal signal = register(); + if (allPriorOpsAreFinished()) + { + signal.cancel(); + return; + } + else + signal.awaitUninterruptibly(); + } + assert orderOnOrBefore.running == FINISHED; + } + + /** + * returns the Group we are waiting on - any Group with .compareTo(getSyncPoint()) <= 0 + * must complete before await() returns + */ + public Group getSyncPoint() + { + return orderOnOrBefore; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java new file mode 100644 index 0000000..9bef8c3 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java @@ -0,0 +1,508 @@ +package org.apache.cassandra.utils.concurrent; + +import com.yammer.metrics.core.TimerContext; +import org.slf4j.*; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.locks.LockSupport; + +/** + *

A relatively easy to use utility for general purpose thread signalling.

+ *

Usage on a thread awaiting a state change using a WaitQueue q is:

+ *
+ * {@code
+ *      while (!conditionMet())
+ *      {
+ *          Signal s = q.register();
+ *          if (!conditionMet())    // or, perhaps more correctly, !conditionChanged()
+ *              s.await();
+ *          else
+ *              s.cancel();
+ *      }
+ * }
+ * 
+ * A signalling thread, AFTER changing the state, then calls q.signal() to wake up one, or q.signalAll() + * to wake up all, waiting threads. + *

To understand intuitively how this class works, the idea is simply that a thread, once it considers itself + * incapable of making progress, registers to be awoken once that changes. Since this could have changed between + * checking and registering (in which case the thread that made this change would have been unable to signal it), + * it checks the condition again, sleeping only if it hasn't changed/still is not met.

+ *

This thread synchronisation scheme has some advantages over Condition objects and Object.wait/notify in that no monitor + * acquisition is necessary and, in fact, besides the actual waiting on a signal, all operations are non-blocking. + * As a result consumers can never block producers, nor each other, or vice versa, from making progress. + * Threads that are signalled are also put into a RUNNABLE state almost simultaneously, so they can all immediately make + * progress without having to serially acquire the monitor/lock, reducing scheduler delay incurred.

+ * + *

A few notes on utilisation:

+ *

1. A thread will only exit await() when it has been signalled, but this does not guarantee the condition has not + * been altered since it was signalled, and depending on your design it is likely the outer condition will need to be + * checked in a loop, though this is not always the case.

+ *

2. Each signal is single use, so must be re-registered after each await(). This is true even if it times out.

+ *

3. If you choose not to wait on the signal (because the condition has been met before you waited on it) + * you must cancel() the signal if the signalling thread uses signal() to awake waiters; otherwise signals will be + * lost. If signalAll() is used but infrequent, and register() is frequent, cancel() should still be used to prevent the + * queue growing unboundedly. Similarly, if you provide a TimerContext, cancel should be used to ensure it is not erroneously + * counted towards wait time.

+ *

4. Care must be taken when selecting conditionMet() to ensure we are waiting on the condition that actually + * indicates progress is possible. In some complex cases it may be tempting to wait on a condition that is only indicative + * of local progress, not progress on the task we are aiming to complete, and a race may leave us waiting for a condition + * to be met that we no longer need. + *

5. This scheme is not fair

+ *

6. Only the thread that calls register() may call await()

+ */ +public final class WaitQueue +{ + + private static final Logger logger = LoggerFactory.getLogger(WaitQueue.class); + + private static final int CANCELLED = -1; + private static final int SIGNALLED = 1; + private static final int NOT_SET = 0; + + private static final AtomicIntegerFieldUpdater signalledUpdater = AtomicIntegerFieldUpdater.newUpdater(RegisteredSignal.class, "state"); + + // the waiting signals + private final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>(); + + /** + * The calling thread MUST be the thread that uses the signal + * @return + */ + public Signal register() + { + RegisteredSignal signal = new RegisteredSignal(); + queue.add(signal); + return signal; + } + + /** + * The calling thread MUST be the thread that uses the signal. + * If the Signal is waited on, context.stop() will be called when the wait times out, the Signal is signalled, + * or the waiting thread is interrupted. + * @return + */ + public Signal register(TimerContext context) + { + assert context != null; + RegisteredSignal signal = new TimedSignal(context); + queue.add(signal); + return signal; + } + + /** + * Signal one waiting thread + */ + public boolean signal() + { + if (!hasWaiters()) + return false; + while (true) + { + RegisteredSignal s = queue.poll(); + if (s == null || s.signal()) + return s != null; + } + } + + /** + * Signal all waiting threads + */ + public void signalAll() + { + if (!hasWaiters()) + return; + List woke = null; + if (logger.isTraceEnabled()) + woke = new ArrayList<>(); + long start = System.nanoTime(); + // we wake up only a snapshot of the queue, to avoid a race where the condition is not met and the woken thread + // immediately waits on the queue again + RegisteredSignal last = queue.getLast(); + Iterator iter = queue.iterator(); + while (iter.hasNext()) + { + RegisteredSignal signal = iter.next(); + if (logger.isTraceEnabled()) + { + Thread thread = signal.thread; + if (signal.signal()) + woke.add(thread); + } + else + 
signal.signal(); + + iter.remove(); + + if (signal == last) + break; + } + long end = System.nanoTime(); + if (woke != null) + logger.trace("Woke up {} in {}ms from {}", woke, (end - start) * 0.000001d, Thread.currentThread().getStackTrace()[2]); + } + + private void cleanUpCancelled() + { + // attempt to remove the cancelled from the beginning only, but if we fail to remove any proceed to cover + // the whole list + Iterator iter = queue.iterator(); + while (iter.hasNext()) + { + RegisteredSignal s = iter.next(); + if (s.isCancelled()) + iter.remove(); + } + } + + public boolean hasWaiters() + { + return !queue.isEmpty(); + } + + /** + * Return how many threads are waiting + * @return + */ + public int getWaiting() + { + if (queue.isEmpty()) + return 0; + Iterator iter = queue.iterator(); + int count = 0; + while (iter.hasNext()) + { + Signal next = iter.next(); + if (!next.isCancelled()) + count++; + } + return count; + } + + /** + * A Signal is a one-time-use mechanism for a thread to wait for notification that some condition + * state has transitioned that it may be interested in (and hence should check if it is). + * It is potentially transient, i.e. the state can change in the meantime, it only indicates + * that it should be checked, not necessarily anything about what the expected state should be. + * + * Signal implementations should never wake up spuriously, they are always woken up by a + * signal() or signalAll(). + * + * This abstract definition of Signal does not need to be tied to a WaitQueue. + * Whilst RegisteredSignal is the main building block of Signals, this abstract + * definition allows us to compose Signals in useful ways. The Signal is 'owned' by the + * thread that registered itself with WaitQueue(s) to obtain the underlying RegisteredSignal(s); + * only the owning thread should use a Signal. + */ + public static interface Signal + { + + /** + * @return true if signalled; once true, must be discarded by the owning thread. 
+ */ + public boolean isSignalled(); + + /** + * @return true if cancelled; once cancelled, must be discarded by the owning thread. + */ + public boolean isCancelled(); + + /** + * @return isSignalled() || isCancelled(). Once true, the state is fixed and the Signal should be discarded + * by the owning thread. + */ + public boolean isSet(); + + /** + * atomically: cancels the Signal if !isSet(), or returns true if isSignalled() + * + * @return true if isSignalled() + */ + public boolean checkAndClear(); + + /** + * Should only be called by the owning thread. Indicates the signal can be retired, + * and if signalled propagates the signal to another waiting thread + */ + public abstract void cancel(); + + /** + * Wait, without throwing InterruptedException, until signalled. On exit isSignalled() must be true. + * If the thread is interrupted in the meantime, the interrupted flag will be set. + */ + public void awaitUninterruptibly(); + + /** + * Wait until signalled, or throw an InterruptedException if interrupted before this happens. + * On normal exit isSignalled() must be true; however if InterruptedException is thrown isCancelled() + * will be true. + * @throws InterruptedException + */ + public void await() throws InterruptedException; + + /** + * Wait until signalled, or the provided time is reached, or the thread is interrupted. If signalled, + * isSignalled() will be true on exit, and the method will return true; if timedout, the method will return + * false and isCancelled() will be true; if interrupted an InterruptedException will be thrown and isCancelled() + * will be true. 
+ * @param until System.currentTimeMillis() to wait until + * @return true if signalled, false if timed out + * @throws InterruptedException + */ + public boolean awaitUntil(long until) throws InterruptedException; + } + + /** + * An abstract signal implementation + */ + public static abstract class AbstractSignal implements Signal + { + public void awaitUninterruptibly() + { + boolean interrupted = false; + while (!isSignalled()) + { + if (Thread.currentThread().interrupted()) + interrupted = true; + LockSupport.park(); + } + if (interrupted) + Thread.currentThread().interrupt(); + checkAndClear(); + } + + public void await() throws InterruptedException + { + while (!isSignalled()) + { + checkInterrupted(); + LockSupport.park(); + } + checkAndClear(); + } + + public boolean awaitUntil(long until) throws InterruptedException + { + while (until < System.currentTimeMillis() && !isSignalled()) + { + checkInterrupted(); + LockSupport.parkUntil(until); + } + return checkAndClear(); + } + + private void checkInterrupted() throws InterruptedException + { + if (Thread.interrupted()) + { + cancel(); + throw new InterruptedException(); + } + } + } + + /** + * A signal registered with this WaitQueue + */ + private class RegisteredSignal extends AbstractSignal + { + private volatile Thread thread = Thread.currentThread(); + volatile int state; + + public boolean isSignalled() + { + return state == SIGNALLED; + } + + public boolean isCancelled() + { + return state == CANCELLED; + } + + public boolean isSet() + { + return state != NOT_SET; + } + + private boolean signal() + { + if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, SIGNALLED)) + { + LockSupport.unpark(thread); + thread = null; + return true; + } + return false; + } + + public boolean checkAndClear() + { + if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED)) + { + thread = null; + cleanUpCancelled(); + return false; + } + // must now be signalled assuming correct API usage + return 
true; + } + + /** + * Should only be called by the registered thread. Indicates the signal can be retired, + * and if signalled propagates the signal to another waiting thread + */ + public void cancel() + { + if (isCancelled()) + return; + if (!signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED)) + { + // must already be signalled - switch to cancelled and + state = CANCELLED; + // propagate the signal + WaitQueue.this.signal(); + } + thread = null; + cleanUpCancelled(); + } + } + + /** + * A RegisteredSignal that stores a TimerContext, and stops the timer when either cancelled or + * finished waiting. i.e. if the timer is started when the signal is registered it tracks the + * time in between registering and invalidating the signal. + */ + private final class TimedSignal extends RegisteredSignal + { + private final TimerContext context; + + private TimedSignal(TimerContext context) + { + this.context = context; + } + + @Override + public boolean checkAndClear() + { + context.stop(); + return super.checkAndClear(); + } + + @Override + public void cancel() + { + if (!isCancelled()) + { + context.stop(); + super.cancel(); + } + } + } + + /** + * An abstract signal wrapping multiple delegate signals + */ + private abstract static class MultiSignal extends AbstractSignal + { + final Signal[] signals; + protected MultiSignal(Signal[] signals) + { + this.signals = signals; + } + + public boolean isCancelled() + { + for (Signal signal : signals) + if (!signal.isCancelled()) + return false; + return true; + } + + public boolean checkAndClear() + { + for (Signal signal : signals) + signal.checkAndClear(); + return isSignalled(); + } + + public void cancel() + { + for (Signal signal : signals) + signal.cancel(); + } + } + + /** + * A Signal that wraps multiple Signals and returns when any single one of them would have returned + */ + private static class AnySignal extends MultiSignal + { + protected AnySignal(Signal ... 
signals) + { + super(signals); + } + + public boolean isSignalled() + { + for (Signal signal : signals) + if (signal.isSignalled()) + return true; + return false; + } + + public boolean isSet() + { + for (Signal signal : signals) + if (signal.isSet()) + return true; + return false; + } + } + + /** + * A Signal that wraps multiple Signals and returns when all of them would have finished returning + */ + private static class AllSignal extends MultiSignal + { + protected AllSignal(Signal ... signals) + { + super(signals); + } + + public boolean isSignalled() + { + for (Signal signal : signals) + if (!signal.isSignalled()) + return false; + return true; + } + + public boolean isSet() + { + for (Signal signal : signals) + if (!signal.isSet()) + return false; + return true; + } + } + + /** + * @param signals + * @return a signal that returns only when any of the provided signals would have returned + */ + public static Signal any(Signal ... signals) + { + return new AnySignal(signals); + } + + /** + * @param signals + * @return a signal that returns only when all provided signals would have returned + */ + public static Signal all(Signal ... signals) + { + return new AllSignal(signals); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java new file mode 100644 index 0000000..8ebdf30 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.memory; + +import org.apache.cassandra.utils.ByteBufferUtil; + +import java.nio.ByteBuffer; + +public abstract class AbstractAllocator +{ + /** + * Allocate a slice of the given length. + */ + public ByteBuffer clone(ByteBuffer buffer) + { + assert buffer != null; + if (buffer.remaining() == 0) + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + ByteBuffer cloned = allocate(buffer.remaining()); + + cloned.mark(); + cloned.put(buffer.duplicate()); + cloned.reset(); + return cloned; + } + + public abstract ByteBuffer allocate(int size); + + // + // only really applicable to Pooled subclasses, but we provide default implementations here + // + + public long owns() + { + return 0; + } + + public float ownershipRatio() + { + return 0; + } + + public long reclaiming() + { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java b/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java new file mode 100644 index 0000000..c58340e --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java @@ -0,0 +1,59 @@ +package org.apache.cassandra.utils.memory; + +import com.google.common.base.*; +import 
org.apache.cassandra.db.Cell; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.db.ColumnFamilyStore; + +import java.nio.ByteBuffer; + +/** + * Wraps calls to a PoolAllocator with the provided writeOp. Also doubles as a Function that clones Cells + * using itself + */ +public final class ContextAllocator extends AbstractAllocator implements Function +{ + private final OpOrder.Group opGroup; + private final PoolAllocator allocator; + private final ColumnFamilyStore cfs; + + public ContextAllocator(OpOrder.Group opGroup, PoolAllocator allocator, ColumnFamilyStore cfs) + { + this.opGroup = opGroup; + this.allocator = allocator; + this.cfs = cfs; + } + + @Override + public ByteBuffer clone(ByteBuffer buffer) + { + return allocator.clone(buffer, opGroup); + } + + public ByteBuffer allocate(int size) + { + return allocator.allocate(size, opGroup); + } + + public Cell apply(Cell column) + { + return column.localCopy(cfs, this); + } + + public long owns() + { + return allocator.owns(); + } + + @Override + public float ownershipRatio() + { + return allocator.ownershipRatio(); + } + + @Override + public long reclaiming() + { + return allocator.reclaiming(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java new file mode 100644 index 0000000..86cea64 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.memory; + +import java.nio.ByteBuffer; + +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +public final class HeapAllocator extends AbstractAllocator +{ + public static final HeapAllocator instance = new HeapAllocator(); + + /** + * Normally you should use HeapAllocator.instance, since there is no per-Allocator state. + * This is exposed so that the reflection done by Memtable works when SlabAllocator is disabled. 
+ */ + private HeapAllocator() {} + + public ByteBuffer allocate(int size) + { + return ByteBuffer.allocate(size); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java new file mode 100644 index 0000000..bc293cb --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java @@ -0,0 +1,16 @@ +package org.apache.cassandra.utils.memory; + +import org.apache.cassandra.utils.concurrent.OpOrder; + +public class HeapPool extends Pool +{ + public HeapPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner) + { + super(maxOnHeapMemory, cleanupThreshold, cleaner); + } + + public HeapPoolAllocator newAllocator(OpOrder writes) + { + return new HeapPoolAllocator(this); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java new file mode 100644 index 0000000..cf798d0 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.memory; + +import org.apache.cassandra.utils.concurrent.OpOrder; + +import java.nio.ByteBuffer; + +public final class HeapPoolAllocator extends PoolAllocator +{ + HeapPoolAllocator(HeapPool pool) + { + super(pool); + } + + public ByteBuffer allocate(int size) + { + return allocate(size, null); + } + + public ByteBuffer allocate(int size, OpOrder.Group opGroup) + { + markAllocated(size, opGroup); + // must loop trying to acquire + return ByteBuffer.allocate(size); + } + + public void free(ByteBuffer name) + { + release(name.remaining()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java new file mode 100644 index 0000000..4396caf --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.memory; + +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.concurrent.OpOrder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The SlabAllocator is a bump-the-pointer allocator that allocates + * large (2MB by default) regions and then doles them out to threads that request + * slices into the array. + *

+ * The purpose of this class is to combat heap fragmentation in long-lived + * objects: by ensuring that all allocations with similar lifetimes + * go only to large regions of contiguous memory, we ensure that large blocks + * get freed up at the same time. + *

+ * Otherwise, variable length byte arrays allocated end up + * interleaved throughout the heap, and the old generation gets progressively + * more fragmented until a stop-the-world compacting collection occurs. + */ +public class HeapSlabAllocator extends PoolAllocator +{ + private static final Logger logger = LoggerFactory.getLogger(HeapSlabAllocator.class); + + private final static int REGION_SIZE = 1024 * 1024; + private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region + + // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more + private static final ConcurrentLinkedQueue RACE_ALLOCATED = new ConcurrentLinkedQueue<>(); + + private final AtomicReference currentRegion = new AtomicReference(); + private final AtomicInteger regionCount = new AtomicInteger(0); + private AtomicLong unslabbed = new AtomicLong(0); + + HeapSlabAllocator(Pool pool) + { + super(pool); + } + + public ByteBuffer allocate(int size) + { + return allocate(size, null); + } + + public ByteBuffer allocate(int size, OpOrder.Group opGroup) + { + assert size >= 0; + if (size == 0) + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + + markAllocated(size, opGroup); + // satisfy large allocations directly from JVM since they don't cause fragmentation + // as badly, and fill up our regions quickly + if (size > MAX_CLONED_SIZE) + { + unslabbed.addAndGet(size); + return ByteBuffer.allocate(size); + } + + while (true) + { + Region region = getRegion(); + + // Try to allocate from this region + ByteBuffer cloned = region.allocate(size); + if (cloned != null) + return cloned; + + // not enough space! 
+ currentRegion.compareAndSet(region, null); + } + } + + public void free(ByteBuffer name) + { + // have to assume we cannot free the memory here, and just reclaim it all when we flush + } + + /** + * Get the current region, or, if there is no current region, allocate a new one + */ + private Region getRegion() + { + while (true) + { + // Try to get the region + Region region = currentRegion.get(); + if (region != null) + return region; + + // No current region, so we want to allocate one. We race + // against other allocators to CAS in a Region, and if we fail we stash the region for re-use + region = RACE_ALLOCATED.poll(); + if (region == null) + region = new Region(REGION_SIZE); + if (currentRegion.compareAndSet(null, region)) + { + regionCount.incrementAndGet(); + logger.trace("{} regions now allocated in {}", regionCount, this); + return region; + } + + // someone else won race - that's fine, we'll try to grab theirs + // in the next iteration of the loop. + RACE_ALLOCATED.add(region); + } + } + + /** + * A region of memory out of which allocations are sliced. + * + * This serves two purposes: + * - to provide a step between initialization and allocation, so that racing to CAS a + * new region in is harmless + * - encapsulates the allocation offset + */ + private static class Region + { + /** + * Actual underlying data + */ + private ByteBuffer data; + + /** + * Offset for the next allocation, or the sentinel value -1 + * which implies that the region is still uninitialized. + */ + private AtomicInteger nextFreeOffset = new AtomicInteger(0); + + /** + * Total number of allocations satisfied from this buffer + */ + private AtomicInteger allocCount = new AtomicInteger(); + + /** + * Create an uninitialized region. Note that memory is not allocated yet, so + * this is cheap. + * + * @param size in bytes + */ + private Region(int size) + { + data = ByteBuffer.allocate(size); + } + + /** + * Try to allocate size bytes from the region. 
+ * + * @return the successful allocation, or null to indicate not-enough-space + */ + public ByteBuffer allocate(int size) + { + while (true) + { + int oldOffset = nextFreeOffset.get(); + + if (oldOffset + size > data.capacity()) // capacity == remaining + return null; + + // Try to atomically claim this region + if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) + { + // we got the alloc + allocCount.incrementAndGet(); + return (ByteBuffer) data.duplicate().position(oldOffset).limit(oldOffset + size); + } + // we raced and lost alloc, try again + } + } + + @Override + public String toString() + { + return "Region@" + System.identityHashCode(this) + + " allocs=" + allocCount.get() + "waste=" + + (data.capacity() - nextFreeOffset.get()); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java b/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java new file mode 100644 index 0000000..bd1ab98 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java @@ -0,0 +1,16 @@ +package org.apache.cassandra.utils.memory; + +import org.apache.cassandra.utils.concurrent.OpOrder; + +public class HeapSlabPool extends Pool +{ + public HeapSlabPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner) + { + super(maxOnHeapMemory, cleanupThreshold, cleaner); + } + + public HeapSlabAllocator newAllocator(OpOrder writes) + { + return new HeapSlabAllocator(this); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/Pool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/Pool.java b/src/java/org/apache/cassandra/utils/memory/Pool.java new file mode 100644 index 0000000..4e59de8 
--- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/Pool.java @@ -0,0 +1,140 @@ +package org.apache.cassandra.utils.memory; + +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + + +/** + * Represents an amount of memory used for a given purpose, that can be allocated to specific tasks through + * child AbstractAllocator objects. AbstractAllocator and MemoryTracker correspond approximately to PoolAllocator and Pool, + * respectively, with the MemoryTracker bookkeeping the total shared use of resources, and the AbstractAllocator the amount + * checked out and in use by a specific PoolAllocator. + * + * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners, + * and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources, + * but only needs to allocate if there are none already available. This distinction is not always meaningful. + */ +public abstract class Pool +{ + // total memory/resource permitted to allocate + public final long limit; + + // ratio of used to spare (both excluding 'reclaiming') at which to trigger a clean + public final float cleanThreshold; + + // total bytes allocated and reclaiming + private AtomicLong allocated = new AtomicLong(); + private AtomicLong reclaiming = new AtomicLong(); + + final WaitQueue hasRoom = new WaitQueue(); + + // a cache of the calculation determining at what allocation threshold we should next clean, and the cleaner we trigger + private volatile long nextClean; + private final PoolCleanerThread cleanerThread; + + public Pool(long limit, float cleanThreshold, Runnable cleaner) + { + this.limit = limit; + this.cleanThreshold = cleanThreshold; + updateNextClean(); + cleanerThread = cleaner == null ? 
null : new PoolCleanerThread<>(this, cleaner); + if (cleanerThread != null) + cleanerThread.start(); + } + + /** Methods for tracking and triggering a clean **/ + + boolean needsCleaning() + { + return used() >= nextClean && updateNextClean() && cleanerThread != null; + } + + void maybeClean() + { + if (needsCleaning()) + cleanerThread.trigger(); + } + + private boolean updateNextClean() + { + long reclaiming = this.reclaiming.get(); + return used() >= (nextClean = reclaiming + + (long) (this.limit * cleanThreshold)); + } + + /** Methods to allocate space **/ + + boolean tryAllocate(int size) + { + while (true) + { + long cur; + if ((cur = allocated.get()) + size > limit) + return false; + if (allocated.compareAndSet(cur, cur + size)) + { + maybeClean(); + return true; + } + } + } + + /** + * apply the size adjustment to allocated, bypassing any limits or constraints. If this reduces the + * allocated total, we will signal waiters + */ + void adjustAllocated(long size) + { + if (size == 0) + return; + while (true) + { + long cur = allocated.get(); + if (allocated.compareAndSet(cur, cur + size)) + { + if (size > 0) + { + maybeClean(); + } + return; + } + } + } + + void release(long size) + { + adjustAllocated(-size); + hasRoom.signalAll(); + } + + // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans + void adjustReclaiming(long reclaiming) + { + if (reclaiming == 0) + return; + this.reclaiming.addAndGet(reclaiming); + if (reclaiming < 0 && updateNextClean() && cleanerThread != null) + cleanerThread.trigger(); + } + + public long allocated() + { + return allocated.get(); + } + + public long used() + { + return allocated.get(); + } + + public long reclaiming() + { + return reclaiming.get(); + } + + public abstract PoolAllocator newAllocator(OpOrder writes); +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java new file mode 100644 index 0000000..b30c484 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.memory; + +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; + +public abstract class PoolAllocator

extends AbstractAllocator +{ + public final P pool; + volatile LifeCycle state = LifeCycle.LIVE; + + static enum LifeCycle + { + LIVE, DISCARDING, DISCARDED; + LifeCycle transition(LifeCycle target) + { + assert target.ordinal() == ordinal() + 1; + return target; + } + } + + // the amount of memory/resource owned by this object + private AtomicLong owns = new AtomicLong(); + // the amount of memory we are reporting to collect; this may be inaccurate, but is close + // and is used only to ensure that once we have reclaimed we mark the tracker with the same amount + private AtomicLong reclaiming = new AtomicLong(); + + PoolAllocator(P pool) + { + this.pool = pool; + } + + /** + * Mark this allocator as reclaiming; this will mark the memory it owns as reclaiming, so remove it from + * any calculation deciding if further cleaning/reclamation is necessary. + */ + public void setDiscarding() + { + state = state.transition(LifeCycle.DISCARDING); + // mark the memory owned by this allocator as reclaiming + long prev = reclaiming.get(); + long cur = owns.get(); + reclaiming.set(cur); + pool.adjustReclaiming(cur - prev); + } + + /** + * Indicate the memory and resources owned by this allocator are no longer referenced, + * and can be reclaimed/reused. + */ + public void setDiscarded() + { + state = state.transition(LifeCycle.DISCARDED); + // release any memory owned by this allocator; automatically signals waiters + pool.release(owns.getAndSet(0)); + pool.adjustReclaiming(-reclaiming.get()); + } + + public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup); + + /** Mark the BB as unused, permitting it to be reclaimed */ + public abstract void free(ByteBuffer name); + + // mark ourselves as owning memory from the tracker. 
meant to be called by subclass + // allocate method that actually allocates and returns a ByteBuffer + protected void markAllocated(int size, OpOrder.Group opGroup) + { + while (true) + { + if (pool.tryAllocate(size)) + { + acquired(size); + return; + } + WaitQueue.Signal signal = opGroup.isBlockingSignal(pool.hasRoom.register()); + boolean allocated = pool.tryAllocate(size); + if (allocated || opGroup.isBlocking()) + { + signal.cancel(); + if (allocated) // if we allocated, take ownership + acquired(size); + else // otherwise we're blocking so we're permitted to overshoot our constraints, to just allocate without blocking + allocated(size); + return; + } + else + signal.awaitUninterruptibly(); + } + } + + // retroactively mark (by-passes any constraints) an amount allocated in the tracker, and owned by us. + private void allocated(int size) + { + pool.adjustAllocated(size); + owns.addAndGet(size); + } + + // retroactively mark (by-passes any constraints) an amount owned by us + private void acquired(int size) + { + owns.addAndGet(size); + } + + // release an amount of memory from our ownership, and deallocate it in the tracker + void release(int size) + { + pool.release(size); + owns.addAndGet(-size); + } + + public boolean isLive() + { + return state == LifeCycle.LIVE; + } + + /** + * Allocate a slice of the given length. 
+ */ + public ByteBuffer clone(ByteBuffer buffer, OpOrder.Group opGroup) + { + assert buffer != null; + if (buffer.remaining() == 0) + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + ByteBuffer cloned = allocate(buffer.remaining(), opGroup); + + cloned.mark(); + cloned.put(buffer.duplicate()); + cloned.reset(); + return cloned; + } + + public ContextAllocator wrap(OpOrder.Group opGroup, ColumnFamilyStore cfs) + { + return new ContextAllocator(opGroup, this, cfs); + } + + @Override + public long owns() + { + return owns.get(); + } + + @Override + public float ownershipRatio() + { + return owns.get() / (float) pool.limit; + } + + @Override + public long reclaiming() + { + return reclaiming.get(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java new file mode 100644 index 0000000..bfae475 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java @@ -0,0 +1,55 @@ +package org.apache.cassandra.utils.memory; + +import org.apache.cassandra.utils.concurrent.WaitQueue; + +/** + * A thread that reclaims memory from a Pool on demand. The actual reclaiming work is delegated to the + * cleaner Runnable, e.g., FlushLargestColumnFamily + */ +class PoolCleanerThread

extends Thread +{ + /** The pool we're cleaning */ + final P pool; + + /** should ensure that at least some memory has been marked reclaiming after completion */ + final Runnable cleaner; + + /** signalled whenever needsCleaning() may return true */ + final WaitQueue wait = new WaitQueue(); + + PoolCleanerThread(P pool, Runnable cleaner) + { + super(pool.getClass().getSimpleName() + "Cleaner"); + this.pool = pool; + this.cleaner = cleaner; + } + + boolean needsCleaning() + { + return pool.needsCleaning(); + } + + // should ONLY be called when we really think it already needs cleaning + void trigger() + { + wait.signal(); + } + + @Override + public void run() + { + while (true) + { + while (!needsCleaning()) + { + final WaitQueue.Signal signal = wait.register(); + if (!needsCleaning()) + signal.awaitUninterruptibly(); + else + signal.cancel(); + } + + cleaner.run(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java new file mode 100644 index 0000000..f13c1b2 --- /dev/null +++ b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java @@ -0,0 +1,223 @@ +package org.apache.cassandra.concurrent; + +import org.apache.cassandra.utils.concurrent.OpOrder; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.junit.*; +import org.slf4j.*; + +import static org.junit.Assert.*; + +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +// TODO: we don't currently test SAFE functionality at all! 
+// TODO: should also test markBlocking and SyncOrdered +public class LongOpOrderTest +{ + + private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class); + + static final int CONSUMERS = 4; + static final int PRODUCERS = 32; + + static final long RUNTIME = TimeUnit.MINUTES.toMillis(5); + static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1); + + static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() + { + @Override + public void uncaughtException(Thread t, Throwable e) + { + System.err.println(t.getName() + ": " + e.getMessage()); + e.printStackTrace(); + } + }; + + final OpOrder order = new OpOrder(); + final AtomicInteger errors = new AtomicInteger(); + + class TestOrdering implements Runnable + { + + final int[] waitNanos = new int[1 << 16]; + volatile State state = new State(); + final ScheduledExecutorService sched; + + TestOrdering(ExecutorService exec, ScheduledExecutorService sched) + { + this.sched = sched; + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + for (int i = 0 ; i < waitNanos.length ; i++) + waitNanos[i] = rnd.nextInt(5000); + for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++) + exec.execute(new Producer()); + exec.execute(this); + } + + @Override + public void run() + { + final long until = System.currentTimeMillis() + RUNTIME; + long lastReport = System.currentTimeMillis(); + long count = 0; + long opCount = 0; + while (true) + { + long now = System.currentTimeMillis(); + if (now > until) + break; + if (now > lastReport + REPORT_INTERVAL) + { + lastReport = now; + logger.info(String.format("%s: Executed %d barriers with %d operations. 
%.0f%% complete.", + Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME)))); + } + try + { + Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]); + } catch (InterruptedException e) + { + e.printStackTrace(); + } + + final State s = state; + s.barrier = order.newBarrier(); + s.replacement = new State(); + s.barrier.issue(); + s.barrier.await(); + s.check(); + opCount += s.totalCount(); + state = s.replacement; + sched.schedule(new Runnable() + { + @Override + public void run() + { + s.check(); + } + }, 1, TimeUnit.SECONDS); + count++; + } + } + + class State + { + + volatile OpOrder.Barrier barrier; + volatile State replacement; + final NonBlockingHashMap count = new NonBlockingHashMap<>(); + int checkCount = -1; + + boolean accept(OpOrder.Group opGroup) + { + if (barrier != null && !barrier.isAfter(opGroup)) + return false; + AtomicInteger c; + if (null == (c = count.get(opGroup))) + { + count.putIfAbsent(opGroup, new AtomicInteger()); + c = count.get(opGroup); + } + c.incrementAndGet(); + return true; + } + + int totalCount() + { + int c = 0; + for (AtomicInteger v : count.values()) + c += v.intValue(); + return c; + } + + void check() + { + boolean delete; + if (checkCount >= 0) + { + if (checkCount != totalCount()) + { + errors.incrementAndGet(); + logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount()); + } + delete = true; + } + else + { + checkCount = totalCount(); + delete = false; + } + for (Map.Entry e : count.entrySet()) + { + if (e.getKey().compareTo(barrier.getSyncPoint()) > 0) + { + errors.incrementAndGet(); + logger.error("Received an operation that was created after the barrier was issued."); + } + if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue()) + { + errors.incrementAndGet(); + logger.error("Missing registered operations. 
{} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue()); + } + if (delete) + TestOrdering.this.count.remove(e.getKey()); + } + } + + } + + final NonBlockingHashMap count = new NonBlockingHashMap<>(); + + class Producer implements Runnable + { + public void run() + { + while (true) + { + AtomicInteger c; + OpOrder.Group opGroup = order.start(); + try + { + if (null == (c = count.get(opGroup))) + { + count.putIfAbsent(opGroup, new AtomicInteger()); + c = count.get(opGroup); + } + c.incrementAndGet(); + State s = state; + while (!s.accept(opGroup)) + s = s.replacement; + } + finally + { + opGroup.finishOne(); + } + } + } + } + + } + + @Test + public void testOrdering() throws InterruptedException + { + errors.set(0); + Thread.setDefaultUncaughtExceptionHandler(handler); + final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker")); + final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker")); + for (int i = 0 ; i < CONSUMERS ; i++) + new TestOrdering(exec, checker); + exec.shutdown(); + exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS); + assertTrue(exec.isShutdown()); + assertTrue(errors.get() == 0); + } + + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java new file mode 100644 index 0000000..e7bfe30 --- /dev/null +++ b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java @@ -0,0 +1,72 @@ +package org.apache.cassandra.db; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class LongFlushMemtableTest extends SchemaLoader +{ + @Test + public void testFlushMemtables() throws IOException, ConfigurationException + { + Keyspace keyspace = Keyspace.open("Keyspace1"); + for (int i = 0; i < 100; i++) + { + CFMetaData metadata = CFMetaData.denseCFMetaData(keyspace.getName(), "_CF" + i, UTF8Type.instance); + MigrationManager.announceNewColumnFamily(metadata); + } + + for (int j = 0; j < 200; j++) + { + for (int i = 0; i < 100; i++) + { + Mutation rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("key" + j)); + ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "_CF" + i); + // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory + ByteBuffer value = ByteBuffer.allocate(100000); + cf.addColumn(new Cell(Util.cellname("c"), 
value)); + rm.add(cf); + rm.applyUnsafe(); + } + } + + int flushes = 0; + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) + { + if (cfs.name.startsWith("_CF")) + flushes += cfs.getMemtableSwitchCount(); + } + assert flushes > 0; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/db/MeteredFlusherTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/MeteredFlusherTest.java b/test/long/org/apache/cassandra/db/MeteredFlusherTest.java deleted file mode 100644 index 4bab277..0000000 --- a/test/long/org/apache/cassandra/db/MeteredFlusherTest.java +++ /dev/null @@ -1,72 +0,0 @@ -package org.apache.cassandra.db; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.junit.Test; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.service.MigrationManager; -import org.apache.cassandra.utils.ByteBufferUtil; - -public class MeteredFlusherTest extends SchemaLoader -{ - @Test - public void testManyMemtables() throws IOException, ConfigurationException - { - Keyspace keyspace = Keyspace.open("Keyspace1"); - for (int i = 0; i < 100; i++) - { - CFMetaData metadata = CFMetaData.denseCFMetaData(keyspace.getName(), "_CF" + i, UTF8Type.instance); - MigrationManager.announceNewColumnFamily(metadata); - } - - for (int j = 0; j < 200; j++) - { - for (int i = 0; i < 100; i++) - { - Mutation rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("key" + j)); - ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "_CF" + i); - // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory - ByteBuffer value = ByteBuffer.allocate(100000); - cf.addColumn(new Cell(Util.cellname("c"), value)); - rm.add(cf); - rm.applyUnsafe(); - } - } - - int flushes = 0; - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - { - if (cfs.name.startsWith("_CF")) - flushes += cfs.getMemtableSwitchCount(); - } - assert flushes > 0; - } -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/utils/LongBTreeTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/utils/LongBTreeTest.java b/test/long/org/apache/cassandra/utils/LongBTreeTest.java index 9f31743..b8e60de 100644 --- a/test/long/org/apache/cassandra/utils/LongBTreeTest.java +++ 
b/test/long/org/apache/cassandra/utils/LongBTreeTest.java @@ -50,7 +50,7 @@ public class LongBTreeTest TreeSet canon = new TreeSet<>(); for (int i = 0 ; i < 10000000 ; i++) canon.add(i); - Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true); + Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null); btree = BTree.update(btree, ICMP, canon, true); canon.add(Integer.MIN_VALUE); canon.add(Integer.MAX_VALUE); @@ -171,7 +171,7 @@ public class LongBTreeTest canon.putAll(buffer); ctxt.stop(); ctxt = BTREE_TIMER.time(); - btree = BTree.update(btree, ICMP, buffer.keySet(), true, null, null); + btree = BTree.update(btree, ICMP, buffer.keySet(), true, null); ctxt.stop(); if (quickEquality) @@ -200,7 +200,7 @@ public class LongBTreeTest String id = String.format("[0..%d)", canon.size()); System.out.println("Testing " + id); Futures.allAsList(testAllSlices(id, cur, canon)).get(); - cur = BTree.update(cur, ICMP, Arrays.asList(i), true, null, null); + cur = BTree.update(cur, ICMP, Arrays.asList(i), true, null); canon.add(i); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/unit/org/apache/cassandra/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java index aa6d3dd..849c30c 100644 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@ -142,7 +142,7 @@ public class CacheProviderTest extends SchemaLoader this.string = input; } - public long memorySize() + public long unsharedHeapSize() { return string.length(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java b/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java deleted file mode 100644 index da34711..0000000 --- a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java +++ /dev/null @@ -1,105 +0,0 @@ -package org.apache.cassandra.cache; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - - -import java.nio.ByteBuffer; -import java.util.UUID; - -import org.junit.Assert; - -import org.apache.cassandra.db.ColumnIndex; -import org.apache.cassandra.db.DeletionTime; -import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.utils.ObjectSizes; -import org.github.jamm.MemoryMeter; -import org.junit.Test; - -public class ObjectSizeTest -{ - public static final MemoryMeter meter = new MemoryMeter().omitSharedBufferOverhead(); - - @Test - public void testArraySizes() - { - long size = ObjectSizes.getArraySize(0, 1); - long size2 = meter.measureDeep(new byte[0]); - Assert.assertEquals(size, size2); - } - - @Test - public void testBiggerArraySizes() - { - long size = ObjectSizes.getArraySize(0, 1); - long size2 = meter.measureDeep(new byte[0]); - Assert.assertEquals(size, size2); - - size = ObjectSizes.getArraySize(8, 1); - size2 = meter.measureDeep(new byte[8]); - Assert.assertEquals(size, size2); - } - - @Test - public void testKeyCacheKey() - { - KeyCacheKey key = new KeyCacheKey(null, null, ByteBuffer.wrap(new byte[0])); - long size = key.memorySize(); - long size2 = meter.measureDeep(key); - Assert.assertEquals(size, size2); - } - - @Test - public void testKeyCacheValue() - { - RowIndexEntry entry = new RowIndexEntry(123); - long size = entry.memorySize(); - long size2 = meter.measureDeep(entry); - Assert.assertEquals(size, size2); - } - - @Test - public void testKeyCacheValueWithDelInfo() - { - RowIndexEntry entry = RowIndexEntry.create(123, new DeletionTime(123, 123), ColumnIndex.nothing()); - long size = entry.memorySize(); - long size2 = meter.measureDeep(entry); - Assert.assertEquals(size, size2); - } - - @Test - public void testRowCacheKey() - { - UUID id = UUID.randomUUID(); - RowCacheKey key = new RowCacheKey(id, ByteBuffer.wrap(new byte[11])); - long size = key.memorySize(); - long size2 = meter.measureDeep(key) - meter.measureDeep(id); - Assert.assertEquals(size, size2); - } - - @Test - public void 
testRowCacheSentinel() - { - RowCacheSentinel sentinel = new RowCacheSentinel(123); - long size = sentinel.memorySize(); - long size2 = meter.measureDeep(sentinel); - Assert.assertEquals(size, size2); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java index c2888bc..15d40dc 100644 --- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java +++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java @@ -1,6 +1,6 @@ package org.apache.cassandra.concurrent; -import org.apache.cassandra.utils.WaitQueue; +import org.apache.cassandra.utils.concurrent.WaitQueue; import org.junit.*; import java.util.concurrent.atomic.AtomicBoolean;