ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject ignite git commit: applied future
Date Mon, 09 Jan 2017 17:23:27 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-balance-master 1484faa41 -> 312706eaa


applied future


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/312706ea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/312706ea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/312706ea

Branch: refs/heads/ignite-comm-balance-master
Commit: 312706eaac00946b81fd552a6644e2d1a89805ae
Parents: 1484faa
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Mon Jan 9 20:22:52 2017 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Mon Jan 9 20:22:52 2017 +0300

----------------------------------------------------------------------
 .../internal/util/future/GridFutureAdapter.java | 418 +++++++++++--------
 1 file changed, 248 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/312706ea/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 723dff7..39b044d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -17,18 +17,16 @@
 
 package org.apache.ignite.internal.util.future;
 
-import java.util.Arrays;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -38,32 +36,58 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Future adapter.
+ * TODO:
+ * 1. remove serializable
+ * 2. remove start time, end time.
+ * 3. remove ignore interrupts flag.
  */
-public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R>
{
-    /** */
-    private static final long serialVersionUID = 0L;
+public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
+    /*
+     * https://bugs.openjdk.java.net/browse/JDK-8074773
+     */
+    static {
+        Class<?> ensureLoaded = LockSupport.class;
+    }
 
-    /** Initial state. */
-    private static final int INIT = 0;
+    /**
+     * Future state.
+     */
+    private enum State {
+        INIT,
 
-    /** Cancelled state. */
-    private static final int CANCELLED = 1;
+        CANCELLED,
+    }
 
-    /** Done state. */
-    private static final int DONE = 2;
+    /**
+     *
+     */
+    static final class WaitNode {
+        /** */
+        private final Object waiter;
 
-    /** */
-    private static final byte ERR = 1;
+        /** */
+        private volatile Object next;
+
+        /**
+         * @param waiter Waiter.
+         */
+        WaitNode(Object waiter) {
+            this.waiter = waiter;
+        }
+    }
+
+    /** State updater. */
+    private static final AtomicReferenceFieldUpdater<GridFutureAdapter, Object> stateUpd
=
+        AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state");
 
     /** */
-    private static final byte RES = 2;
+    private static final long serialVersionUID = 0L;
 
     /** */
-    private byte resFlag;
+    private boolean ignoreInterrupts;
 
-    /** Result. */
-    @GridToStringInclude(sensitive = true)
-    private Object res;
+    /** */
+    private volatile Object state = State.INIT;
 
     /** Future start time. */
     private final long startTime = U.currentTimeMillis();
@@ -71,13 +95,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
     /** Future end time. */
     private volatile long endTime;
 
-    /** */
-    private boolean ignoreInterrupts;
-
-    /** */
-    @GridToStringExclude
-    private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
-
     /** {@inheritDoc} */
     @Override public long startTime() {
         return startTime;
@@ -106,12 +123,16 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
 
     /** {@inheritDoc} */
     @Override public Throwable error() {
-        return (resFlag == ERR) ? (Throwable)res : null;
+        Object state0 = state;
+
+        return (state0 instanceof Throwable) ? (Throwable)state0 : null;
     }
 
     /** {@inheritDoc} */
     @Override public R result() {
-        return resFlag == RES ? (R)res : null;
+        Object state0 = state;
+
+        return isDone(state0) && !(state0 instanceof Throwable) ? (R)state0 : null;
     }
 
     /** {@inheritDoc} */
@@ -153,28 +174,36 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
      * @throws IgniteCheckedException If failed.
      */
     private R get0(boolean ignoreInterrupts) throws IgniteCheckedException {
+        Object res = registerWaiter(Thread.currentThread());
+
+        if (res != State.INIT) {
+            // no registration was done since a value is available.
+            return resolve(res);
+        }
+
+        boolean interrupted = false;
+
         try {
-            if (endTime == 0) {
-                if (ignoreInterrupts)
-                    acquireShared(0);
-                else
-                    acquireSharedInterruptibly(0);
-            }
+            for (; ; ) {
+                LockSupport.park();
 
-            if (getState() == CANCELLED)
-                throw new IgniteFutureCancelledCheckedException("Future was cancelled: "
+ this);
+                if (isDone())
+                    return resolve(state);
 
-            assert resFlag != 0;
+                else if (Thread.interrupted()) {
+                    interrupted = true;
 
-            if (resFlag == ERR)
-                throw U.cast((Throwable)res);
+                    if (!ignoreInterrupts) {
+                        unregisterWaiter(Thread.currentThread());
 
-            return (R)res;
+                        throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+                    }
+                }
+            }
         }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
+        finally {
+            if (interrupted)
+                Thread.currentThread().interrupt();
         }
     }
 
@@ -186,46 +215,157 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
      * @throws IgniteCheckedException If error occurred.
      */
     @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException
{
-        if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout))
-            throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation
completed.");
+        Object res = registerWaiter(Thread.currentThread());
+
+        if (res != State.INIT)
+            return resolve(res);
 
-        if (getState() == CANCELLED)
+        long deadlineNanos = System.nanoTime() + nanosTimeout;
+
+        boolean interrupted = false;
+
+        try {
+            long nanosTimeout0 = nanosTimeout;
+
+            while (nanosTimeout0 > 0) {
+                LockSupport.parkNanos(nanosTimeout0);
+
+                nanosTimeout0 = deadlineNanos - System.nanoTime();
+
+                if (isDone())
+                    return resolve(state);
+
+                else if (Thread.interrupted()) {
+                    interrupted = true;
+
+                    if (!ignoreInterrupts)
+                        throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+                }
+            }
+        }
+        finally {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+
+            unregisterWaiter(Thread.currentThread());
+        }
+
+        throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation
completed.");
+    }
+
+    /**
+     * Resolves the value to result or exception.
+     *
+     * @param val Value to resolve.
+     * @return Result.
+     * @throws IgniteCheckedException If resolved to exception.
+     */
+    private R resolve(Object val) throws IgniteCheckedException {
+        if (val == State.CANCELLED)
             throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
 
-        assert resFlag != 0;
+        if (val instanceof Throwable)
+            throw U.cast((Throwable)val);
 
-        if (resFlag == ERR)
-            throw U.cast((Throwable)res);
+        return (R)val;
+    }
+
+    /**
+     * @param waiter Waiter to register.
+     * @return Previous state.
+     */
+    private Object registerWaiter(Object waiter) {
+        WaitNode waitNode = null;
+
+        for (; ; ) {
+            final Object oldState = state;
+
+            if (isDone(oldState))
+                return oldState;
+
+            Object newState;
+
+            if (oldState == State.INIT)
+                newState = waiter;
+
+            else {
+                if (waitNode == null)
+                    waitNode = new WaitNode(waiter);
+
+                waitNode.next = oldState;
 
-        return (R)res;
+                newState = waitNode;
+            }
+
+            if (compareAndSetState(oldState, newState))
+                return State.INIT;
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>>
lsnr0) {
-        assert lsnr0 != null;
+    /**
+     * @param waiter Waiter to unregister.
+     */
+    void unregisterWaiter(Thread waiter) {
+        WaitNode prev = null;
+        Object cur = state;
 
-        boolean done = isDone();
+        while (cur != null) {
+            Object curWaiter = cur.getClass() == WaitNode.class ? ((WaitNode)cur).waiter
: cur;
+            Object next = cur.getClass() == WaitNode.class ? ((WaitNode)cur).next : null;
 
-        if (!done) {
-            synchronized (this) {
-                done = isDone(); // Double check.
+            if (curWaiter == waiter) {
+                if (prev == null) {
+                    Object n = next == null ? State.INIT : next;
 
-                if (!done) {
-                    if (lsnr == null)
-                        lsnr = lsnr0;
-                    else if (lsnr instanceof ArrayListener)
-                        ((ArrayListener)lsnr).add(lsnr0);
-                    else
-                        lsnr = (IgniteInClosure)new ArrayListener<IgniteInternalFuture>(lsnr,
lsnr0);
+                    cur = compareAndSetState(cur, n) ? null : state;
+                }
+                else {
+                    prev.next = next;
 
-                    return;
+                    cur = null;
                 }
             }
+            else {
+                prev = cur.getClass() == WaitNode.class ? (WaitNode)cur : null;
+
+                cur = next;
+            }
         }
+    }
+
+    /**
+     * @param waiter Head of waiters queue to unblock.
+     */
+    private void unblockAll(Object waiter) {
+        while (waiter != null) {
+            if (waiter instanceof Thread) {
+                LockSupport.unpark((Thread)waiter);
 
-        assert done;
+                return;
+            }
+            else if (waiter instanceof IgniteInClosure) {
+                notifyListener((IgniteInClosure<? super IgniteInternalFuture<R>>)waiter);
 
-        notifyListener(lsnr0);
+                return;
+            }
+            else if (waiter.getClass() == WaitNode.class) {
+                WaitNode waitNode = (WaitNode) waiter;
+
+                unblockAll(waitNode.waiter);
+
+                waiter = waitNode.next;
+            }
+            else
+                return;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>>
newLsnr) {
+        Object res = registerWaiter(newLsnr);
+
+        if (res != State.INIT)
+            notifyListener(newLsnr);
     }
 
     /** {@inheritDoc} */
@@ -240,23 +380,10 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
     }
 
     /**
-     * Notifies all registered listeners.
+     * @return Logger instance.
      */
-    private void notifyListeners() {
-        IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0;
-
-        synchronized (this) {
-            lsnr0 = lsnr;
-
-            if (lsnr0 == null)
-                return;
-
-            lsnr = null;
-        }
-
-        assert lsnr0 != null;
-
-        notifyListener(lsnr0);
+    @Nullable public IgniteLogger logger() {
+        return null;
     }
 
     /**
@@ -293,9 +420,19 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
 
     /** {@inheritDoc} */
     @Override public boolean isDone() {
-        // Don't check for "valid" here, as "done" flag can be read
-        // even in invalid state.
-        return endTime != 0;
+        return isDone(state);
+    }
+
+    /**
+     * @param state State to check.
+     * @return {@code True} if future is done.
+     */
+    private boolean isDone(Object state) {
+        return state == null ||
+            !(state == State.INIT
+                || state.getClass() == WaitNode.class
+                || state instanceof Thread
+                || state instanceof IgniteInClosure);
     }
 
     /**
@@ -303,12 +440,12 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
      */
     public boolean isFailed() {
         // Must read endTime first.
-        return endTime != 0 && resFlag == ERR;
+        return state instanceof Throwable;
     }
 
     /** {@inheritDoc} */
     @Override public boolean isCancelled() {
-        return getState() == CANCELLED;
+        return state == State.CANCELLED;
     }
 
     /**
@@ -362,35 +499,34 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
      * @return {@code True} if result was set by this call.
      */
     private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
-        boolean notify = false;
+        Object val = cancel ? State.CANCELLED : err != null ? err : res;
 
-        try {
-            if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) {
-                if (err != null) {
-                    resFlag = ERR;
-                    this.res = err;
-                }
-                else {
-                    resFlag = RES;
-                    this.res = res;
-                }
+        for (; ; ) {
+            final Object oldState = state;
+
+            if (isDone(oldState))
+                return false;
 
-                notify = true;
+            if (compareAndSetState(oldState, val)) {
+                endTime = U.currentTimeMillis();
 
-                releaseShared(0);
+                unblockAll(oldState);
 
                 return true;
             }
-
-            return false;
-        }
-        finally {
-            if (notify)
-                notifyListeners();
         }
     }
 
     /**
+     * @param oldState Old state to check.
+     * @param newState New state to set.
+     * @return {@code True} if state has been changed (CASed).
+     */
+    private boolean compareAndSetState(Object oldState, Object newState) {
+        return state == oldState && stateUpd.compareAndSet(this, oldState, newState);
+    }
+
+    /**
      * Callback to notify that future is cancelled.
      *
      * @return {@code True} if cancel flag was set by this call.
@@ -399,76 +535,18 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
         return onDone(null, null, true);
     }
 
-    /** {@inheritDoc} */
-    @Override protected final int tryAcquireShared(int ignore) {
-        return endTime != 0 ? 1 : -1;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected final boolean tryReleaseShared(int ignore) {
-        endTime = U.currentTimeMillis();
-
-        // Always signal after setting final done status.
-        return true;
-    }
-
     /**
      * @return String representation of state.
      */
     private String state() {
-        int s = getState();
+        Object s = state;
 
-        return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE";
-    }
-
-    /**
-     * @return Logger instance.
-     */
-    @Nullable public IgniteLogger logger() {
-        return null;
+        return s instanceof State ? s.toString() : isDone(s) ? "DONE" : "INIT";
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridFutureAdapter.class, this, "state", state());
-    }
-
-    /**
-     *
-     */
-    private static class ArrayListener<R> implements IgniteInClosure<IgniteInternalFuture<R>>
{
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private IgniteInClosure<? super IgniteInternalFuture<R>>[] arr;
-
-        /**
-         * @param lsnrs Listeners.
-         */
-        private ArrayListener(IgniteInClosure... lsnrs) {
-            this.arr = lsnrs;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void apply(IgniteInternalFuture<R> fut) {
-            for (int i = 0; i < arr.length; i++)
-                arr[i].apply(fut);
-        }
-
-        /**
-         * @param lsnr Listener.
-         */
-        void add(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
-            arr = Arrays.copyOf(arr, arr.length + 1);
-
-            arr[arr.length - 1] = lsnr;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ArrayListener.class, this, "arrSize", arr.length);
-        }
+        return S.toString(GridFutureAdapter.class, this, "state", state(), "hash", System.identityHashCode(this));
     }
 
     /**


Mime
View raw message