ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] ignite git commit: ignite-3478
Date Mon, 04 Sep 2017 14:42:43 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 d3c049952 -> e71ce1937


http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 812b576..6366b70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -393,7 +393,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                         entryProc,
                         tx.resolveTaskName(),
                         null,
-                        keepBinary);
+                        keepBinary,
+                        TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
 
                     if (retVal || txEntry.op() == TRANSFORM) {
                         if (!F.isEmpty(txEntry.entryProcessors())) {
@@ -493,7 +494,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                             /*transformClo*/null,
                             /*taskName*/null,
                             /*expiryPlc*/null,
-                            /*keepBinary*/true);
+                            /*keepBinary*/true,
+                            TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
                     }
 
                     if (oldVal != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index e7e0e06..066a706 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -57,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -76,6 +78,12 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
+    /** Mvcc coordinator node for {@link #topVer}, assigned only if mvcc is enabled. */
+    private ClusterNode mvccCrd;
+
+    /** Query counter received from {@link #mvccCrd}, {@code COUNTER_NA} until received. */
+    private long mvccCntr = TxMvccVersion.COUNTER_NA;
+
     /**
      * @param cctx Context.
      * @param keys Keys.
@@ -135,17 +143,49 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
 
         if (lockedTopVer != null) {
-            canRemap = false;
+            topVer = lockedTopVer;
 
-            map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), lockedTopVer);
+            canRemap = false;
         }
         else {
-            AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
+            topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
                 canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
+        }
+
+        // TODO IGNITE-3478 (correct failover and remap).
+        if (cctx.mvccEnabled()) {
+            mvccCrd = cctx.shared().coordinators().coordinator(topVer);
+
+            if (mvccCrd == null) {
+                onDone(new ClusterTopologyCheckedException("Mvcc coordinator is not assigned: " + topVer));
+
+                return;
+            }
+
+            IgniteInternalFuture<Long> cntrFut = cctx.shared().coordinators().requestQueryCounter(mvccCrd);
 
-            map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+            cntrFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() {
+                @Override public void apply(IgniteInternalFuture<Long> fut) {
+                    try {
+                        mvccCntr = fut.get();
+
+                        map(keys,
+                            Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(),
+                            GridPartitionedGetFuture.this.topVer);
+
+                        markInitialized();
+                    }
+                    catch (IgniteCheckedException e) {
+                        onDone(e);
+                    }
+                }
+            });
+
+            return;
         }
 
+        map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+
         markInitialized();
     }
 
@@ -203,10 +243,15 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     /** {@inheritDoc} */
     @Override public boolean onDone(Map<K, V> res, Throwable err) {
         if (super.onDone(res, err)) {
-            // Don't forget to clean up.
             if (trackable)
                 cctx.mvcc().removeFuture(futId);
 
+            if (mvccCntr != TxMvccVersion.COUNTER_NA) {
+                assert mvccCrd != null;
+
+                cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccCntr);
+            }
+
             cache().sendTtlUpdateRequest(expiryPlc);
 
             return true;
@@ -299,7 +344,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                         taskName == null ? 0 : taskName.hashCode(),
                         expiryPlc,
                         skipVals,
-                        recovery);
+                        recovery,
+                        mvccCntr);
 
                 final Collection<Integer> invalidParts = fut.invalidPartitions();
 
@@ -355,7 +401,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                     false,
                     skipVals,
                     cctx.deploymentEnabled(),
-                    recovery);
+                    recovery,
+                    mvccCntr);
 
                 add(fut); // Append new future.
 
