ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-3478
Date Mon, 04 Sep 2017 09:50:05 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 fce2e31f0 -> d3c049952


ignite-3478


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3c04995
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3c04995
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3c04995

Branch: refs/heads/ignite-3478
Commit: d3c049952384750c5543a9f88b383c033ef74096
Parents: fce2e31
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Sep 4 11:52:11 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Sep 4 12:49:57 2017 +0300

----------------------------------------------------------------------
 .../cache/GridCacheSharedContext.java           | 14 +++---
 .../distributed/dht/GridDhtTxPrepareFuture.java | 10 ++--
 ...arOptimisticSerializableTxPrepareFuture.java |  8 +--
 .../near/GridNearOptimisticTxPrepareFuture.java | 16 +++---
 ...ridNearOptimisticTxPrepareFutureAdapter.java |  4 +-
 .../GridNearPessimisticTxPrepareFuture.java     | 52 ++++++++++++++------
 .../near/GridNearTxFinishFuture.java            |  1 +
 .../near/GridNearTxFinishRequest.java           | 30 ++++++++++-
 .../near/GridNearTxPrepareFutureAdapter.java    |  8 +--
 .../near/GridNearTxPrepareResponse.java         | 32 ++++++++----
 .../mvcc/CacheCoordinatorsSharedManager.java    |  8 ++-
 .../cache/transactions/IgniteTxHandler.java     |  5 +-
 12 files changed, 131 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 09c8b1a..3919718 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -124,7 +124,7 @@ public class GridCacheSharedContext<K, V> {
     private GridCacheSharedTtlCleanupManager ttlMgr;
 
     /** Cache mvcc coordinator. */
-    private CacheCoordinatorsSharedManager coord;
+    private CacheCoordinatorsSharedManager crd;
 
     /** Cache contexts map. */
     private ConcurrentHashMap8<Integer, GridCacheContext<K, V>> ctxMap;
@@ -167,7 +167,7 @@ public class GridCacheSharedContext<K, V> {
 
     /**
      * @param kernalCtx  Context.
-     * @param coord Cache mvcc coordinator manager.
+     * @param crd Cache mvcc coordinator manager.
      * @param txMgr Transaction manager.
      * @param verMgr Version manager.
      * @param mvccMgr MVCC manager.
@@ -181,7 +181,7 @@ public class GridCacheSharedContext<K, V> {
      */
     public GridCacheSharedContext(
         GridKernalContext kernalCtx,
-        CacheCoordinatorsSharedManager coord,
+        CacheCoordinatorsSharedManager crd,
         IgniteTxManager txMgr,
         GridCacheVersionManager verMgr,
         GridCacheMvccManager mvccMgr,
@@ -200,7 +200,7 @@ public class GridCacheSharedContext<K, V> {
         this.kernalCtx = kernalCtx;
 
         setManagers(mgrs,
-            coord,
+            crd,
             txMgr,
             jtaMgr,
             verMgr,
@@ -356,7 +356,7 @@ public class GridCacheSharedContext<K, V> {
         List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>();
 
         setManagers(mgrs,
-            coord,
+            crd,
             txMgr,
             jtaMgr,
             verMgr,
@@ -422,7 +422,7 @@ public class GridCacheSharedContext<K, V> {
         CacheAffinitySharedManager affMgr,
         GridCacheIoManager ioMgr,
         GridCacheSharedTtlCleanupManager ttlMgr) {
-        this.coord = add(mgrs, coord);
+        this.crd = add(mgrs, coord);
         this.mvccMgr = add(mgrs, mvccMgr);
         this.verMgr = add(mgrs, verMgr);
         this.txMgr = add(mgrs, txMgr);
@@ -766,7 +766,7 @@ public class GridCacheSharedContext<K, V> {
      * @return Cache mvcc coordinator manager.
      */
     public CacheCoordinatorsSharedManager coordinators() {
-        return coord;
+        return crd;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a3d67d2..812b576 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1223,7 +1223,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 }
             }
 
-            IgniteInternalFuture<Long> waitCoordCntrFut = null;
+            IgniteInternalFuture<Long> waitCrdCntrFut = null;
 
             if (req.requestMvccCounter()) {
                 assert tx.txState().mvccEnabled(cctx);
@@ -1235,10 +1235,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 if (crd.isLocal())
                     tx.mvccCoordinatorCounter(cctx.coordinators().requestTxCounterOnCoordinator(tx.nearXidVersion()));
                 else {
-                    IgniteInternalFuture<Long> coordCntrFut = cctx.coordinators().requestTxCounter(crd,
tx);
+                    IgniteInternalFuture<Long> crdCntrFut = cctx.coordinators().requestTxCounter(crd,
tx);
 
                     if (tx.onePhaseCommit())
-                        waitCoordCntrFut = coordCntrFut;
+                        waitCrdCntrFut = crdCntrFut;
                 }
             }
 
@@ -1263,10 +1263,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 return;
 
             if (last) {
-                if (waitCoordCntrFut != null) {
+                if (waitCrdCntrFut != null) {
                     skipInit = true;
 
-                    waitCoordCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>()
{
+                    waitCrdCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>()
{
                         @Override public void apply(IgniteInternalFuture<Long> fut)
{
                             try {
                                 fut.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 69d0940..16535ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -234,7 +234,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
             // Avoid iterator creation.
             for (int i = 0; i < size; i++) {
-                IgniteInternalFuture<GridNearTxPrepareResponse> fut = future(i);
+                IgniteInternalFuture fut = future(i);
 
                 if (!isMini(fut))
                     continue;
@@ -382,14 +382,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
             MiniFuture fut = new MiniFuture(this, m, ++miniId);
 
-            add(fut);
+            add((IgniteInternalFuture)fut);
 
             if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries())
{
                 assert locNearEntriesFut == null;
 
                 locNearEntriesFut = fut;
 
-                add(new MiniFuture(this, m, ++miniId));
+                add((IgniteInternalFuture)new MiniFuture(this, m, ++miniId));
             }
         }
 
@@ -646,7 +646,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                 if (keyLockFut == null) {
                     keyLockFut = new KeyLockFuture();
 
-                    add(keyLockFut);
+                    add((IgniteInternalFuture)keyLockFut);
                 }
 
                 keyLockFut.addLockKey(entry.txKey());

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2c23a7a..093ab66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -220,7 +220,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             int size = futuresCountNoLock();
 
             for (int i = 0; i < size; i++) {
-                IgniteInternalFuture<GridNearTxPrepareResponse> fut = future(i);
+                IgniteInternalFuture fut = future(i);
 
                 if (isMini(fut) && !fut.isDone()) {
                     MiniFuture miniFut = (MiniFuture)fut;
@@ -254,7 +254,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
             // Avoid iterator creation.
             for (int i = size - 1; i >= 0; i--) {
-                IgniteInternalFuture<GridNearTxPrepareResponse> fut = future(i);
+                IgniteInternalFuture fut = future(i);
 
                 if (!isMini(fut))
                     continue;
@@ -564,7 +564,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
                 req.miniId(fut.futureId());
 
-                add(fut); // Append new future.
+                add((IgniteInternalFuture)fut); // Append new future.
 
                 if (n.isLocal()) {
                     assert !(m.hasColocatedCacheEntries() && m.hasNearCacheEntries())
: m;
@@ -681,7 +681,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 if (keyLockFut == null) {
                     keyLockFut = new KeyLockFuture();
 
-                    add(keyLockFut);
+                    add((IgniteInternalFuture)keyLockFut);
                 }
 
                 keyLockFut.addLockKey(entry.txKey());
@@ -740,7 +740,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                     int size = futuresCountNoLock();
 
                     for (int i = 0; i < size; i++) {
-                        IgniteInternalFuture<GridNearTxPrepareResponse> fut = future(i);
+                        IgniteInternalFuture fut = future(i);
 
                         if (isMini(fut) && !fut.isDone()) {
                             MiniFuture miniFut = (MiniFuture)fut;
@@ -758,7 +758,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 }
             }
 
-            add(new GridEmbeddedFuture<>(new IgniteBiClosure<TxDeadlock, Exception,
GridNearTxPrepareResponse>() {
+            add(new GridEmbeddedFuture<>(new IgniteBiClosure<TxDeadlock, Exception,
Object>() {
                 @Override public GridNearTxPrepareResponse apply(TxDeadlock deadlock, Exception
e) {
                     if (e != null)
                         U.warn(log, "Failed to detect deadlock.", e);
@@ -790,8 +790,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                     ", loc=" + ((MiniFuture)f).node().isLocal() +
                     ", done=" + f.isDone() + "]";
             }
-        }, new P1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
-            @Override public boolean apply(IgniteInternalFuture<GridNearTxPrepareResponse>
fut) {
+        }, new P1<IgniteInternalFuture<Object>>() {
+            @Override public boolean apply(IgniteInternalFuture<Object> fut) {
                 return isMini(fut);
             }
         });

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index f09b6c8..2e33889 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -171,7 +171,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends
GridNearT
     /**
      * Keys lock future.
      */
-    protected static class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse>
{
+    protected static class KeyLockFuture extends GridFutureAdapter<Void> {
         /** */
         @GridToStringInclude
         protected Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
@@ -216,7 +216,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends
GridNearT
                 if (log.isDebugEnabled())
                     log.debug("All locks are acquired for near prepare future: " + this);
 
-                onDone((GridNearTxPrepareResponse)null);
+                onDone((Void)null);
             }
             else {
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 0cccce3..0559ccd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -46,7 +48,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -139,13 +140,17 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
             // Avoid iterator creation.
             for (int i = 0; i < size; i++) {
-                MiniFuture mini = (MiniFuture)future(i);
+                IgniteInternalFuture fut = future(i);
 
-                if (mini.futureId() == miniId) {
-                    if (!mini.isDone())
-                        return mini;
-                    else
-                        return null;
+                if (fut instanceof MiniFuture) {
+                    MiniFuture mini = (MiniFuture)fut;
+
+                    if (mini.futureId() == miniId) {
+                        if (!mini.isDone())
+                            return mini;
+                        else
+                            return null;
+                    }
                 }
             }
         }
@@ -237,7 +242,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         req.miniId(fut.futureId());
 
-        add(fut);
+        add((IgniteInternalFuture)fut);
 
         IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ?
             cctx.tm().txHandler().prepareNearTxLocal(req) :
@@ -390,7 +395,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
                 req.miniId(fut.futureId());
 
-                add(fut);
+                add((IgniteInternalFuture)fut);
 
                 try {
                     cctx.io().send(primary, req, tx.ioPolicy());
@@ -421,9 +426,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         if (mvccCrd != null) {
             assert !tx.onePhaseCommit();
 
-            IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd,
tx);
+            if (mvccCrd.isLocal())
+                tx.mvccCoordinatorCounter(cctx.coordinators().requestTxCounterOnCoordinator(tx.nearXidVersion()));
+            else {
+                IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd,
tx);
 
-            add((IgniteInternalFuture)cntrFut);
+                add((IgniteInternalFuture)cntrFut);
+            }
         }
 
         markInitialized();
@@ -457,9 +466,21 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     @Override public String toString() {
         Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>,
String>() {
             @Override public String apply(IgniteInternalFuture<?> f) {
-                return "[node=" + ((MiniFuture)f).primary().id() +
-                    ", loc=" + ((MiniFuture)f).primary().isLocal() +
-                    ", done=" + f.isDone() + "]";
+                if (f instanceof MiniFuture) {
+                    return "[node=" + ((MiniFuture)f).primary().id() +
+                        ", loc=" + ((MiniFuture)f).primary().isLocal() +
+                        ", done=" + f.isDone() + "]";
+                }
+                else if (f instanceof CacheCoordinatorsSharedManager.MvccCounterFuture) {
+                    CacheCoordinatorsSharedManager.MvccCounterFuture crdFut =
+                        (CacheCoordinatorsSharedManager.MvccCounterFuture)f;
+
+                    return "[crdNode=" + crdFut.crd.id() +
+                        ", loc=" + crdFut.crd.isLocal() +
+                        ", done=" + f.isDone() + "]";
+                }
+                else
+                    return f.toString();
             }
         });
 
@@ -509,6 +530,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             if (res.error() != null)
                 onError(res.error());
             else {
+                if (res.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA)
+                    tx.mvccCoordinatorCounter(res.mvccCoordinatorCounter());
+
                 onPrepareResponse(m, res, updateMapping);
 
                 onDone(res);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index e093eeb..69598d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -714,6 +714,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             tx.size(),
             tx.subjectId(),
             tx.taskNameHash(),
+            tx.mvccCoordinatorCounter(),
             tx.activeCachesDeploymentEnabled()
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index dc32263..eb6d580 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.lang.IgniteUuid;
@@ -42,6 +43,9 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest
{
     /** Mini future ID. */
     private int miniId;
 
+    /** */
+    private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -87,6 +91,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest
{
         int txSize,
         @Nullable UUID subjId,
         int taskNameHash,
+        long mvccCrdCntr,
         boolean addDepInfo) {
         super(
             xidVer,
@@ -110,6 +115,15 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest
{
 
         explicitLock(explicitLock);
         storeEnabled(storeEnabled);
+
+        this.mvccCrdCntr = mvccCrdCntr;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long mvccCoordinatorCounter() {
+        return mvccCrdCntr;
     }
 
     /**
@@ -177,6 +191,12 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest
{
 
                 writer.incrementState();
 
+            case 22:
+                if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -201,6 +221,14 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest
{
 
                 reader.incrementState();
 
+            case 22:
+                mvccCrdCntr = reader.readLong("mvccCrdCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridNearTxFinishRequest.class);
@@ -213,7 +241,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest
{
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 22;
+        return 23;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index a94d6fc..e8893af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -49,7 +49,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO
  * Common code for tx prepare in optimistic and pessimistic modes.
  */
 public abstract class GridNearTxPrepareFutureAdapter extends
-    GridCacheCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements
GridCacheMvccFuture<IgniteInternalTx> {
+    GridCacheCompoundFuture<Object, IgniteInternalTx> implements GridCacheMvccFuture<IgniteInternalTx>
{
     /** Logger reference. */
     protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
@@ -58,9 +58,9 @@ public abstract class GridNearTxPrepareFutureAdapter extends
         AtomicReferenceFieldUpdater.newUpdater(GridNearTxPrepareFutureAdapter.class, Throwable.class,
"err");
 
     /** */
-    private static final IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx>
REDUCER =
-        new IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx>() {
-            @Override public boolean collect(GridNearTxPrepareResponse e) {
+    private static final IgniteReducer<Object, IgniteInternalTx> REDUCER =
+        new IgniteReducer<Object, IgniteInternalTx>() {
+            @Override public boolean collect(Object e) {
                 return true;
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 4233371..a23ae4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -407,30 +407,36 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeMessage("retVal", retVal))
+                if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 19:
+                if (!writer.writeMessage("retVal", retVal))
+                    return false;
+
+                writer.incrementState();
+
+            case 20:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -493,7 +499,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 15:
-                ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
+                mvccCrdCntr = reader.readLong("mvccCrdCntr");
 
                 if (!reader.isLastRead())
                     return false;
@@ -501,7 +507,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 16:
-                ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
+                ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -509,7 +515,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 17:
-                pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
+                ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -517,7 +523,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 18:
-                retVal = reader.readMessage("retVal");
+                pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -525,6 +531,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 19:
+                retVal = reader.readMessage("retVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -544,7 +558,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 20;
+        return 21;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index ec29002..f3287af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -375,14 +375,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
                 log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() +
                     ", crd=" + newCrd + ']');
+
+                return;
             }
         }
+
+        assignHist.addAssignment(discoCache.version(), curCrd);
     }
 
     /**
      *
      */
-    private class MvccCounterFuture extends GridFutureAdapter<Long> {
+    public class MvccCounterFuture extends GridFutureAdapter<Long> {
         /** */
         private final Long id;
 
@@ -390,7 +394,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         private IgniteInternalTx tx;
 
         /** */
-        private final ClusterNode crd;
+        public final ClusterNode crd;
 
         /**
          * @param id Future ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index cac1069..1b31d76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -859,8 +859,11 @@ public class IgniteTxHandler {
         else
             tx = ctx.tm().tx(dhtVer);
 
-        if (tx != null)
+        if (tx != null) {
+            tx.mvccCoordinatorCounter(req.mvccCoordinatorCounter());
+
             req.txState(tx.txState());
+        }
 
         if (tx == null && locTx != null && !req.commit()) {
             U.warn(log, "DHT local tx not found for near local tx rollback " +


Mime
View raw message