ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [36/38] ignite git commit: ignite-4768
Date Tue, 14 Mar 2017 08:00:55 GMT
ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/baeb2036
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/baeb2036
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/baeb2036

Branch: refs/heads/ignite-4768
Commit: baeb2036e4f29ef1336cbfe5d8f8fcac012cbfb4
Parents: df80606
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Mar 13 15:31:52 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Mar 13 18:45:44 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   1 +
 .../processors/cache/GridCacheIoManager.java    |   4 +-
 .../processors/cache/GridCacheMvccManager.java  |   7 ++
 .../distributed/GridCacheTxFinishSync.java      |   6 +-
 .../GridCacheTxRecoveryResponse.java            |   2 +
 .../GridDistributedTxFinishRequest.java         |   8 ++
 .../GridDistributedTxPrepareRequest.java        |  13 ++-
 .../GridDistributedTxPrepareResponse.java       |   1 +
 .../distributed/dht/GridDhtTxFinishFuture.java  |   7 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  64 ++----------
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  65 ++++++++++--
 .../distributed/dht/GridDhtTxPrepareFuture.java |  37 +++----
 ...arOptimisticSerializableTxPrepareFuture.java | 100 ++++++++++++++++---
 .../GridNearPessimisticTxPrepareFuture.java     |  12 +--
 .../near/GridNearTxFinishFuture.java            |   5 +-
 .../cache/distributed/near/GridNearTxLocal.java |  10 +-
 .../cache/transactions/IgniteTxHandler.java     |  19 ++--
 17 files changed, 215 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 2a6706e..0ee02fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1155,6 +1155,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable
{
 
                             String msg = NL +
                                 "Metrics for local node (to disable set 'metricsLogFrequency'
to 0)" + NL +
+                                "Futures " + ctx.cache().context().mvcc().activeFuturesCount()+
NL +
                                 "    ^-- Node [id=" + id + ", name=" + name() + ", uptime="
+ getUpTimeFormatted() + "]" + NL +
                                 "    ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ",
CPUs=" + cpus + "]" + NL +
                                 "    ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg="
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 2c255a5..e433825 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -367,8 +367,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter
{
 
             unmarshall(nodeId, cacheMsg);
 
-            if (!cacheMsg.partitionExchangeMessage())
-                log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
+//            if (!cacheMsg.partitionExchangeMessage())
+//                log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
 
             if (cacheMsg.classError() != null)
                 processFailedMessage(nodeId, cacheMsg, c);

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 4ec13fc..7d14fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -312,6 +312,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
     }
 
     /**
+     * TODO IGNITE-4768.
+     */
+    public int activeFuturesCount() {
+        return mvccFuts.size();
+    }
+
+    /**
      * @param leftNodeId Left node ID.
      * @param topVer Topology version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
index 1e323d0..64d3122 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
@@ -137,7 +137,7 @@ public class GridCacheTxFinishSync<K, V> {
         /**
          * @param nodeId Node ID request being sent to.
          */