@@ -460,7 +507,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 GridCacheVersion ver = null;
 
                 if (readNoEntry) {
-                    CacheDataRow row = cctx.offheap().read(cctx, key);
+                    CacheDataRow row = cctx.offheap().read(cctx, key, mvccCntr);
 
                     if (row != null) {
                         long expireTime = row.expireTime();
@@ -503,6 +550,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                                 taskName,
                                 expiryPlc,
                                 !deserializeBinary,
+                                mvccCntr,
                                 null);
 
                             if (getRes != null) {
@@ -521,7 +569,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                                 null,
                                 taskName,
                                 expiryPlc,
-                                !deserializeBinary);
+                                !deserializeBinary,
+                                mvccCntr);
                         }
 
                         cache.context().evicts().touch(entry, topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 0828a80..9da6db5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -356,7 +357,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 boolean skipEntry = readNoEntry;
 
                 if (readNoEntry) {
-                    CacheDataRow row = cctx.offheap().read(cctx, key);
+                    CacheDataRow row = cctx.offheap().read(cctx, key, TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
 
                     if (row != null) {
                         long expireTime = row.expireTime();
@@ -399,7 +400,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                                 taskName,
                                 expiryPlc,
                                 true,
-                                null);
+                                TxMvccVersion.COUNTER_NA,
+                                null); // TODO IGNITE-3478
 
                             if (res != null) {
                                 v = res.value();
@@ -417,7 +419,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                                 null,
                                 taskName,
                                 expiryPlc,
-                                true);
+                                true,
+                                TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
                         }
 
                         colocated.context().evicts().touch(entry, topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7d0f747..ef1661c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -1494,7 +1495,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 // Optimistically expect that all keys are available locally (avoid creation of get future).
                 for (KeyCacheObject key : keys) {
                     if (readNoEntry) {
-                        CacheDataRow row = ctx.offheap().read(ctx, key);
+                        CacheDataRow row = ctx.offheap().read(ctx, key, TxMvccVersion.COUNTER_NA);
 
                         if (row != null) {
                             long expireTime = row.expireTime();
@@ -1554,6 +1555,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                             taskName,
                                             expiry,
                                             true,
+                                            TxMvccVersion.COUNTER_NA,
                                             null);
 
                                         if (getRes != null) {
@@ -1572,7 +1574,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                             null,
                                             taskName,
                                             expiry,
-                                            !deserializeBinary);
+                                            !deserializeBinary,
+                                            TxMvccVersion.COUNTER_NA);
                                     }
 
                                     // Entry was not in memory or in swap, so we remove it from cache.
@@ -2097,7 +2100,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         entryProcessor,
                         taskName,
                         null,
-                        req.keepBinary());
+                        req.keepBinary(),
+                        TxMvccVersion.COUNTER_NA);
 
                     Object oldVal = null;
                     Object updatedVal = null;
@@ -2251,7 +2255,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             null,
                             taskName,
                             null,
-                            req.keepBinary());
+                            req.keepBinary(),
+                            TxMvccVersion.COUNTER_NA);
 
                         Object val = ctx.config().getInterceptor().onBeforePut(
                             new CacheLazyEntry(
@@ -2294,7 +2299,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             null,
                             taskName,
                             null,
-                            req.keepBinary());
+                            req.keepBinary(),
+                            TxMvccVersion.COUNTER_NA);
 
                         IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor()
                             .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, req.keepBinary()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 053bbe5..c54d91e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
@@ -452,7 +453,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                 for (KeyCacheObject key : keys) {
                     if (readNoEntry) {
-                        CacheDataRow row = ctx.offheap().read(ctx, key);
+                        CacheDataRow row = ctx.offheap().read(ctx, key, TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
 
                         if (row != null) {
                             long expireTime = row.expireTime();
@@ -515,7 +516,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                             taskName,
                                             expiryPlc,
                                             !deserializeBinary,
-                                            null);
+                                            TxMvccVersion.COUNTER_NA,
+                                            null); // TODO IGNITE-3478
 
                                         if (getRes != null) {
                                             v = getRes.value();
@@ -533,7 +535,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                             null,
                                             taskName,
                                             expiryPlc,
-                                            !deserializeBinary);
+                                            !deserializeBinary,
+                                            TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
                                     }
 
                                     // Entry was not in memory or in swap, so we remove it from cache.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 9d9c682..80e210c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistribu
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLeanMap;
@@ -321,7 +322,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                         taskName == null ? 0 : taskName.hashCode(),
                         expiryPlc,
                         skipVals,
-                        recovery);
+                        recovery,
+                        TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
 
                 final Collection<Integer> invalidParts = fut.invalidPartitions();
 
