Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 71B12200BF7 for ; Mon, 9 Jan 2017 18:23:29 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 704D6160B3E; Mon, 9 Jan 2017 17:23:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 34094160B2F for ; Mon, 9 Jan 2017 18:23:28 +0100 (CET) Received: (qmail 51850 invoked by uid 500); 9 Jan 2017 17:23:27 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 51837 invoked by uid 99); 9 Jan 2017 17:23:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Jan 2017 17:23:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3D030DF9FD; Mon, 9 Jan 2017 17:23:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.apache.org Message-Id: <073fbee541884ee09e2e3d84fdb6f006@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: applied future Date: Mon, 9 Jan 2017 17:23:27 +0000 (UTC) archived-at: Mon, 09 Jan 2017 17:23:29 -0000 Repository: ignite Updated Branches: refs/heads/ignite-comm-balance-master 1484faa41 -> 312706eaa applied future Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/312706ea Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/312706ea Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/312706ea Branch: refs/heads/ignite-comm-balance-master Commit: 312706eaac00946b81fd552a6644e2d1a89805ae Parents: 1484faa Author: Yakov Zhdanov Authored: Mon Jan 9 20:22:52 2017 +0300 Committer: Yakov Zhdanov Committed: Mon Jan 9 20:22:52 2017 +0300 ---------------------------------------------------------------------- .../internal/util/future/GridFutureAdapter.java | 418 +++++++++++-------- 1 file changed, 248 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/312706ea/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index 723dff7..39b044d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -17,18 +17,16 @@ package org.apache.ignite.internal.util.future; -import java.util.Arrays; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -38,32 +36,58 @@ import org.jetbrains.annotations.Nullable; /** * Future adapter. + * TODO: + * 1. remove serializable + * 2. remove start time, end time. + * 3. remove ignore interrupts flag. */ -public class GridFutureAdapter extends AbstractQueuedSynchronizer implements IgniteInternalFuture { - /** */ - private static final long serialVersionUID = 0L; +public class GridFutureAdapter implements IgniteInternalFuture { + /* + * https://bugs.openjdk.java.net/browse/JDK-8074773 + */ + static { + Class ensureLoaded = LockSupport.class; + } - /** Initial state. */ - private static final int INIT = 0; + /** + * Future state. + */ + private enum State { + INIT, - /** Cancelled state. */ - private static final int CANCELLED = 1; + CANCELLED, + } - /** Done state. */ - private static final int DONE = 2; + /** + * + */ + static final class WaitNode { + /** */ + private final Object waiter; - /** */ - private static final byte ERR = 1; + /** */ + private volatile Object next; + + /** + * @param waiter Waiter. + */ + WaitNode(Object waiter) { + this.waiter = waiter; + } + } + + /** State updater. */ + private static final AtomicReferenceFieldUpdater stateUpd = + AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state"); /** */ - private static final byte RES = 2; + private static final long serialVersionUID = 0L; /** */ - private byte resFlag; + private boolean ignoreInterrupts; - /** Result. */ - @GridToStringInclude(sensitive = true) - private Object res; + /** */ + private volatile Object state = State.INIT; /** Future start time. */ private final long startTime = U.currentTimeMillis(); @@ -71,13 +95,6 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements /** Future end time. */ private volatile long endTime; - /** */ - private boolean ignoreInterrupts; - - /** */ - @GridToStringExclude - private IgniteInClosure> lsnr; - /** {@inheritDoc} */ @Override public long startTime() { return startTime; @@ -106,12 +123,16 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements /** {@inheritDoc} */ @Override public Throwable error() { - return (resFlag == ERR) ? (Throwable)res : null; + Object state0 = state; + + return (state0 instanceof Throwable) ? (Throwable)state0 : null; } /** {@inheritDoc} */ @Override public R result() { - return resFlag == RES ? (R)res : null; + Object state0 = state; + + return isDone(state0) && !(state0 instanceof Throwable) ? (R)state0 : null; } /** {@inheritDoc} */ @@ -153,28 +174,36 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements * @throws IgniteCheckedException If failed. */ private R get0(boolean ignoreInterrupts) throws IgniteCheckedException { + Object res = registerWaiter(Thread.currentThread()); + + if (res != State.INIT) { + // no registration was done since a value is available. + return resolve(res); + } + + boolean interrupted = false; + try { - if (endTime == 0) { - if (ignoreInterrupts) - acquireShared(0); - else - acquireSharedInterruptibly(0); - } + for (; ; ) { + LockSupport.park(); - if (getState() == CANCELLED) - throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this); + if (isDone()) + return resolve(state); - assert resFlag != 0; + else if (Thread.interrupted()) { + interrupted = true; - if (resFlag == ERR) - throw U.cast((Throwable)res); + if (!ignoreInterrupts) { + unregisterWaiter(Thread.currentThread()); - return (R)res; + throw new IgniteInterruptedCheckedException("Thread has been interrupted."); + } + } + } } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); + finally { + if (interrupted) + Thread.currentThread().interrupt(); } } @@ -186,46 +215,157 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements * @throws IgniteCheckedException If error occurred. */ @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException { - if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout)) - throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed."); + Object res = registerWaiter(Thread.currentThread()); + + if (res != State.INIT) + return resolve(res); - if (getState() == CANCELLED) + long deadlineNanos = System.nanoTime() + nanosTimeout; + + boolean interrupted = false; + + try { + long nanosTimeout0 = nanosTimeout; + + while (nanosTimeout0 > 0) { + LockSupport.parkNanos(nanosTimeout0); + + nanosTimeout0 = deadlineNanos - System.nanoTime(); + + if (isDone()) + return resolve(state); + + else if (Thread.interrupted()) { + interrupted = true; + + if (!ignoreInterrupts) + throw new IgniteInterruptedCheckedException("Thread has been interrupted."); + } + } + } + finally { + if (interrupted) + Thread.currentThread().interrupt(); + + unregisterWaiter(Thread.currentThread()); + } + + throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed."); + } + + /** + * Resolves the value to result or exception. + * + * @param val Value to resolve. + * @return Result. + * @throws IgniteCheckedException If resolved to exception. + */ + private R resolve(Object val) throws IgniteCheckedException { + if (val == State.CANCELLED) throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this); - assert resFlag != 0; + if (val instanceof Throwable) + throw U.cast((Throwable)val); - if (resFlag == ERR) - throw U.cast((Throwable)res); + return (R)val; + } + + /** + * @param waiter Waiter to register. + * @return Previous state. + */ + private Object registerWaiter(Object waiter) { + WaitNode waitNode = null; + + for (; ; ) { + final Object oldState = state; + + if (isDone(oldState)) + return oldState; + + Object newState; + + if (oldState == State.INIT) + newState = waiter; + + else { + if (waitNode == null) + waitNode = new WaitNode(waiter); + + waitNode.next = oldState; - return (R)res; + newState = waitNode; + } + + if (compareAndSetState(oldState, newState)) + return State.INIT; + } } - /** {@inheritDoc} */ - @Override public void listen(IgniteInClosure> lsnr0) { - assert lsnr0 != null; + /** + * @param waiter Waiter to unregister. + */ + void unregisterWaiter(Thread waiter) { + WaitNode prev = null; + Object cur = state; - boolean done = isDone(); + while (cur != null) { + Object curWaiter = cur.getClass() == WaitNode.class ? ((WaitNode)cur).waiter : cur; + Object next = cur.getClass() == WaitNode.class ? ((WaitNode)cur).next : null; - if (!done) { - synchronized (this) { - done = isDone(); // Double check. + if (curWaiter == waiter) { + if (prev == null) { + Object n = next == null ? State.INIT : next; - if (!done) { - if (lsnr == null) - lsnr = lsnr0; - else if (lsnr instanceof ArrayListener) - ((ArrayListener)lsnr).add(lsnr0); - else - lsnr = (IgniteInClosure)new ArrayListener(lsnr, lsnr0); + cur = compareAndSetState(cur, n) ? null : state; + } + else { + prev.next = next; - return; + cur = null; } } + else { + prev = cur.getClass() == WaitNode.class ? (WaitNode)cur : null; + + cur = next; + } } + } + + /** + * @param waiter Head of waiters queue to unblock. + */ + private void unblockAll(Object waiter) { + while (waiter != null) { + if (waiter instanceof Thread) { + LockSupport.unpark((Thread)waiter); - assert done; + return; + } + else if (waiter instanceof IgniteInClosure) { + notifyListener((IgniteInClosure>)waiter); - notifyListener(lsnr0); + return; + } + else if (waiter.getClass() == WaitNode.class) { + WaitNode waitNode = (WaitNode) waiter; + + unblockAll(waitNode.waiter); + + waiter = waitNode.next; + } + else + return; + } + } + + /** {@inheritDoc} */ + @Override public void listen(IgniteInClosure> newLsnr) { + Object res = registerWaiter(newLsnr); + + if (res != State.INIT) + notifyListener(newLsnr); } /** {@inheritDoc} */ @@ -240,23 +380,10 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements } /** - * Notifies all registered listeners. + * @return Logger instance. */ - private void notifyListeners() { - IgniteInClosure> lsnr0; - - synchronized (this) { - lsnr0 = lsnr; - - if (lsnr0 == null) - return; - - lsnr = null; - } - - assert lsnr0 != null; - - notifyListener(lsnr0); + @Nullable public IgniteLogger logger() { + return null; } /** @@ -293,9 +420,19 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements /** {@inheritDoc} */ @Override public boolean isDone() { - // Don't check for "valid" here, as "done" flag can be read - // even in invalid state. - return endTime != 0; + return isDone(state); + } + + /** + * @param state State to check. + * @return {@code True} if future is done. + */ + private boolean isDone(Object state) { + return state == null || + !(state == State.INIT + || state.getClass() == WaitNode.class + || state instanceof Thread + || state instanceof IgniteInClosure); } /** @@ -303,12 +440,12 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements */ public boolean isFailed() { // Must read endTime first. - return endTime != 0 && resFlag == ERR; + return state instanceof Throwable; } /** {@inheritDoc} */ @Override public boolean isCancelled() { - return getState() == CANCELLED; + return state == State.CANCELLED; } /** @@ -362,35 +499,34 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements * @return {@code True} if result was set by this call. */ private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) { - boolean notify = false; + Object val = cancel ? State.CANCELLED : err != null ? err : res; - try { - if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) { - if (err != null) { - resFlag = ERR; - this.res = err; - } - else { - resFlag = RES; - this.res = res; - } + for (; ; ) { + final Object oldState = state; + + if (isDone(oldState)) + return false; - notify = true; + if (compareAndSetState(oldState, val)) { + endTime = U.currentTimeMillis(); - releaseShared(0); + unblockAll(oldState); return true; } - - return false; - } - finally { - if (notify) - notifyListeners(); } } /** + * @param oldState Old state to check. + * @param newState New state to set. + * @return {@code True} if state has been changed (CASed). + */ + private boolean compareAndSetState(Object oldState, Object newState) { + return state == oldState && stateUpd.compareAndSet(this, oldState, newState); + } + + /** * Callback to notify that future is cancelled. * * @return {@code True} if cancel flag was set by this call. @@ -399,76 +535,18 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements return onDone(null, null, true); } - /** {@inheritDoc} */ - @Override protected final int tryAcquireShared(int ignore) { - return endTime != 0 ? 1 : -1; - } - - /** {@inheritDoc} */ - @Override protected final boolean tryReleaseShared(int ignore) { - endTime = U.currentTimeMillis(); - - // Always signal after setting final done status. - return true; - } - /** * @return String representation of state. */ private String state() { - int s = getState(); + Object s = state; - return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE"; - } - - /** - * @return Logger instance. - */ - @Nullable public IgniteLogger logger() { - return null; + return s instanceof State ? s.toString() : isDone(s) ? "DONE" : "INIT"; } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridFutureAdapter.class, this, "state", state()); - } - - /** - * - */ - private static class ArrayListener implements IgniteInClosure> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private IgniteInClosure>[] arr; - - /** - * @param lsnrs Listeners. - */ - private ArrayListener(IgniteInClosure... lsnrs) { - this.arr = lsnrs; - } - - /** {@inheritDoc} */ - @Override public void apply(IgniteInternalFuture fut) { - for (int i = 0; i < arr.length; i++) - arr[i].apply(fut); - } - - /** - * @param lsnr Listener. - */ - void add(IgniteInClosure> lsnr) { - arr = Arrays.copyOf(arr, arr.length + 1); - - arr[arr.length - 1] = lsnr; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ArrayListener.class, this, "arrSize", arr.length); - } + return S.toString(GridFutureAdapter.class, this, "state", state(), "hash", System.identityHashCode(this)); } /**