ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [3/5] ignite git commit: wip
Date Mon, 05 Jun 2017 08:57:07 GMT
wip


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dd1310f6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dd1310f6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dd1310f6

Branch: refs/heads/ignite-5293
Commit: dd1310f664194d9948eac5a6483e71163821746a
Parents: 87a79f1
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Fri Jun 2 17:27:05 2017 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Fri Jun 2 17:27:05 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLockFuture.java      |  19 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  | 185 ++++++++++++-------
 .../distributed/dht/GridDhtTxPrepareFuture.java | 169 ++++++++++-------
 3 files changed, 236 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dd1310f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 9365259..a6cec4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -477,15 +477,20 @@ public final class GridDhtLockFuture extends GridCacheFutureAdapter<Boolean>
     @Override public boolean onNodeLeft(UUID nodeId) {
         boolean found = false;
 
+        List<MiniFuture> miniFuts0;
+
         synchronized (this) {
-            if (miniFuts != null) {
-                for (MiniFuture miniFut : miniFuts) {
-                    if (miniFut.node().id().equals(nodeId)) {
-                        miniFut.onResult();
+            if (miniFuts == null)
+                return false;
 
-                        found = true;
-                    }
-                }
+            miniFuts0 = new ArrayList<>(miniFuts);
+        }
+
+        for (MiniFuture miniFut : miniFuts0) {
+            if (miniFut.node().id().equals(nodeId)) {
+                miniFut.onResult();
+
+                found = true;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dd1310f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 8a31bac..98442bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -26,19 +27,16 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -52,11 +50,9 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 /**
  *
  */
-public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx>
+public final class GridDhtTxFinishFuture<K, V>
+    extends GridCacheFutureAdapter<IgniteInternalTx>
     implements GridCacheFuture<IgniteInternalTx> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
@@ -94,14 +90,21 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
     /** Near mappings. */
     private Map<UUID, GridDistributedTxMapping> nearMap;
 
+    /** */
+    private List<MiniFuture> miniFuts;
+
+    /** */
+    private int completedMiniCnt;
+
+    /** */
+    private volatile boolean inited;
+
     /**
      * @param cctx Context.
      * @param tx Transaction.
      * @param commit Commit flag.
      */
     public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter tx, boolean commit) {
-        super(F.<IgniteInternalTx>identityReducer(tx));
-
         this.cctx = cctx;
         this.tx = tx;
         this.commit = commit;
@@ -118,6 +121,18 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
     }
 
     /**
+     * @param fut Mini future to add.
+     */
+    private synchronized void add(MiniFuture fut) {
+        if (miniFuts == null)
+            miniFuts = new ArrayList<>();
+
+        assert fut.futureId() == miniFuts.size();
+
+        miniFuts.add(fut);
+    }
+
+    /**
      * @return Transaction.
      */
     public GridDhtTxLocalAdapter tx() {
@@ -130,18 +145,23 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public boolean onNodeLeft(UUID nodeId) {
-        for (IgniteInternalFuture<?> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
+        List<MiniFuture> miniFuts0;
 
-                if (f.node().id().equals(nodeId)) {
-                    f.onNodeLeft();
+        synchronized (this) {
+            if (miniFuts == null)
+                return false;
 
-                    return true;
-                }
+            miniFuts0 = new ArrayList<>(miniFuts);
+        }
+
+        for (MiniFuture miniFut : miniFuts0) {
+            if (miniFut.node().id().equals(nodeId)) {
+                miniFut.onNodeLeft();
+
+                return true;
             }
+        }
 
         return false;
     }
@@ -173,25 +193,37 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
     }
 
     /**
+     * Finds pending mini future by the given mini ID.
+     *
+     * @param miniId Mini ID to find.
+     * @return Mini future.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private MiniFuture miniFuture(int miniId) {
+        synchronized (this) {
+            if (miniFuts != null && !miniFuts.get(miniId).isDone())
+                return miniFuts.get(miniId);
+        }
+
+        return null;
+    }
+
+    /**
      * @param nodeId Sender.
      * @param res Result.
      */
     public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
         if (!isDone()) {
-            boolean found = false;
+            boolean found;
 
-            for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+            MiniFuture mini = miniFuture(res.miniId());
 
-                    if (f.futureId() == res.miniId()) {
-                        found = true;
+            if (mini == null)
+                found = false;
+            else {
+                found = true;
 
-                        assert f.node().id().equals(nodeId);
-
-                        f.onResult(res);
-                    }
-                }
+                mini.onResult(res);
             }
 
             if (!found) {
@@ -217,7 +249,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
 
     /** {@inheritDoc} */
     @Override public boolean onDone(IgniteInternalTx tx, Throwable err) {
-        if (initialized() || err != null) {
+        if (inited || err != null) {
             Throwable e = this.err;
 
             if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) {
@@ -255,14 +287,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
     }
 
     /**
-     * @param f Future.
-     * @return {@code True} if mini-future.
-     */
-    private boolean isMini(IgniteInternalFuture<?> f) {
-        return f.getClass().equals(MiniFuture.class);
-    }
-
-    /**
      * Completeness callback.
      */
     private void onComplete() {
@@ -314,7 +338,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
         for (ClusterNode n : nodes) {
             assert !n.isLocal();
 
-            MiniFuture fut = new MiniFuture(++miniId, n);
+            MiniFuture fut = new MiniFuture(miniId++, n);
 
             add(fut); // Append new future.
 
@@ -361,7 +385,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
                 if (e instanceof ClusterTopologyCheckedException)
-                    fut.onNodeLeft((ClusterTopologyCheckedException)e);
+                    fut.onNodeLeft();
                 else {
                     if (msgLog.isDebugEnabled()) {
                         msgLog.debug("DHT finish fut, failed to send request lock tx [txId=" + tx.nearXidVersion() +
@@ -379,6 +403,25 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
     }
 
     /**
+     * Marks future as initialized.
+     */
+    private void markInitialized() {
+        boolean completed;
+
+        synchronized (this) {
+            if (inited)
+                return;
+
+            inited = true;
+
+            completed = miniFuts == null || completedMiniCnt == miniFuts.size();
+        }
+
+        if (completed)
+            onDone();
+    }
+
+    /**
      * @param commit Commit flag.
      * @param dhtMap DHT map.
      * @param nearMap Near map.
@@ -411,7 +454,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 // Nothing to send.
                 continue;
 
-            MiniFuture fut = new MiniFuture(++miniId, dhtMapping, nearMapping);
+            MiniFuture fut = new MiniFuture(miniId++, dhtMapping, nearMapping);
 
             add(fut); // Append new future.
 
@@ -466,7 +509,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
                 if (e instanceof ClusterTopologyCheckedException)
-                    fut.onNodeLeft((ClusterTopologyCheckedException)e);
+                    fut.onNodeLeft();
                 else {
                     if (msgLog.isDebugEnabled()) {
                         msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() +
@@ -535,7 +578,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 catch (IgniteCheckedException e) {
                     // Fail the whole thing.
                     if (e instanceof ClusterTopologyCheckedException)
-                        fut.onNodeLeft((ClusterTopologyCheckedException)e);
+                        fut.onNodeLeft();
                     else {
                         if (msgLog.isDebugEnabled()) {
                             msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() +
@@ -554,19 +597,10 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
-            @SuppressWarnings("unchecked")
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                return "[node=" + ((MiniFuture)f).node().id() +
-                    ", loc=" + ((MiniFuture)f).node().isLocal() +
-                    ", done=" + f.isDone() + "]";
-            }
-        });
-
+    @Override public synchronized String toString() {
         return S.toString(GridDhtTxFinishFuture.class, this,
             "xidVer", tx.xidVersion(),
-            "innerFuts", futs,
+            "innerFuts", miniFuts,
             "super", super.toString());
     }
 
@@ -574,7 +608,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
      * Mini-future for get operations. Mini-futures are only waiting on a single
      * node as opposed to multiple nodes.
      */
-    private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+    private class MiniFuture {
         /** */
         private final int futId;
 
@@ -590,6 +624,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
         @GridToStringInclude
         private ClusterNode node;
 
+        /** */
+        private volatile boolean done;
+
         /**
          * @param futId Future ID.
          * @param node Node.
@@ -634,14 +671,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
 
             // Fail.
-            onDone(e);
-        }
-
-        /**
-         * @param e Node failure.
-         */
-        void onNodeLeft(ClusterTopologyCheckedException e) {
-            onNodeLeft();
+            GridDhtTxFinishFuture.this.onDone(e);
         }
 
         /**
@@ -654,7 +684,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
             }
 
             // If node left, then there is nothing to commit on it.
-            onDone(tx);
+            onDone();
         }
 
         /**
@@ -667,6 +697,33 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
             onDone();
         }
 
+        /**
+         * @return {@code True} if done.
+         */
+        public boolean isDone() {
+            return done;
+        }
+
+        /**
+         *
+         */
+        public void onDone() {
+            boolean complete;
+
+            synchronized (GridDhtTxFinishFuture.this) {
+                if (done)
+                    return;
+
+                done = true;
+
+                complete = (++completedMiniCnt == miniFuts.size()) && inited;
+            }
+
+            if (complete)
+                GridDhtTxFinishFuture.this.onDone(tx);
+        }
+
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());

http://git-wip-us.apache.org/repos/asf/ignite/blob/dd1310f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 02df645..ec2bd06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -72,7 +72,6 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -195,6 +194,15 @@ public final class GridDhtTxPrepareFuture
     /** Timeout object. */
     private final PrepareTimeoutObject timeoutObj;
 
+    /** */
+    private List<MiniFuture> miniFuts;
+
+    /** */
+    private int completedMiniCnt;
+
+    /** */
+    private boolean inited;
+
     /**
      * @param cctx Context.
      * @param tx Transaction.
@@ -238,6 +246,37 @@ public final class GridDhtTxPrepareFuture
         timeoutObj = timeout > 0 ? new PrepareTimeoutObject(timeout) : null;
     }
 
+    /**
+     * Marks future as initialized.
+     */
+    private void markInitialized() {
+        boolean completed;
+
+        synchronized (this) {
+            if (inited)
+                return;
+
+            inited = true;
+
+            completed = miniFuts == null || completedMiniCnt == miniFuts.size();
+        }
+
+        if (completed)
+            onDone();
+    }
+
+    /**
+     * @param fut Mini future to add.
+     */
+    private synchronized void add(MiniFuture fut) {
+        if (miniFuts == null)
+            miniFuts = new ArrayList<>();
+
+        assert fut.futureId() == miniFuts.size();
+
+        miniFuts.add(fut);
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futId;
@@ -300,16 +339,22 @@ public final class GridDhtTxPrepareFuture
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
-        for (IgniteInternalFuture<?> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
+        List<MiniFuture> miniFuts0;
 
-                if (f.node().id().equals(nodeId)) {
-                    f.onNodeLeft();
+        synchronized (this) {
+            if (miniFuts == null)
+                return false;
 
-                    return true;
-                }
+            miniFuts0 = new ArrayList<>(miniFuts);
+        }
+
+        for (MiniFuture miniFut : miniFuts0) {
+            if (miniFut.node().id().equals(nodeId)) {
+                miniFut.onNodeLeft();
+
+                return true;
             }
+        }
 
         return false;
     }
@@ -354,7 +399,8 @@ public final class GridDhtTxPrepareFuture
 
                 if (readOld) {
                     boolean readThrough = !txEntry.skipStore() &&
-                        (txEntry.op() == TRANSFORM || ((retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue()));
+                        (txEntry.op() == TRANSFORM || ((retVal || hasFilters) &&
+                            cacheCtx.config().isLoadPreviousValue()));
 
                     boolean evt = retVal || txEntry.op() == TRANSFORM;
 
@@ -519,11 +565,8 @@ public final class GridDhtTxPrepareFuture
 
         MiniFuture mini = miniFuture(res.miniId());
 
-        if (mini != null) {
-            assert mini.node().id().equals(nodeId);
-
+        if (mini != null)
             mini.onResult(res);
-        }
         else {
             if (msgLog.isDebugEnabled()) {
                 msgLog.debug("DHT prepare fut, failed to find mini future [txId=" + tx.nearXidVersion() +
@@ -543,26 +586,9 @@ public final class GridDhtTxPrepareFuture
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     private MiniFuture miniFuture(int miniId) {
-        // We iterate directly over the futs collection here to avoid copy.
         synchronized (this) {
-            int size = futuresCountNoLock();
-
-            // Avoid iterator creation.
-            for (int i = 0; i < size; i++) {
-                IgniteInternalFuture<IgniteInternalTx> fut = future(i);
-
-                if (!isMini(fut))
-                    continue;
-
-                MiniFuture mini = (MiniFuture)fut;
-
-                if (mini.futureId() == miniId) {
-                    if (!mini.isDone())
-                        return mini;
-                    else
-                        return null;
-                }
-            }
+            if (miniFuts != null && !miniFuts.get(miniId).isDone())
+                return miniFuts.get(miniId);
         }
 
         return null;
@@ -671,8 +697,8 @@ public final class GridDhtTxPrepareFuture
 
     /** {@inheritDoc} */
     @Override public boolean onDone(GridNearTxPrepareResponse res0, Throwable err) {
-        assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " +
-            "pending mini futures: " + this;
+// TODO        assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " +
+//            "pending mini futures: " + this;
 
         ERR_UPD.compareAndSet(this, null, err);
 
@@ -926,20 +952,11 @@ public final class GridDhtTxPrepareFuture
     }
 
     /**
-     * @param f Future.
-     * @return {@code True} if mini-future.
-     */
-    private boolean isMini(IgniteInternalFuture<?> f) {
-        return f.getClass().equals(MiniFuture.class);
-    }
-
-    /**
      * Completeness callback.
      *
      * @param res Response.
-     * @return {@code True} if {@code done} flag was changed as a result of this call.
      */
-    private boolean onComplete(@Nullable GridNearTxPrepareResponse res) {
+    private void onComplete(@Nullable GridNearTxPrepareResponse res) {
         if (last || tx.isSystemInvalidate())
             tx.state(PREPARED);
 
@@ -949,11 +966,7 @@ public final class GridDhtTxPrepareFuture
 
             if (timeoutObj != null)
                 cctx.time().removeTimeoutObject(timeoutObj);
-
-            return true;
         }
-
-        return false;
     }
 
     /**
@@ -1232,7 +1245,7 @@ public final class GridDhtTxPrepareFuture
                     if (tx.remainingTime() == -1)
                         return;
 
-                    MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+                    MiniFuture fut = new MiniFuture(n.id(), miniId++, dhtMapping, nearMapping);
 
                     add(fut); // Append new future.
 
@@ -1374,11 +1387,11 @@ public final class GridDhtTxPrepareFuture
                                         GridCacheMvccCandidate added = entry.cached().candidate(version());
 
                                         assert added != null : "Null candidate for non-group-lock entry " +
-                                            "[added=" + added + ", entry=" + entry + ']';
+                                            "[added=null, entry=" + entry + ']';
                                         assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
                                             "[added=" + added + ", entry=" + entry + ']';
 
-                                        if (added != null && added.ownerVersion() != null)
+                                        if (added.ownerVersion() != null)
                                             req.owned(entry.txKey(), added.ownerVersion());
                                     }
 
@@ -1544,18 +1557,10 @@ public final class GridDhtTxPrepareFuture
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                return "[node=" + ((MiniFuture)f).node().id() +
-                    ", loc=" + ((MiniFuture)f).node().isLocal() +
-                    ", done=" + f.isDone() + "]";
-            }
-        });
-
+    @Override public synchronized String toString() {
         return S.toString(GridDhtTxPrepareFuture.class, this,
             "xid", tx.xidVersion(),
-            "innerFuts", futs,
+            "innerFuts", miniFuts,
             "super", super.toString());
     }
 
@@ -1578,6 +1583,9 @@ public final class GridDhtTxPrepareFuture
         @GridToStringInclude
         private GridDistributedTxMapping nearMapping;
 
+        /** */
+        private volatile boolean done;
+
         /**
          * @param nodeId Node ID.
          * @param futId Future ID.
@@ -1620,7 +1628,7 @@ public final class GridDhtTxPrepareFuture
                 log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
 
             // Fail.
-            onDone(e);
+            GridDhtTxPrepareFuture.this.onDone(e);
         }
 
         /**
@@ -1635,7 +1643,7 @@ public final class GridDhtTxPrepareFuture
             if (tx != null)
                 tx.removeMapping(nodeId);
 
-            onDone(tx);
+            onDone();
         }
 
         /**
@@ -1752,7 +1760,7 @@ public final class GridDhtTxPrepareFuture
                         }
                         catch (IgniteCheckedException e) {
                             // Fail the whole thing.
-                            onDone(e);
+                            GridDhtTxPrepareFuture.this.onDone(e);
 
                             return;
                         }
@@ -1765,13 +1773,42 @@ public final class GridDhtTxPrepareFuture
                 }
 
                 // Finish mini future.
-                onDone(tx);
+                onDone();
             }
         }
 
+        /**
+         * @return {@code True} if done.
+         */
+        public boolean isDone() {
+            return done;
+        }
+
+        /**
+         *
+         */
+        public void onDone() {
+            boolean complete;
+
+            synchronized (GridDhtTxPrepareFuture.this) {
+                if (done)
+                    return;
+
+                done = true;
+
+                complete = (++completedMiniCnt == miniFuts.size()) && inited;
+            }
+
+            if (complete)
+                GridDhtTxPrepareFuture.this.onDone();
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+            return S.toString(MiniFuture.class, this,
+                "done", isDone(),
+                "cancelled", isCancelled(),
+                "err", error());
         }
     }
 


Mime
View raw message