@@ -383,7 +385,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     true,
                     skipVals,
                     cctx.deploymentEnabled(),
-                    recovery);
+                    recovery,
+                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478.
 
                 add(fut); // Append new future.
 
@@ -454,7 +457,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                             taskName,
                             expiryPlc,
                             !deserializeBinary,
-                            null);
+                            TxMvccVersion.COUNTER_NA,
+                            null); // TODO IGNITE-3478
 
                         if (res != null) {
                             v = res.value();
@@ -472,7 +476,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                             null,
                             taskName,
                             expiryPlc,
-                            !deserializeBinary);
+                            !deserializeBinary,
+                            TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
                     }
                 }
 
@@ -591,7 +596,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                             taskName,
                             expiryPlc,
                             !deserializeBinary,
-                            null);
+                            TxMvccVersion.COUNTER_NA,
+                            null); // TODO IGNITE-3478
 
                         if (res != null) {
                             v = res.value();
@@ -609,7 +615,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                             null,
                             taskName,
                             expiryPlc,
-                            !deserializeBinary);
+                            !deserializeBinary,
+                            TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
                     }
 
                     // Entry was not in memory or in swap, so we remove it from cache.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 1bffac4..4f343a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -106,6 +107,9 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
     /** TTL for read operation. */
     private long accessTtl;
 
+    /** Mvcc coordinator counter, {@link TxMvccVersion#COUNTER_NA} by default. */
+    private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -144,7 +148,8 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
         boolean addReader,
         boolean skipVals,
         boolean addDepInfo,
-        boolean recovery
+        boolean recovery,
+        long mvccCrdCntr
     ) {
         assert futId != null;
         assert miniId != null;
@@ -173,6 +178,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
         this.createTtl = createTtl;
         this.accessTtl = accessTtl;
         this.addDepInfo = addDepInfo;
+        this.mvccCrdCntr = mvccCrdCntr;
 
         if (readThrough)
             flags |= READ_THROUGH_FLAG_MASK;
@@ -188,6 +194,13 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
     }
 
     /**
+     * @return Mvcc coordinator counter.
+     */
+    public long mvccCoordinatorCounter() {
+        return mvccCrdCntr;
+    }
+
+    /**
      * @return Future ID.
      */
     public IgniteUuid futureId() {
@@ -382,30 +395,36 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeCollection("readersFlags", readersFlags, MessageCollectionItemType.BOOLEAN))
+                if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeCollection("readersFlags", readersFlags, MessageCollectionItemType.BOOLEAN))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 13:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
                 if (!writer.writeMessage("ver", ver))
                     return false;
 
@@ -476,7 +495,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 9:
-                readersFlags = reader.readCollection("readersFlags", MessageCollectionItemType.BOOLEAN);
+                mvccCrdCntr = reader.readLong("mvccCrdCntr");
 
                 if (!reader.isLastRead())
                     return false;
@@ -484,7 +503,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 10:
-                subjId = reader.readUuid("subjId");
+                readersFlags = reader.readCollection("readersFlags", MessageCollectionItemType.BOOLEAN);
 
                 if (!reader.isLastRead())
                     return false;
@@ -492,7 +511,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 11:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -500,7 +519,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 12:
-                topVer = reader.readMessage("topVer");
+                taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
                     return false;
@@ -508,6 +527,14 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 13:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
                 ver = reader.readMessage("ver");
 
                 if (!reader.isLastRead())