-        public void onSend(UUID nodeId) {
+        void onSend(UUID nodeId) {
             TxFinishSync sync = nodeMap.get(nodeId);
 
             if (sync == null) {
@@ -169,7 +169,7 @@ public class GridCacheTxFinishSync<K, V> {
          * @param nodeId Node ID to wait ack from.
          * @return {@code null} if ack has been received or future that will be completed
when ack is received.
          */
-        public IgniteInternalFuture<?> awaitAckAsync(UUID nodeId) {
+        IgniteInternalFuture<?> awaitAckAsync(UUID nodeId) {
             TxFinishSync sync = nodeMap.get(nodeId);
 
             if (sync == null)
@@ -191,7 +191,7 @@ public class GridCacheTxFinishSync<K, V> {
         /**
          * @param nodeId Node ID response received from.
          */
-        public void onReceive(UUID nodeId) {
+        void onReceive(UUID nodeId) {
             TxFinishSync sync = nodeMap.get(nodeId);
 
             if (sync != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
index c087a3d..b5bb1b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -48,6 +49,7 @@ public class GridCacheTxRecoveryResponse extends GridDistributedBaseMessage
impl
 
     /** Transient TX state. */
     @GridDirectTransient
+    @GridToStringExclude
     private IgniteTxState txState;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index c794f96..03d16e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -110,6 +111,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage
i
 
     /** Transient TX state. */
     @GridDirectTransient
+    @GridToStringExclude
     private IgniteTxState txState;
 
     /**
@@ -571,7 +573,13 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage
i
 
     /** {@inheritDoc} */
     @Override public String toString() {
+        StringBuilder flags = new StringBuilder();
+
+        if (dhtReplyNear())
+            appendFlag(flags, "dht2near");
+
         return GridToStringBuilder.toString(GridDistributedTxFinishRequest.class, this,
+            "flags", flags.toString(),
             "super", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index c013c1a..1f06696 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -151,6 +151,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
     /** Transient TX state. */
     @GridDirectTransient
+    @GridToStringExclude
     private IgniteTxState txState;
 
     /** */
@@ -699,15 +700,17 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         StringBuilder flags = new StringBuilder();
 
         if (needReturnValue())
-            flags.append("retVal");
+            appendFlag(flags, "retVal");
         if (isInvalidate())
-            flags.append("invalidate");
+            appendFlag(flags, "invalidate");
         if (onePhaseCommit())
-            flags.append("onePhase");
+            appendFlag(flags, "onePhase");
         if (last())
-            flags.append("last");
+            appendFlag(flags, "last");
         if (system())
-            flags.append("sys");
+            appendFlag(flags, "sys");
+        if (dhtReplyNear())
+            appendFlag(flags, "dht2near");
 
         return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
             "flags", flags.toString(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 53a1391..aaa8db5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -49,6 +49,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
 
     /** Transient TX state. */
     @GridDirectTransient
+    @GridToStringExclude
     private IgniteTxState txState;
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 72a9b73..eb5d58f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -100,6 +100,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
     public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter
tx, boolean commit) {
         super(F.<IgniteInternalTx>identityReducer(tx));
 
+        assert tx.nearFinishFutureId() != null : tx;
+        assert tx.nearFinishMiniId() != 0 : tx;
+
         this.cctx = cctx;
         this.tx = tx;
         this.commit = commit;
@@ -424,8 +427,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
 
             GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
                 tx.nearNodeId(),
-                dhtReplyNear ? tx.nearFutureId() : futId,
-                dhtReplyNear ? 0 : fut.futureId(),
+                dhtReplyNear ? tx.nearFinishFutureId() : futId,
+                dhtReplyNear ? tx.nearFinishMiniId() : fut.futureId(),
                 tx.topologyVersion(),
                 tx.xidVersion(),
                 tx.commitVersion(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index bff69bc..7ddf415 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -68,18 +68,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     /** */
     private UUID nearNodeId;
 
-    /** Near future ID. */
-    private IgniteUuid nearFutId;
-
-    /** Near future ID. */
-    private int nearMiniId;
-
-    /** Near future ID. */
-    private IgniteUuid nearFinFutId;
-
-    /** Near future ID. */
-    private int nearFinMiniId;
-
     /** Near XID. */
     private GridCacheVersion nearXidVer;
 
@@ -164,8 +152,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
 
         this.nearNodeId = nearNodeId;
         this.nearXidVer = nearXidVer;
-        this.nearFutId = nearFutId;
-        this.nearMiniId = nearMiniId;
+        this.nearPrepFutId = nearFutId;
+        this.nearPrepMiniId = nearMiniId;
         this.txNodes = txNodes;
 
         threadId = nearThreadId;
@@ -219,18 +207,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements
GridCacheMa
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteUuid nearFutureId() {
-        return nearFutId;
-    }
-
-    /**
-     * @param nearFutId Near future ID.
-     */
-    public void nearFutureId(IgniteUuid nearFutId) {
-        this.nearFutId = nearFutId;
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean dht() {
         return true;
     }
@@ -240,27 +216,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements
GridCacheMa
         return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId());
     }
 
-    /**
-     * @return Near future ID.
-     */
-    public IgniteUuid nearFinishFutureId() {
-        return nearFinFutId;
-    }
-
-    /**
-     * @param nearFinFutId Near future ID.
-     */
-    public void nearFinishFutureId(IgniteUuid nearFinFutId) {
-        this.nearFinFutId = nearFinFutId;
-    }
-
-    /**
-     * @param nearFinMiniId Near future mini ID.
-     */
-    public void nearFinishMiniId(int nearFinMiniId) {
-        this.nearFinMiniId = nearFinMiniId;
-    }
-
     /** {@inheritDoc} */
     @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId,
GridDhtCacheEntry cached,
         IgniteTxEntry entry, AffinityTopologyVersion topVer) {
@@ -301,7 +256,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 null,
                 Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
                 0,
-                nearMiniId,
                 null,
                 true);
         }
@@ -317,7 +271,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 cctx,
                 this,
                 timeout,
-                nearMiniId,
                 Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
                 true,
                 needReturnValue()))) {
@@ -377,7 +330,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
      * @param writes Write entries.
      * @param verMap Version map.
      * @param msgId Message ID.
-     * @param nearMiniId Near mini future ID.
      * @param txNodes Transaction nodes mapping.
      * @param last {@code True} if this is last prepare request.
      * @return Future that will be completed when locks are acquired.
@@ -387,7 +339,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         @Nullable Collection<IgniteTxEntry> writes,
         Map<IgniteTxKey, GridCacheVersion> verMap,
         long msgId,
-        int nearMiniId,
         Map<UUID, Collection<UUID>> txNodes,
         boolean last
     ) {
@@ -404,14 +355,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements
GridCacheMa
                 cctx,
                 this,
                 timeout,
-                nearMiniId,
                 verMap,
                 last,
                 needReturnValue()))) {
                 GridDhtTxPrepareFuture f = prepFut;
 
-                assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future
" +
-                    "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut="
+ f + ']';
+                assert f.nearMiniId() == nearPrepMiniId : "Wrong near mini id on existing
future " +
+                    "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearPrepMiniId + ", fut="
+ f + ']';
 
                 if (timeout == -1)
                     f.onError(timeoutException());
@@ -420,8 +370,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             }
         }
         else {
-            assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future
" +
-                "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" +
fut + ']';
+            assert fut.nearMiniId() == nearPrepMiniId : "Wrong near mini id on existing future
" +
+                "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearPrepMiniId + ", fut="
+ fut + ']';
 
             // Prepare was called explicitly.
             return chainOnePhasePrepare(fut);
@@ -619,7 +569,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate()
+ ", commit=" + commit +
             ", sysInvalidate=" + isSystemInvalidate() + ", state=" + state() + ']';
 
-        assert nearMiniId != 0;
+        assert nearPrepMiniId != 0;
 
         return super.finish(commit);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 67e1993..81b5208 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -97,6 +97,18 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter
{
     /** Nodes where transactions were started on lock step. */
     private Set<ClusterNode> lockTxNodes;
 
+    /** Near future ID. */
+    protected IgniteUuid nearPrepFutId;
+
+    /** Prepare future mini ID. */
+    protected int nearPrepMiniId;
+
+    /** Near future ID. */
+    protected IgniteUuid nearFinFutId;
+
+    /** Prepare future mini ID. */
+    protected int nearFinMiniId;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -159,9 +171,55 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter
{
     }
 
     /**
+     * @return Near future ID.
+     */
+    final IgniteUuid nearPrepareFutureId() {
+        return nearPrepFutId;
+    }
+
+    /**
+     * @param futId Near future ID.
+     * @param miniId Near mini future ID.
+     */
+    public final void nearPrepareFutureId(IgniteUuid futId, int miniId) {
+        this.nearPrepFutId = futId;
+        this.nearPrepMiniId = miniId;
+    }
+
+    /**
+     * @return Near prepare mini future ID.
+     */
+    final int nearPrepareMiniId() {
+        return nearPrepMiniId;
+    }
+
+    /**
+     * @return Near future ID.
+     */
+    final IgniteUuid nearFinishFutureId() {
+        return nearFinFutId;
+    }
+
+    /**
+     * @param futId Near future ID.
+     * @param miniId Near mini future ID.
+     */
+    public final void nearFinishFutureId(IgniteUuid futId, int miniId) {
+        nearFinFutId = futId;
+        nearFinMiniId = miniId;
+    }
+
+    /**
+     * @return Near future mini ID.
+     */
+    public final int nearFinishMiniId() {
+        return nearFinMiniId;
+    }
+
+    /**
      * @param node Node.
      */
-    public void addLockTransactionNode(ClusterNode node) {
+    void addLockTransactionNode(ClusterNode node) {
         assert node != null;
         assert !node.isLocal();
 
@@ -216,11 +274,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter
{
     protected abstract UUID nearNodeId();
 
     /**
-     * @return Near future ID.
-     */
-    protected abstract IgniteUuid nearFutureId();
-
-    /**
      * Adds reader to cached entry.
      *
      * @param msgId Message ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index c8e06af..7c69021 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -175,9 +175,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     /** Trackable flag. */
     private boolean trackable = true;
 
-    /** Near mini future id. */
-    private int nearMiniId;
-
     /** DHT versions map. */
     private Map<IgniteTxKey, GridCacheVersion> dhtVerMap;
 
@@ -213,7 +210,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param cctx Context.
      * @param tx Transaction.
      * @param timeout Timeout.
-     * @param nearMiniId Near mini future id.
      * @param dhtVerMap DHT versions map.
      * @param last {@code True} if this is last prepare operation for node.
      * @param retVal Return value flag.
@@ -222,13 +218,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         GridCacheSharedContext cctx,
         final GridDhtTxLocalAdapter tx,
         long timeout,
-        int nearMiniId,
         Map<IgniteTxKey, GridCacheVersion> dhtVerMap,
         boolean last,
         boolean retVal
     ) {
         super(REDUCER);
 
+        assert tx.nearPrepareFutureId() != null;
+        assert tx.nearPrepareMiniId() != 0;
+
         this.cctx = cctx;
         this.tx = tx;
         this.dhtVerMap = dhtVerMap;
@@ -236,8 +234,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
         futId = IgniteUuid.randomUuid();
 
-        this.nearMiniId = nearMiniId;
-
         if (log == null) {
             msgLog = cctx.txPrepareMessageLogger();
             log = U.logger(cctx.kernalContext(), logRef, GridDhtTxPrepareFuture.class);
@@ -263,7 +259,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @return Near mini future id.
      */
     int nearMiniId() {
-        return nearMiniId;
+        return tx.nearPrepareMiniId();
     }
 
     /** {@inheritDoc} */
@@ -860,8 +856,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
             -1,
             tx.nearXidVersion(),
-            tx.colocated() ? tx.xid() : tx.nearFutureId(),
-            nearMiniId,
+            tx.nearPrepareFutureId(),
+            nearMiniId(),
             tx.xidVersion(),
             tx.writeVersion(),
             ret,
@@ -1223,17 +1219,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
             final boolean dhtReplyNear = tx.dhtReplyNear();
 
-            Collection<UUID> backupNodes;
-            IgniteUuid nearFutId;
-
-            if (dhtReplyNear) {
-                backupNodes = tx.transactionNodes().get(cctx.localNodeId());
-                nearFutId = tx.colocated() ? tx.xid() : tx.nearFutureId();
-            }
-            else {
-                backupNodes = null;
-                nearFutId = null;
-            }
+            Collection<UUID> backupNodes = dhtReplyNear ? tx.transactionNodes().get(cctx.localNodeId())
: null;
 
             // Assign keys to primary nodes.
             if (!F.isEmpty(writes)) {
@@ -1285,8 +1271,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     assert txNodes != null;
 
                     GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
-                        dhtReplyNear ? nearFutId : futId,
-                        dhtReplyNear ? nearMiniId : fut.futureId(),
+                        dhtReplyNear ? tx.nearPrepareFutureId() : futId,
+                        dhtReplyNear ? tx.nearPrepareMiniId() : fut.futureId(),
                         tx.topologyVersion(),
                         tx,
                         timeout,
@@ -1403,8 +1389,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         add(fut); // Append new future.
 
                         GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
-                            dhtReplyNear ? nearFutId : futId,
-                            dhtReplyNear ? nearMiniId : fut.futureId(),
+                            dhtReplyNear ? tx.nearPrepareFutureId() : futId,
+                            dhtReplyNear ? tx.nearPrepareMiniId() : fut.futureId(),
                             tx.topologyVersion(),
                             tx,
                             timeout,
@@ -1503,6 +1489,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
     /**
      * @param entry Transaction entry.
+     * @param backupNodes Node IDs collection if tx was mapped on near node.
      */
     private void map(IgniteTxEntry entry, @Nullable Collection<UUID> backupNodes) {
         if (entry.cached().isLocal())

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index cd0e7fd..6b2daf8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
@@ -36,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -200,13 +203,18 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             MiniFuture mini = miniFuture(res.miniId());
 
             if (mini != null)
-                mini.onResult(res);
+                mini.onPrimaryResponse(res);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
-        assert false; // TODO IGNITE-4768.
+        assert res.nearNodeResponse() : res;
+
+        MiniFuture mini = miniFuture(res.miniId());
+
+        if (mini != null)
+            mini.onDhtResponse(nodeId, res);
     }
 
     /** {@inheritDoc} */
@@ -347,11 +355,13 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings
= new HashMap<>();
 
+        boolean dhtReplyNear = true;
+
         for (IgniteTxEntry write : writes)
-            map(write, topVer, mappings, txMapping, remap, topLocked);
+            dhtReplyNear = map(write, topVer, mappings, txMapping, remap, topLocked, dhtReplyNear);
 
         for (IgniteTxEntry read : reads)
-            map(read, topVer, mappings, txMapping, remap, topLocked);
+            dhtReplyNear = map(read, topVer, mappings, txMapping, remap, topLocked, dhtReplyNear);
 
         if (keyLockFut != null)
             keyLockFut.onAllKeysAdded();
@@ -371,10 +381,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         checkOnePhase(txMapping);
 
+        if (tx.onePhaseCommit())
+            dhtReplyNear = false;
+
+        tx.dhtReplyNear(dhtReplyNear);
+
         for (GridDistributedTxMapping m : mappings.values()) {
             assert !m.empty();
 
-            add(new MiniFuture(this, m, ++miniId));
+            Set<UUID> dhtNodes;
+
+            if (dhtReplyNear) {
+                dhtNodes = new HashSet<>(txMapping.transactionNodes().get(m.primary().id()));
+
+                assert !dhtNodes.isEmpty();
+            }
+            else
+                dhtNodes = null;
+
+            add(new MiniFuture(this, m, ++miniId, dhtNodes));
         }
 
         Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -389,7 +414,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
             MiniFuture fut = (MiniFuture)fut0;
 
-            IgniteCheckedException err = prepare(fut, txMapping);
+            IgniteCheckedException err = prepare(fut, txMapping, dhtReplyNear);
 
             if (err != null) {
                 while (it.hasNext()) {
@@ -425,7 +450,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
      * @param fut Mini future.
      * @return Prepare error if any.
      */
-    @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping
txMapping) {
+    @Nullable private IgniteCheckedException prepare(final MiniFuture fut,
+        GridDhtTxMapping txMapping,
+        boolean dhtReplyNear) {
         GridDistributedTxMapping m = fut.mapping();
 
         final ClusterNode primary = m.primary();
@@ -449,7 +476,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             m.writes(),
             m.near(),
             txMapping.transactionNodes(),
-            false,
+            dhtReplyNear,
             m.last(),
             tx.onePhaseCommit(),
             tx.needReturnValue() && tx.implicit(),
@@ -490,7 +517,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>()
{
                 @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse>
prepFut) {
                     try {
-                        fut.onResult(prepFut.get());
+                        fut.onPrimaryResponse(prepFut.get());
                     }
                     catch (IgniteCheckedException e) {
                         fut.onResult(e);
@@ -526,19 +553,30 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
      * @param remap Remap flag.
      * @param topLocked Topology locked flag.
      */
-    private void map(
+    private boolean map(
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
         GridDhtTxMapping txMapping,
         boolean remap,
-        boolean topLocked
+        boolean topLocked,
+        boolean dhtReplyNear
     ) {
         GridCacheContext cacheCtx = entry.context();
 
-        List<ClusterNode> nodes = cacheCtx.isLocal() ?
-            cacheCtx.affinity().nodesByKey(entry.key(), topVer) :
-            cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
+        List<ClusterNode> nodes;
+
+        if (!cacheCtx.isLocal()) {
+            GridDhtPartitionTopology top = cacheCtx.topology();
+
+            nodes = top.nodes(cacheCtx.affinity().partition(entry.key()), topVer);
+
+            if (dhtReplyNear &&
+                (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(),
topVer) || nodes.size() == 1))
+                dhtReplyNear = false;
+        }
+        else
+            nodes = cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()),
topVer);
 
         txMapping.addMapping(nodes);
 
@@ -620,6 +658,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                 }
             }
         }
+
+        return dhtReplyNear;
     }
 
     /** {@inheritDoc} */
@@ -711,15 +751,24 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         @SuppressWarnings("UnusedDeclaration")
         private volatile int rcvRes;
 
+        /** */
+        private final Set<UUID> dhtNodes;
+
         /**
          * @param parent Parent future.
          * @param m Mapping.
          * @param futId Mini future ID.
          */
-        MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping
m, int futId) {
+        MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent,
+            GridDistributedTxMapping m,
+            int futId,
+            Set<UUID> dhtNodes) {
+            assert dhtNodes == null || !dhtNodes.isEmpty();
+
             this.parent = parent;
             this.m = m;
             this.futId = futId;
+            this.dhtNodes = dhtNodes;
         }
 
         /**
@@ -779,10 +828,27 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         }
 
         /**
+         * @param nodeId Node ID.
+         * @param res Response.
+         */
+        void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
+            assert dhtNodes != null;
+
+            boolean done;
+
+            synchronized (dhtNodes) {
+                done = dhtNodes.remove(nodeId) && dhtNodes.isEmpty();
+            }
+
+            if (done)
+                onDone();
+        }
+
+        /**
          * @param res Result callback.
          */
         @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-        void onResult(final GridNearTxPrepareResponse res) {
+        void onPrimaryResponse(final GridNearTxPrepareResponse res) {
             if (isDone())
                 return;
 
@@ -885,6 +951,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                             onDone(res);
                     }
                     else {
+                        assert dhtNodes == null;
+
                         parent.processPrimaryPrepareResponse(m, res);
 
                         // Finish this mini future (need result only on client node).

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 1d3eaec..857f237 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -104,7 +104,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             if (f != null) {
                 assert f.primary().id().equals(nodeId);
 
-                f.onPrimaryResult(res);
+                f.onPrimaryResponse(res);
             }
             else {
                 if (msgLog.isDebugEnabled()) {
@@ -223,12 +223,10 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             if (!cacheCtx.isLocal()) {
                 GridDhtPartitionTopology top = cacheCtx.topology();
 
-                if (dhtReplyNear && (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(),
topVer)))
-                    dhtReplyNear = false;
-
                 nodes = top.nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
 
-                if (nodes.size() == 1)
+                if (dhtReplyNear &&
+                    (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(),
topVer) || nodes.size() == 1))
                     dhtReplyNear = false;
             }
             else
@@ -319,7 +317,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>()
{
                     @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse>
prepFut) {
                         try {
-                            fut.onPrimaryResult(prepFut.get());
+                            fut.onPrimaryResponse(prepFut.get());
                         }
                         catch (IgniteCheckedException e) {
                             fut.onError(e);
@@ -466,7 +464,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         /**
          * @param res Response.
          */
-        void onPrimaryResult(GridNearTxPrepareResponse res) {
+        void onPrimaryResponse(GridNearTxPrepareResponse res) {
             if (res.error() != null)
                 onError(res.error());
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 44455ca..56b7284 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -1029,8 +1029,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 done = dhtNodes.remove(nodeId) && dhtNodes.isEmpty();
             }
 
-            if (done)
+            if (done) {
+                cctx.tm().onFinishedRemote(primary().id(), tx.threadId());
+
                 onDone(tx);
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8ed749c..80b93e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -214,13 +214,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteUuid nearFutureId() {
-        assert false : "nearFutureId should not be called for colocated transactions.";
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override protected IgniteInternalFuture<Boolean> addReader(
         long msgId,
         GridDhtCacheEntry cached,
@@ -1002,7 +995,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             cctx,
             this,
             timeout,
-            0,
             Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
             last,
             needReturnValue() && implicit());
@@ -1065,7 +1057,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         // Do not create finish future if there are no remote nodes.
         if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) {
             if (prep != null)
-                return (IgniteInternalFuture<IgniteInternalTx>)(IgniteInternalFuture)prep;
+                return (IgniteInternalFuture<IgniteInternalTx>)prep;
 
             return new GridFinishedFuture<IgniteInternalTx>(this);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 71b847a..aad0e34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -356,10 +356,6 @@ public class IgniteTxHandler {
             if (tx == null)
                 U.warn(log, "Missing local transaction for mapped near version [nearVer="
+ req.version()
                     + ", mappedVer=" + mappedVer + ']');
-            else {
-                if (req.concurrency() == PESSIMISTIC)
-                    tx.nearFutureId(req.futureId());
-            }
         }
         else {
             GridDhtPartitionTopology top = null;
@@ -470,6 +466,7 @@ public class IgniteTxHandler {
                 tx.explicitLock(true);
 
             tx.transactionNodes(req.transactionNodes());
+            tx.nearPrepareFutureId(req.futureId(), req.miniId());
 
             tx.dhtReplyNear(req.dhtReplyNear());
 
@@ -490,7 +487,6 @@ public class IgniteTxHandler {
                 req.writes(),
                 req.dhtVersions(),
                 req.messageId(),
-                req.miniId(),
                 req.transactionNodes(),
                 req.last());
 
@@ -771,8 +767,10 @@ public class IgniteTxHandler {
      * @param req Finish request.
      * @return Finish future.
      */
-    private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId, @Nullable
GridNearTxLocal locTx,
-        GridNearTxFinishRequest req) {
+    private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId,
+        @Nullable GridNearTxLocal locTx,
+        GridNearTxFinishRequest req)
+    {
         GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
 
         GridDhtTxLocal tx = null;
@@ -853,6 +851,7 @@ public class IgniteTxHandler {
             assert req.syncMode() != null : req;
 
             tx.syncMode(req.syncMode());
+            tx.nearFinishFutureId(req.futureId(), req.miniId());
 
             if (req.commit()) {
                 tx.storeEnabled(req.storeEnabled());
@@ -864,9 +863,6 @@ public class IgniteTxHandler {
                     return null;
                 }
 
-                tx.nearFinishFutureId(req.futureId());
-                tx.nearFinishMiniId(req.miniId());
-
                 IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync();
 
                 // Only for error logging.
@@ -875,9 +871,6 @@ public class IgniteTxHandler {
                 return commitFut;
             }
             else {
-                tx.nearFinishFutureId(req.futureId());
-                tx.nearFinishMiniId(req.miniId());
-
                 IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
 
                 // Only for error logging.


Mime
View raw message