@@ -527,7 +554,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 14;
+        return 15;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
new file mode 100644
index 0000000..a399421
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Finish future that wraps {@link GridNearTxFinishFuture} and acks MVCC commits to the coordinator.
+ */
+public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteInternalTx> implements NearTxFinishFuture {
+    /** Wrapped finish future; always started before any coordinator ack is sent. */
+    private final GridNearTxFinishFuture finishFut;
+
+    /**
+     * @param finishFut Finish future to wrap.
+     */
+    GridNearTxFinishAndAckFuture(GridNearTxFinishFuture finishFut) {
+        this.finishFut = finishFut;
+    }
+
+    /** {@inheritDoc} */
+    public void finish(boolean commit) {
+        if (commit) {
+            finishFut.finish(true);
+
+            finishFut.listen(new IgniteInClosure<GridNearTxFinishFuture>() {
+                @Override public void apply(final GridNearTxFinishFuture fut) {
+                    GridNearTxLocal tx = fut.tx();
+
+                    if (tx.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA) { // Ack only if tx got an MVCC counter.
+                        ClusterNode crd = fut.context().coordinators().coordinator(tx.topologyVersion());
+
+                        assert crd != null;
+
+                        IgniteInternalFuture<Void> ackFut = fut.context().coordinators().ackTxCommit(
+                            crd, tx.nearXidVersion());
+
+                        ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
+                            @Override public void apply(IgniteInternalFuture<Void> ackFut) {
+                                Exception err = null;
+
+                                try {
+                                    fut.get(); // Rethrows finish failure, if any.
+
+                                    ackFut.get(); // Rethrows ack failure, if any.
+                                }
+                                catch (Exception e) {
+                                    err = e;
+                                }
+                                catch (Error e) {
+                                    onDone(e); // Complete this future before rethrowing the Error.
+
+                                    throw e;
+                                }
+
+                                if (err != null)
+                                    onDone(err);
+                                else
+                                    onDone(fut.tx());
+                            }
+                        });
+                    }
+                    else
+                        finishWithFutureResult(fut); // No MVCC counter: just mirror the finish result.
+                }
+            });
+        }
+        else {
+            finishFut.finish(false); // Rollback: this wrapper itself sends no ack.
+
+            finishFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+                @Override public void apply(IgniteInternalFuture fut) {
+                    finishWithFutureResult(fut);
+                }
+            });
+        }
+    }
+
+    /**
+     * @param fut Future (passed from its listener) whose result or error completes this future.
+     */
+    private void finishWithFutureResult(IgniteInternalFuture<IgniteInternalTx> fut) {
+        try {
+            onDone(fut.get());
+        }
+        catch (IgniteCheckedException | RuntimeException e) {
+            onDone(e);
+        }
+        catch (Error e) {
+            onDone(e); // Complete this future before rethrowing the Error.
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearTxFinishAndAckFuture.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 69598d4..eb1f79f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -68,7 +68,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
  *
  */
 public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx>
-    implements GridCacheFuture<IgniteInternalTx> {
+    implements GridCacheFuture<IgniteInternalTx>, NearTxFinishFuture {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -136,6 +136,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
         }
     }
 
+    /**
+     * @return Shared cache context ({@code cctx}) of this future.
+     */
+    GridCacheSharedContext<K, V> context() {
+        return cctx;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futId;
@@ -383,13 +390,17 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             fut.getClass() == CheckRemoteTxMiniFuture.class;
     }
 
-    /**
-     * Initializes future.
-     *
-     * @param commit Commit flag.
-     */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    void finish(boolean commit) {
+    /** {@inheritDoc} */
+    public void finish(boolean commit) {
+        if (!commit && tx.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA) {
+            ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion());
+
+            assert crd != null;
+
+            cctx.coordinators().ackTxRollback(crd, tx.nearXidVersion());
+        }
+
         if (tx.onNeedCheckBackup()) {
             assert tx.onePhaseCommit();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8ecf21f..e390932 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLoca
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -126,8 +127,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
         AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, IgniteInternalFuture.class, "prepFut");
 
     /** Prepare future updater. */
-    private static final AtomicReferenceFieldUpdater<GridNearTxLocal, GridNearTxFinishFuture> COMMIT_FUT_UPD =
-        AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "commitFut");
+    private static final AtomicReferenceFieldUpdater<GridNearTxLocal, NearTxFinishFuture> COMMIT_FUT_UPD =
+        AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, NearTxFinishFuture.class, "commitFut");
 
     /** Rollback future updater. */
     private static final AtomicReferenceFieldUpdater<GridNearTxLocal, GridNearTxFinishFuture> ROLLBACK_FUT_UPD =
@@ -144,7 +145,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
     /** Commit future. */
     @SuppressWarnings("UnusedDeclaration")
     @GridToStringExclude
-    private volatile GridNearTxFinishFuture commitFut;
+    private volatile NearTxFinishFuture commitFut;
 
     /** Rollback future. */
     @SuppressWarnings("UnusedDeclaration")
@@ -1167,7 +1168,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                         resolveTaskName(),
                                         null,
                                         keepBinary,
-                                        null) : null;
+                                        TxMvccVersion.COUNTER_NA,
+                                        null) : null; // TODO IGNITE-3478
 
                                 if (res != null) {
                                     old = res.value();
@@ -1185,7 +1187,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     entryProcessor,
                                     resolveTaskName(),
                                     null,
-                                    keepBinary);
+                                    keepBinary,
+                                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
                             }
                         }
                         catch (ClusterTopologyCheckedException e) {
@@ -1770,7 +1773,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                             resolveTaskName(),
                                             null,
                                             txEntry.keepBinary(),
-                                            null);
+                                            TxMvccVersion.COUNTER_NA,
+                                            null); // TODO IGNITE-3478
 
                                         if (getRes != null) {
                                             val = getRes.value();
@@ -1788,7 +1792,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                             transformClo,
                                             resolveTaskName(),
                                             null,
-                                            txEntry.keepBinary());
+                                            txEntry.keepBinary(),
+                                            TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
                                     }
 
                                     // If value is in cache and passed the filter.
@@ -2064,7 +2069,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     resolveTaskName(),
                                     null,
                                     txEntry.keepBinary(),
-                                    null);
+                                    TxMvccVersion.COUNTER_NA,
+                                    null); // TODO IGNITE-3478
 
                                 if (getRes != null) {
                                     val = getRes.value();
@@ -2082,7 +2088,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     transformClo,
                                     resolveTaskName(),
                                     null,
-                                    txEntry.keepBinary());
+                                    txEntry.keepBinary(),
+                                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
                             }
 
                             if (val != null) {
@@ -2150,7 +2157,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                         resolveTaskName(),
                                         accessPlc,
                                         !deserializeBinary,
-                                        null) : null;
+                                        TxMvccVersion.COUNTER_NA,
+                                        null) : null; // TODO IGNITE-3478
 
                                 if (getRes != null) {
                                     val = getRes.value();
@@ -2168,7 +2176,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     null,
                                     resolveTaskName(),
                                     accessPlc,
-                                    !deserializeBinary);
+                                    !deserializeBinary,
+                                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
                             }
 
                             if (val != null) {
@@ -2636,7 +2645,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                             resolveTaskName(),
                             expiryPlc0,
                             txEntry == null ? keepBinary : txEntry.keepBinary(),
-                            null);
+                            TxMvccVersion.COUNTER_NA,
+                            null); // TODO IGNITE-3478
 
                         if (res == null) {
                             if (misses == null)
@@ -3206,17 +3216,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
 
         final IgniteInternalFuture<?> prepareFut = prepareNearTxLocal();
 
-        GridNearTxFinishFuture fut = commitFut;
+        NearTxFinishFuture fut = commitFut;
 
-        if (fut == null &&
-            !COMMIT_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, true)))
+        if (fut != null)
+            return fut;
+
+        GridNearTxFinishFuture nearFinishFut = new GridNearTxFinishFuture<>(cctx, this, true);
+
+        fut = txState.mvccEnabled(cctx) ? new GridNearTxFinishAndAckFuture(nearFinishFut) : nearFinishFut;
+
+        if (!COMMIT_FUT_UPD.compareAndSet(this, null, fut))
             return commitFut;
 
-        cctx.mvcc().addFuture(fut, fut.futureId());
+        cctx.mvcc().addFuture(nearFinishFut, nearFinishFut.futureId());
 
         prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
             @Override public void apply(IgniteInternalFuture<?> f) {
-                GridNearTxFinishFuture fut0 = commitFut;
+                NearTxFinishFuture fut0 = commitFut;
 
                 try {
                     // Make sure that here are no exceptions.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java
new file mode 100644
index 0000000..94224ca
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+
+/**
+ * Future of {@link IgniteInternalTx} that is started by an explicit commit-or-rollback {@code finish} call.
+ */
+public interface NearTxFinishFuture extends IgniteInternalFuture<IgniteInternalTx>  {
+    /**
+     * @param commit {@code True} to commit, otherwise rollback.
+     */
+    public void finish(boolean commit);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 4eacfb8..f2498d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -403,7 +404,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
             boolean skipEntry = readNoEntry;
 
             if (readNoEntry) {
-                CacheDataRow row = ctx.offheap().read(ctx, cacheKey);
+                CacheDataRow row = ctx.offheap().read(ctx, cacheKey, TxMvccVersion.COUNTER_NA);
 
                 if (row != null) {
                     long expireTime = row.expireTime();
@@ -462,6 +463,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                     taskName,
                                     expiry,
                                     !deserializeBinary,
+                                    TxMvccVersion.COUNTER_NA,
                                     null);
 
                                 if (res != null) {
@@ -489,7 +491,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                     null,
                                     taskName,
                                     expiry,
-                                    !deserializeBinary);
+                                    !deserializeBinary,
+                                    TxMvccVersion.COUNTER_NA);
 
                                 if (v != null) {
                                     ctx.addResult(vals,
@@ -1044,7 +1047,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                             entryProcessor,
                             taskName,
                             null,
-                            keepBinary);
+                            keepBinary,
+                            TxMvccVersion.COUNTER_NA);
 
                         Object oldVal = null;
 
@@ -1164,7 +1168,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                 null,
                                 taskName,
                                 null,
-                                keepBinary);
+                                keepBinary,
+                                TxMvccVersion.COUNTER_NA);
 
                             Object interceptorVal = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(
                                 ctx, entry.key(), old, keepBinary), val);
@@ -1197,7 +1202,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                 null,
                                 taskName,
                                 null,
-                                keepBinary);
+                                keepBinary,
+                                TxMvccVersion.COUNTER_NA);
 
                             IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor()
                                 .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, keepBinary));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index f3287af..2657ea5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -113,6 +113,26 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
     /**
      * @param crd Coordinator.
+     * @param cntr Counter assigned to query; sent to {@code crd} in a {@link CoordinatorQueryAckRequest}.
+     */
+    public void ackQueryDone(ClusterNode crd, long cntr) {
+        try {
+            cctx.gridIO().sendToGridTopic(crd,
+                TOPIC_CACHE_COORDINATOR,
+                new CoordinatorQueryAckRequest(cntr),
+                SYSTEM_POOL);
+        }
+        catch (ClusterTopologyCheckedException e) { // Topology error on send: logged at debug level only.
+            if (log.isDebugEnabled())
+                log.debug("Failed to send query ack, node left [crd=" + crd + ']');
+        }
+        catch (IgniteCheckedException e) { // Other send failures are logged as errors and not rethrown.
+            U.error(log, "Failed to send query ack [crd=" + crd + ", cntr=" + cntr + ']', e);
+        }
+    }
+
+    /**
+     * @param crd Coordinator.
      * @return Counter request future.
      */
     public IgniteInternalFuture<Long> requestQueryCounter(ClusterNode crd) {
@@ -253,7 +273,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    private void processCoordinatorQueryStateRequest(UUID nodeId, CoordinatorQueryCounterRequest msg) {
+    private void processCoordinatorQueryCounterRequest(UUID nodeId, CoordinatorQueryCounterRequest msg) {
         ClusterNode node = cctx.discovery().node(nodeId);
 
         if (node == null) {
@@ -265,7 +285,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
         long qryCntr = assignQueryCounter(nodeId);
 
-        CoordinatorMvccCounterResponse res = new CoordinatorMvccCounterResponse(msg.futureId(), qryCntr);
+        CoordinatorMvccCounterResponse res = new CoordinatorMvccCounterResponse(qryCntr, msg.futureId());
 
         try {
             cctx.gridIO().sendToGridTopic(node,
@@ -322,7 +342,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param msg Message.
      */
     private void processCoordinatorTxAckResponse(UUID nodeId, CoordinatorTxAckResponse msg) {
-        TxAckFuture fut = ackFuts.get(msg.futureId());
+        TxAckFuture fut = ackFuts.remove(msg.futureId());
 
         if (fut != null)
             fut.onResponse();
@@ -340,7 +360,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      */
     private long assignQueryCounter(UUID qryNodeId) {
         // TODO IGNITE-3478
-        return committedCntr.get();
+        return committedCntr.get() + 1;
     }
 
     /**
@@ -432,7 +452,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      */
     private class TxAckFuture extends GridFutureAdapter<Void> {
         /** */
-        private final Long id;
+        private final long id;
 
         /** */
         private final ClusterNode crd;
@@ -441,7 +461,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
          * @param id Future ID.
          * @param crd Coordinator.
          */
-        TxAckFuture(Long id, ClusterNode crd) {
+        TxAckFuture(long id, ClusterNode crd) {
             this.id = id;
             this.crd = crd;
         }
@@ -477,9 +497,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             for (MvccCounterFuture fut : cntrFuts.values())
                 fut.onNodeLeft(nodeId);
 
-//            for (AckFuture fut : ackFuts.values())
-//                fut.onNodeLeft(nodeId);
-//
+            for (TxAckFuture fut : ackFuts.values())
+                fut.onNodeLeft(nodeId);
         }
     }
     /**
@@ -499,7 +518,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             else if (msg instanceof CoordinatorQueryAckRequest)
                 processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
             else if (msg instanceof CoordinatorQueryCounterRequest)
-                processCoordinatorQueryStateRequest(nodeId, (CoordinatorQueryCounterRequest)msg);
+                processCoordinatorQueryCounterRequest(nodeId, (CoordinatorQueryCounterRequest)msg);
             else
                 U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java
index e893b22..5dda247 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java
@@ -101,7 +101,7 @@ public class CoordinatorQueryCounterRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public short directType() {
-        return -33;
+        return 133;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 8ad717a..264e2a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1444,7 +1444,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                     /*closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null,
                     resolveTaskName(),
                     null,
-                    keepBinary);
+                    keepBinary,
+                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
             }
 
             boolean modified = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 1b31d76..b29b702 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
@@ -1656,7 +1657,8 @@ public class IgniteTxHandler {
                                                 /*transformClo*/null,
                                                 tx.resolveTaskName(),
                                                 /*expiryPlc*/null,
-                                                /*keepBinary*/true);
+                                                /*keepBinary*/true,
+                                                TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
 
                                             if (val == null)
                                                 val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 836eecc..32ab96e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1097,7 +1097,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                     null,
                                     resolveTaskName(),
                                     null,
-                                    txEntry.keepBinary());
+                                    txEntry.keepBinary(),
+                                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
                             }
                         }
                         else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index f0e19c6..b344a61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -391,7 +391,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         Object transformClo,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
-        boolean keepBinary) {
+        boolean keepBinary,
+        long mvccCntr) {
         return val;
     }
 
@@ -408,6 +409,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean keepBinary,
+        long mvccCntr,
         @Nullable ReaderArguments args) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert false;
 
@@ -425,6 +427,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean keepBinary,
+        long mvccCntr,
         @Nullable ReaderArguments readerArgs) {
         assert false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e71ce193/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 7920e0a..19f1dc7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -24,15 +24,21 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 
@@ -73,20 +79,10 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testPessimisticTx1() throws Exception {
-        startGridsMultiThreaded(SRVS);
-
-        try {
-            for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
-                logCacheInfo(ccfg);
-
-                ignite(0).createCache(ccfg);
-
+        checkPessimisticTx(new CI1<IgniteCache<Integer, Integer>>() {
+            @Override public void apply(IgniteCache<Integer, Integer> cache) {
                 try {
-                    Ignite node = ignite(0);
-
-                    IgniteTransactions txs = node.transactions();
-
-                    IgniteCache<Integer, Integer> cache = node.cache(ccfg.getName());
+                    IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
 
                     List<Integer> keys = testKeys(cache);
 
@@ -112,10 +108,74 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                         assertEquals(key, val);
                     }
                 }
+                catch (Exception e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticTx2() throws Exception {
+        checkPessimisticTx(new CI1<IgniteCache<Integer, Integer>>() {
+            @Override public void apply(IgniteCache<Integer, Integer> cache) {
+                try {
+                    IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                    List<Integer> keys = testKeys(cache);
+
+                    for (Integer key : keys) {
+                        log.info("Test key: " + key);
+
+                        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { // Two puts in one tx.
+                            cache.put(key, key);
+                            cache.put(key + 1, key + 1);
+
+                            assertEquals(key, cache.get(key)); // Both written values must be readable before commit.
+                            assertEquals(key + 1, (Object)cache.get(key + 1));
+
+                            tx.commit();
+                        }
+
+                        assertEquals(key, cache.get(key)); // The same values must be read back after commit.
+                        assertEquals(key + 1, (Object)cache.get(key + 1));
+                    }
+                }
+                catch (Exception e) {
+                    throw new IgniteException(e); // apply() declares no checked exceptions, so wrap and rethrow.
+                }
+            }
+        });
+    }
+
+    /**
+     * @param c Closure to run.
+     * @throws Exception If failed.
+     */
+    private void checkPessimisticTx(IgniteInClosure<IgniteCache<Integer, Integer>> c) throws Exception {
+        startGridsMultiThreaded(SRVS);
+
+        try {
+            for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+                logCacheInfo(ccfg);
+
+                ignite(0).createCache(ccfg);
+
+                try {
+                    Ignite node = ignite(0);
+
+                    IgniteCache<Integer, Integer> cache = node.cache(ccfg.getName());
+
+                    c.apply(cache);
+                }
                 finally {
                     ignite(0).destroyCache(ccfg.getName());
                 }
             }
+
+            verifyCoordinatorInternalState();
         }
         finally {
             stopAllGrids();
@@ -142,6 +202,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             keys.addAll(primaryKeys(ignite(0).cache(ccfg.getName()), 2));
 
             Map<Integer, Integer> res = cache.getAll(keys);
+
+            verifyCoordinatorInternalState();
         }
         finally {
             stopAllGrids();
@@ -216,4 +278,25 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
         return ccfg;
     }
+
+    /**
+     * Asserts that, for every node returned by {@code G.allGrids()}, the coordinators manager's {@code activeTxs}, {@code cntrFuts} and {@code ackFuts} maps are empty.
+     */
+    private void verifyCoordinatorInternalState() {
+        for (Ignite node : G.allGrids()) {
+            CacheCoordinatorsSharedManager crd = ((IgniteKernal)node).context().cache().context().coordinators();
+
+            Map activeTxs = GridTestUtils.getFieldValue(crd, "activeTxs");
+
+            assertTrue(activeTxs.isEmpty());
+
+            Map cntrFuts = GridTestUtils.getFieldValue(crd, "cntrFuts");
+
+            assertTrue(cntrFuts.isEmpty());
+
+            Map ackFuts = GridTestUtils.getFieldValue(crd, "ackFuts");
+
+            assertTrue(ackFuts.isEmpty());
+        }
+    }
 }


Mime
View raw message