ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [4/5] incubator-ignite git commit: IGNITE-313 Need to change affinity topology version from long to custom object
Date Wed, 25 Feb 2015 16:46:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
index 7bb513e..a96bce5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -70,7 +71,7 @@ public class GridCacheMvccCandidate<K> implements Externalizable,
     /** Topology version. */
     @SuppressWarnings( {"TransientFieldNotInitialized"})
     @GridToStringInclude
-    private transient volatile long topVer = -1;
+    private transient volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
     /** Linked reentry. */
     private GridCacheMvccCandidate<K> reentry;
@@ -185,14 +186,14 @@ public class GridCacheMvccCandidate<K> implements Externalizable,
     /**
      * @return Topology for which this lock was acquired.
      */
-    public long topologyVersion() {
+    public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
     /**
      * @param topVer Topology version.
      */
-    public void topologyVersion(long topVer) {
+    public void topologyVersion(AffinityTopologyVersion topVer) {
         this.topVer = topVer;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index d125c02..c947592 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -558,9 +559,8 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
     public Collection<GridCacheMvccCandidate<K>> remoteCandidates() {
         Collection<GridCacheMvccCandidate<K>> rmtCands = new LinkedList<>();
 
-        for (GridDistributedCacheEntry<K, V> entry : locked()) {
+        for (GridDistributedCacheEntry<K, V> entry : locked())
             rmtCands.addAll(entry.remoteMvccSnapshot());
-        }
 
         return rmtCands;
     }
@@ -919,8 +919,8 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
      * @return Future that signals when all locks for given partitions are released.
      */
     @SuppressWarnings({"unchecked"})
-    public IgniteInternalFuture<?> finishLocks(long topVer) {
-        assert topVer > 0;
+    public IgniteInternalFuture<?> finishLocks(AffinityTopologyVersion topVer) {
+        assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
         return finishLocks(null, topVer);
     }
 
@@ -931,13 +931,13 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
      * @param topVer Topology version to wait for.
      * @return Explicit locks release future.
      */
-    public IgniteInternalFuture<?> finishExplicitLocks(long topVer) {
+    public IgniteInternalFuture<?> finishExplicitLocks(AffinityTopologyVersion topVer) {
         GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(cctx.kernalContext());
 
         for (GridCacheExplicitLockSpan<K> span : pendingExplicit.values()) {
             GridDiscoveryTopologySnapshot snapshot = span.topologySnapshot();
 
-            if (snapshot != null && snapshot.topologyVersion() < topVer)
+            if (snapshot != null && snapshot.topologyVersion() < topVer.topologyVersion())
                 res.add(span.releaseFuture());
         }
 
@@ -951,13 +951,13 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
      *
      * @return Finish update future.
      */
-    public IgniteInternalFuture<?> finishAtomicUpdates(long topVer) {
+    public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) {
         GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(cctx.kernalContext());
 
         res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class);
 
         for (GridCacheAtomicFuture<K, ?> fut : atomicFuts.values()) {
-            if (fut.waitForPartitionExchange() && fut.topologyVersion() < topVer)
+            if (fut.waitForPartitionExchange() && fut.topologyVersion().compareTo(topVer) < 0)
                 res.add((IgniteInternalFuture<Object>)fut);
         }
 
@@ -972,7 +972,7 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
      * @return Future that signals when all locks for given keys are released.
      */
     @SuppressWarnings("unchecked")
-    public IgniteInternalFuture<?> finishKeys(Collection<K> keys, long topVer) {
+    public IgniteInternalFuture<?> finishKeys(Collection<K> keys, AffinityTopologyVersion topVer) {
         if (!(keys instanceof Set))
             keys = new HashSet<>(keys);
 
@@ -990,10 +990,10 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
      * @param topVer Topology version.
      * @return Future that signals when all locks for given partitions will be released.
      */
-    private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<K> keyFilter, long topVer) {
-        assert topVer != 0;
+    private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<K> keyFilter, AffinityTopologyVersion topVer) {
+        assert topVer.topologyVersion() != 0;
 
-        if (topVer < 0)
+        if (topVer.equals(AffinityTopologyVersion.NONE))
             return new GridFinishedFuture(context().kernalContext());
 
         final FinishLockFuture finishFut = new FinishLockFuture(
@@ -1045,7 +1045,7 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
 
         /** Topology version. Instance field for toString method only. */
         @GridToStringInclude
-        private final long topVer;
+        private final AffinityTopologyVersion topVer;
 
         /** */
         @GridToStringInclude
@@ -1058,17 +1058,17 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
         public FinishLockFuture() {
             assert false;
 
-            topVer = 0;
+            topVer = AffinityTopologyVersion.ZERO;
         }
 
         /**
          * @param topVer Topology version.
          * @param entries Entries.
          */
-        FinishLockFuture(Iterable<GridDistributedCacheEntry<K, V>> entries, long topVer) {
+        FinishLockFuture(Iterable<GridDistributedCacheEntry<K, V>> entries, AffinityTopologyVersion topVer) {
             super(cctx.kernalContext(), true);
 
-            assert topVer > 0;
+            assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
 
             this.topVer = topVer;
 
@@ -1078,11 +1078,9 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
                     Collection<GridCacheMvccCandidate<K>> locs = entry.localCandidates();
 
                     if (!F.isEmpty(locs)) {
-                        Collection<GridCacheMvccCandidate<K>> cands =
-                            new ConcurrentLinkedQueue<>();
+                        Collection<GridCacheMvccCandidate<K>> cands = new ConcurrentLinkedQueue<>();
 
-                        if (locs != null)
-                            cands.addAll(F.view(locs, versionFilter()));
+                        cands.addAll(F.view(locs, versionFilter()));
 
                         if (!F.isEmpty(cands))
                             pendingLocks.put(entry.txKey(), cands);
@@ -1102,14 +1100,14 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
          * @return Filter.
          */
         private IgnitePredicate<GridCacheMvccCandidate<K>> versionFilter() {
-            assert topVer > 0;
+            assert topVer.topologyVersion() > 0;
 
             return new P1<GridCacheMvccCandidate<K>>() {
                 @Override public boolean apply(GridCacheMvccCandidate<K> c) {
                     assert c.nearLocal() || c.dhtLocal();
 
                     // Wait for explicit locks.
-                    return c.topologyVersion() == 0 || c.topologyVersion() < topVer;
+                    return c.topologyVersion().equals(AffinityTopologyVersion.ZERO) || c.topologyVersion().compareTo(topVer) < 0;
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d7b1914..246ff37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -23,6 +23,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.processors.timeout.*;
@@ -120,7 +121,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " +
                     "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
 
-                GridDhtPartitionExchangeId exchId = exchangeId(n.id(), e.topologyVersion(), e.type());
+                GridDhtPartitionExchangeId exchId = exchangeId(n.id(), new AffinityTopologyVersion(e.topologyVersion()),
+                    e.type());
 
                 GridDhtPartitionsExchangeFuture<K, V> exchFut = exchangeFuture(exchId, e);
 
@@ -198,7 +200,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         assert startTime > 0;
 
-        final long startTopVer = loc.order();
+        final AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(loc.order());
 
         GridDhtPartitionExchangeId exchId = exchangeId(loc.id(), startTopVer, EVT_NODE_JOINED);
 
@@ -207,7 +209,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         assert discoEvt != null;
 
-        assert discoEvt.topologyVersion() == startTopVer;
+        assert discoEvt.topologyVersion() == startTopVer.topologyVersion();
 
         GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt);
 
@@ -320,7 +322,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      *
      * @return Topology version.
      */
-    public long topologyVersion() {
+    public AffinityTopologyVersion topologyVersion() {
         return lastInitializedFuture.exchangeId().topologyVersion();
     }
 
@@ -487,7 +489,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes)
         throws IgniteCheckedException {
-        GridDhtPartitionsFullMessage<K, V> m = new GridDhtPartitionsFullMessage<>(null, null, -1);
+        GridDhtPartitionsFullMessage<K, V> m = new GridDhtPartitionsFullMessage<>(null, null, AffinityTopologyVersion.NONE);
 
         for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal())
@@ -543,7 +545,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param evt Event type.
      * @return Activity future ID.
      */
-    private GridDhtPartitionExchangeId exchangeId(UUID nodeId, long topVer, int evt) {
+    private GridDhtPartitionExchangeId exchangeId(UUID nodeId, AffinityTopologyVersion topVer, int evt) {
         return new GridDhtPartitionExchangeId(nodeId, evt, topVer);
     }
 
@@ -576,7 +578,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         if (exchFuts0 != null) {
             for (GridDhtPartitionsExchangeFuture<K, V> fut : exchFuts0.values()) {
-                if (fut.exchangeId().topologyVersion() < exchFut.exchangeId().topologyVersion() - 10)
+                if (fut.exchangeId().topologyVersion().topologyVersion() < exchFut.exchangeId().topologyVersion().topologyVersion() - 10)
                     fut.cleanUp();
             }
         }
@@ -959,14 +961,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 @Override public int compare(
                     GridDhtPartitionsExchangeFuture<K, V> f1,
                     GridDhtPartitionsExchangeFuture<K, V> f2) {
-                    long t1 = f1.exchangeId().topologyVersion();
-                    long t2 = f2.exchangeId().topologyVersion();
+                    AffinityTopologyVersion t1 = f1.exchangeId().topologyVersion();
+                    AffinityTopologyVersion t2 = f2.exchangeId().topologyVersion();
 
-                    assert t1 > 0;
-                    assert t2 > 0;
+                    assert t1.topologyVersion() > 0;
+                    assert t2.topologyVersion() > 0;
 
                     // Reverse order.
-                    return t1 < t2 ? 1 : t1 == t2 ? 0 : -1;
+                    int cmp = t1.compareTo(t2);
+
+                    return cmp < 0 ? 1 : cmp == 0 ? 0 : -1;
                 }
             }, /*not strict*/false);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index e800137..c2b2b88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
@@ -115,7 +116,7 @@ public interface GridCachePreloader<K, V> {
      * @param topVer Topology version, {@code -1} if not required.
      * @return Future to complete when all keys are preloaded.
      */
-    public IgniteInternalFuture<Object> request(Collection<? extends K> keys, long topVer);
+    public IgniteInternalFuture<Object> request(Collection<? extends K> keys, AffinityTopologyVersion topVer);
 
     /**
      * Force preload process.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 40e090b..fd334f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.lang.*;
@@ -111,7 +112,7 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Object> request(Collection<? extends K> keys, long topVer) {
+    @Override public IgniteInternalFuture<Object> request(Collection<? extends K> keys, AffinityTopologyVersion topVer) {
         return new GridFinishedFuture<>(cctx.kernalContext());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 6b17038..de9ec0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.timeout.*;
@@ -382,7 +383,7 @@ public class GridCacheSharedContext<K, V> {
      * @return {@code true} if waiting was successful.
      */
     @SuppressWarnings({"unchecked"})
-    public IgniteInternalFuture<?> partitionReleaseFuture(long topVer) {
+    public IgniteInternalFuture<?> partitionReleaseFuture(AffinityTopologyVersion topVer) {
         GridCompoundFuture f = new GridCompoundFuture(kernalCtx);
 
         f.add(mvcc().finishExplicitLocks(topVer));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 0a7b768..b92cc2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.managers.swapspace.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.offheap.*;
@@ -173,7 +174,7 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> {
      * @return Number of swap entries.
      * @throws IgniteCheckedException If failed.
      */
-    public int swapEntriesCount(boolean primary, boolean backup, long topVer) throws IgniteCheckedException {
+    public int swapEntriesCount(boolean primary, boolean backup, AffinityTopologyVersion topVer) throws IgniteCheckedException {
         assert primary || backup;
 
         if (!swapEnabled)
@@ -196,7 +197,7 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> {
      * @return Number of offheap entries.
      * @throws IgniteCheckedException If failed.
      */
-    public int offheapEntriesCount(boolean primary, boolean backup, long topVer) throws IgniteCheckedException {
+    public int offheapEntriesCount(boolean primary, boolean backup, AffinityTopologyVersion topVer) throws IgniteCheckedException {
         assert primary || backup;
 
         if (!offheapEnabled)
@@ -1515,7 +1516,7 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> {
      * @return Swap entries iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, boolean backup, long topVer)
+    public Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
         assert primary || backup;
@@ -1545,7 +1546,7 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> {
      * @return Offheap entries iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, long topVer)
+    public Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
         assert primary || backup;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 7cd13df..b0f3170 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -596,6 +597,17 @@ public class GridCacheUtils {
     }
 
     /**
+     * Gets DHT affinity nodes.
+     *
+     * @param ctx Cache context.
+     * @param topOrder Maximum allowed node order.
+     * @return Affinity nodes.
+     */
+    public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) {
+        return affinityNodes(ctx, topOrder.topologyVersion());
+    }
+
+    /**
      * Checks if given node has specified cache started and the local DHT storage is enabled.
      *
      * @param ctx Cache context.
@@ -1089,7 +1101,7 @@ public class GridCacheUtils {
         if (ctx.config().getCacheMode() == LOCAL)
             return F.asMap(ctx.localNode(), (Collection<K>)keys);
 
-        long topVer = ctx.discovery().topologyVersion();
+        AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion());
 
         if (CU.affinityNodes(ctx, topVer).isEmpty())
             return Collections.emptyMap();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
index 3b2d073..1e2bfe9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.affinity;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -83,7 +84,7 @@ public class GridCacheAffinityImpl<K, V> implements CacheAffinity<K> {
     @Override public int[] primaryPartitions(ClusterNode n) {
         A.notNull(n, "n");
 
-        long topVer = cctx.discovery().topologyVersion();
+        AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
 
         Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer);
 
@@ -94,7 +95,7 @@ public class GridCacheAffinityImpl<K, V> implements CacheAffinity<K> {
     @Override public int[] backupPartitions(ClusterNode n) {
         A.notNull(n, "n");
 
-        long topVer = cctx.discovery().topologyVersion();
+        AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
 
         Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer);
 
@@ -107,7 +108,7 @@ public class GridCacheAffinityImpl<K, V> implements CacheAffinity<K> {
 
         Collection<Integer> parts = new HashSet<>();
 
-        long topVer = cctx.discovery().topologyVersion();
+        AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
 
         for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) {
             for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) {
@@ -170,9 +171,9 @@ public class GridCacheAffinityImpl<K, V> implements CacheAffinity<K> {
     @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) {
         A.notNull(keys, "keys");
 
-        long topVer = topologyVersion();
+        AffinityTopologyVersion topVer = topologyVersion();
 
-        int nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer).size();
+        int nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer.topologyVersion()).size();
 
         // Must return empty map if no alive nodes present or keys is empty.
         Map<ClusterNode, Collection<K>> res = new HashMap<>(nodesCnt, 1.0f);
@@ -215,7 +216,7 @@ public class GridCacheAffinityImpl<K, V> implements CacheAffinity<K> {
      *
      * @return Topology version.
      */
-    private long topologyVersion() {
+    private AffinityTopologyVersion topologyVersion() {
         return cctx.affinity().affinityTopologyVersion();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 25abfb0..3daadcd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.processors.task.*;
@@ -357,7 +358,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    private void removeSetData(IgniteUuid setId, long topVer) throws IgniteCheckedException {
+    private void removeSetData(IgniteUuid setId, AffinityTopologyVersion topVer) throws IgniteCheckedException {
         boolean loc = cctx.isLocal();
 
         GridCacheAffinityManager aff = cctx.affinity();
@@ -408,7 +409,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
 
         if (!cctx.isLocal()) {
             while (true) {
-                long topVer = cctx.topologyVersionFuture().get();
+                AffinityTopologyVersion topVer = cctx.topologyVersionFuture().get();
 
                 Collection<ClusterNode> nodes = CU.affinityNodes(cctx, topVer);
 
@@ -446,14 +447,14 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
                         throw e;
                 }
 
-                if (cctx.topologyVersionFuture().get() == topVer)
+                if (topVer.equals(cctx.topologyVersionFuture().get()))
                     break;
             }
         }
         else {
             blockSet(id);
 
-            cctx.dataStructures().removeSetData(id, 0);
+            cctx.dataStructures().removeSetData(id, AffinityTopologyVersion.ZERO);
         }
     }
 
@@ -641,7 +642,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
         private IgniteUuid setId;
 
         /** */
-        private long topVer;
+        private AffinityTopologyVersion topVer;
 
         /**
          * Required by {@link Externalizable}.
@@ -655,7 +656,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
          * @param setId Set ID.
          * @param topVer Topology version.
          */
-        private RemoveSetDataCallable(String cacheName, IgniteUuid setId, long topVer) {
+        private RemoveSetDataCallable(String cacheName, IgniteUuid setId, @NotNull AffinityTopologyVersion topVer) {
             this.cacheName = cacheName;
             this.setId = setId;
             this.topVer = topVer;
@@ -687,14 +688,14 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
         @Override public void writeExternal(ObjectOutput out) throws IOException {
             U.writeString(out, cacheName);
             U.writeGridUuid(out, setId);
-            out.writeLong(topVer);
+            out.writeObject(topVer);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
             cacheName = U.readString(in);
             setId = U.readGridUuid(in);
-            topVer = in.readLong();
+            topVer = (AffinityTopologyVersion)in.readObject();
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index b2f6abc..1e8ba38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -19,11 +19,13 @@ package org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
 
 import java.nio.*;
 import java.util.*;
@@ -65,7 +67,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
     private long ttl;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /**
      * Required empty constructor.
@@ -78,7 +80,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
      * @param topVer Topology version.
      * @param ttl TTL.
      */
-    public GridCacheTtlUpdateRequest(long topVer, long ttl) {
+    public GridCacheTtlUpdateRequest(@NotNull AffinityTopologyVersion topVer, long ttl) {
         assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl;
 
         this.topVer = topVer;
@@ -88,7 +90,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
     /**
      * @return Topology version.
      */
-    public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -215,7 +217,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -273,7 +275,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
                 reader.incrementState();
 
             case 6:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index dc82e83..09b7143 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -137,7 +138,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
     /** {@inheritDoc} */
     @Override public void removeAll() throws IgniteCheckedException {
         try {
-            long topVer;
+            AffinityTopologyVersion topVer;
 
             do {
                 topVer = ctx.affinity().affinityTopologyVersion();
@@ -150,7 +151,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                         new GlobalRemoveAllCallable<>(name(), topVer), nodes, true).get();
                 }
             }
-            while (ctx.affinity().affinityTopologyVersion() > topVer);
+            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0);
         }
         catch (ClusterGroupEmptyCheckedException ignore) {
             if (log.isDebugEnabled())
@@ -162,7 +163,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
     @Override public IgniteInternalFuture<?> removeAllAsync() {
         GridFutureAdapter<Void> opFut = new GridFutureAdapter<>(ctx.kernalContext());
 
-        long topVer = ctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         removeAllAsync(opFut, topVer);
 
@@ -173,7 +174,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
      * @param opFut Future.
      * @param topVer Topology version.
      */
-    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final long topVer) {
+    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer) {
         Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
 
         if (!nodes.isEmpty()) {
@@ -185,9 +186,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                     try {
                         fut.get();
 
-                        long topVer0 = ctx.affinity().affinityTopologyVersion();
+                        AffinityTopologyVersion topVer0 = ctx.affinity().affinityTopologyVersion();
 
-                        if (topVer0 == topVer)
+                        if (topVer0.equals(topVer))
                             opFut.onDone();
                         else
                             removeAllAsync(opFut, topVer0);
@@ -231,7 +232,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         private String cacheName;
 
         /** Topology version. */
-        private long topVer;
+        private AffinityTopologyVersion topVer;
 
         /** Injected grid instance. */
         @IgniteInstanceResource
@@ -248,7 +249,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
          * @param cacheName Cache name.
          * @param topVer Topology version.
          */
-        private GlobalRemoveAllCallable(String cacheName, long topVer) {
+        private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer) {
             this.cacheName = cacheName;
             this.topVer = topVer;
         }
@@ -266,7 +267,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
             ctx.gate().enter();
 
             try {
-                if (ctx.affinity().affinityTopologyVersion() != topVer)
+                if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
                     return null; // Ignore this remove request because remove request will be sent again.
 
                 GridDhtCacheAdapter<K, V> dht;
@@ -309,13 +310,13 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws IOException {
             U.writeString(out, cacheName);
-            out.writeLong(topVer);
+            out.writeObject(topVer);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
             cacheName = U.readString(in);
-            topVer = in.readLong();
+            topVer = (AffinityTopologyVersion)in.readObject();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index dbf82dd..d72c9b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -467,7 +468,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
                     // ensure proper lock ordering for removed entries.
                     cctx.tm().addCommittedTx(this);
 
-                    long topVer = topologyVersion();
+                    AffinityTopologyVersion topVer = topologyVersion();
 
                     // Node that for near transactions we grab all entries.
                     for (IgniteTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 239efc3..bfda92d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.util.*;
@@ -62,7 +63,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
     private GridDhtPartitionExchangeId lastExchangeId;
 
     /** */
-    private long topVer = -1;
+    private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
     /** A future that will be completed when topology with version topVer will be ready to use. */
     private GridDhtTopologyFuture topReadyFut;
@@ -131,7 +132,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
         lock.writeLock().lock();
 
         try {
-            assert exchId.topologyVersion() > topVer : "Invalid topology version [topVer=" + topVer +
+            assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
                 ", exchId=" + exchId + ']';
 
             topVer = exchId.topologyVersion();
@@ -144,11 +145,11 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
     }
 
     /** {@inheritDoc} */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         lock.readLock().lock();
 
         try {
-            assert topVer > 0;
+            assert topVer.topologyVersion() > 0;
 
             return topVer;
         }
@@ -178,14 +179,14 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
         lock.writeLock().lock();
 
         try {
-            assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" +
+            assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
                 topVer + ", exchId=" + exchId + ']';
 
             if (!exchId.isJoined())
                 removeNode(exchId.nodeId());
 
             // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldest(cctx, topVer);
+            ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion());
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -230,12 +231,12 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
 
     /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException {
-        long topVer = exchId.topologyVersion();
+        AffinityTopologyVersion topVer = exchId.topologyVersion();
 
         lock.writeLock().lock();
 
         try {
-            assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" +
+            assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
                 topVer + ", exchId=" + exchId + ']';
 
             if (log.isDebugEnabled())
@@ -254,7 +255,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create)
+    @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create)
         throws GridDhtInvalidPartitionException {
         if (!create)
             return null;
@@ -265,7 +266,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
 
     /** {@inheritDoc} */
     @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) {
-        return localPartition(1, -1, create);
+        return localPartition(1, AffinityTopologyVersion.NONE, create);
     }
 
     /** {@inheritDoc} */
@@ -279,7 +280,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) {
+    @Override public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e) {
         assert false : "Entry should not be added to client topology: " + e;
 
         return null;
@@ -304,7 +305,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> nodes(int p, long topVer) {
+    @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         lock.readLock().lock();
 
         try {
@@ -319,7 +320,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
                 for (UUID nodeId : nodeIds) {
                     ClusterNode n = cctx.discovery().node(nodeId);
 
-                    if (n != null && (topVer < 0 || n.order() <= topVer)) {
+                    if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
                         if (nodes == null)
                             nodes = new ArrayList<>();
 
@@ -342,8 +343,8 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
      * @param states Additional partition states.
      * @return List of nodes for the partition.
      */
-    private List<ClusterNode> nodes(int p, long topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null;
+    private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
+        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer.topologyVersion())) : null;
 
         lock.readLock().lock();
 
@@ -362,13 +363,13 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
             List<ClusterNode> nodes = new ArrayList<>(size);
 
             for (UUID id : nodeIds) {
-                if (topVer > 0 && !allIds.contains(id))
+                if (topVer.topologyVersion() > 0 && !allIds.contains(id))
                     continue;
 
                 if (hasState(p, id, state, states)) {
                     ClusterNode n = cctx.discovery().node(id);
 
-                    if (n != null && (topVer < 0 || n.order() <= topVer))
+                    if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
                         nodes.add(n);
                 }
             }
@@ -381,18 +382,18 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
     }
 
     /** {@inheritDoc} */
-    @Override public List<ClusterNode> owners(int p, long topVer) {
+    @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
         return nodes(p, topVer, OWNING);
     }
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> owners(int p) {
-        return owners(p, -1);
+        return owners(p, AffinityTopologyVersion.NONE);
     }
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> moving(int p) {
-        return nodes(p, -1, MOVING);
+        return nodes(p, AffinityTopologyVersion.NONE, MOVING);
     }
 
     /**
@@ -400,7 +401,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
      * @param topVer Topology version.
      * @return List of nodes in state OWNING or MOVING.
      */
-    private List<ClusterNode> ownersAndMoving(int p, long topVer) {
+    private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) {
         return nodes(p, topVer, OWNING, MOVING);
     }
 
@@ -623,7 +624,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
         assert nodeId.equals(cctx.localNodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion());
 
         // If this node became the oldest node.
         if (oldest.id().equals(cctx.localNodeId())) {
@@ -673,7 +674,7 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo
         assert nodeId != null;
         assert lock.writeLock().isHeldByCurrentThread();
 
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion());
 
         ClusterNode loc = cctx.localNode();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 547d414..52fb062 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
 
 import java.nio.*;
 
@@ -31,7 +33,7 @@ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K,
     private static final long serialVersionUID = 0L;
 
     /** Topology version being queried. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /**
      * Empty constructor.
@@ -44,7 +46,7 @@ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K,
      * @param cacheId Cache ID.
      * @param topVer Topology version.
      */
-    public GridDhtAffinityAssignmentRequest(int cacheId, long topVer) {
+    public GridDhtAffinityAssignmentRequest(int cacheId, @NotNull AffinityTopologyVersion topVer) {
         this.cacheId = cacheId;
         this.topVer = topVer;
     }
@@ -57,7 +59,7 @@ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K,
     /**
      * @return Requested topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -87,7 +89,7 @@ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K,
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -109,7 +111,7 @@ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K,
 
         switch (reader.state()) {
             case 3:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 9b86e55..2e32632 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
 
 import java.nio.*;
 import java.util.*;
@@ -36,7 +38,7 @@ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K,
     private static final long serialVersionUID = 0L;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Affinity assignment. */
     @GridDirectTransient
@@ -58,7 +60,8 @@ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K,
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment.
      */
-    public GridDhtAffinityAssignmentResponse(int cacheId, long topVer, List<List<ClusterNode>> affAssignment) {
+    public GridDhtAffinityAssignmentResponse(int cacheId, @NotNull AffinityTopologyVersion topVer,
+        List<List<ClusterNode>> affAssignment) {
         this.cacheId = cacheId;
         this.topVer = topVer;
         this.affAssignment = affAssignment;
@@ -72,7 +75,7 @@ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K,
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -133,7 +136,7 @@ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K,
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -163,7 +166,7 @@ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K,
                 reader.incrementState();
 
             case 4:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 101d657..d74a62e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.util.future.*;
@@ -48,7 +49,7 @@ public class GridDhtAssignmentFetchFuture<K, V> extends GridFutureAdapter<List<L
     private Queue<ClusterNode> availableNodes;
 
     /** Topology version. */
-    private final long topVer;
+    private final AffinityTopologyVersion topVer;
 
     /** Pending node from which response is being awaited. */
     private ClusterNode pendingNode;
@@ -57,7 +58,7 @@ public class GridDhtAssignmentFetchFuture<K, V> extends GridFutureAdapter<List<L
      * @param ctx Cache context.
      * @param availableNodes Available nodes.
      */
-    public GridDhtAssignmentFetchFuture(GridCacheContext<K, V> ctx, long topVer, Collection<ClusterNode> availableNodes) {
+    public GridDhtAssignmentFetchFuture(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, Collection<ClusterNode> availableNodes) {
         super(ctx.kernalContext());
 
         this.ctx = ctx;
@@ -85,7 +86,7 @@ public class GridDhtAssignmentFetchFuture<K, V> extends GridFutureAdapter<List<L
      * @param res Reponse.
      */
     public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse<K, V> res) {
-        if (res.topologyVersion() != topVer) {
+        if (!res.topologyVersion().equals(topVer)) {
             if (log.isDebugEnabled())
                 log.debug("Received affinity assignment for wrong topolgy version (will ignore) " +
                     "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']');

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 2ef157c..338cd3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
@@ -96,8 +97,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     @Override protected void init() {
         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
             /** {@inheritDoc} */
-            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
-                V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer,
+                K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
                 return new GridDhtCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId);
             }
         });
@@ -189,7 +190,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @return Topology version.
      * @throws IgniteCheckedException If failed.
      */
-    public long beginMultiUpdate() throws IgniteCheckedException {
+    public AffinityTopologyVersion beginMultiUpdate() throws IgniteCheckedException {
         IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = multiTxHolder.get();
 
         if (tup != null)
@@ -199,7 +200,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
         GridDhtTopologyFuture topFut;
 
-        long topVer;
+        AffinityTopologyVersion topVer;
 
         try {
             // While we are holding read lock, register lock future for partition release future.
@@ -260,11 +261,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param topVer Topology version.
      * @return Finish future.
      */
-    @Nullable public IgniteInternalFuture<?> multiUpdateFinishFuture(long topVer) {
+    @Nullable public IgniteInternalFuture<?> multiUpdateFinishFuture(AffinityTopologyVersion topVer) {
         GridCompoundFuture<IgniteUuid, Object> fut = null;
 
         for (MultiUpdateFuture multiFut : multiTxFuts.values()) {
-            if (multiFut.topologyVersion() <= topVer) {
+            if (multiFut.topologyVersion().compareTo(topVer) <= 0) {
                 if (fut == null)
                     fut = new GridCompoundFuture<>(ctx.kernalContext());
 
@@ -309,7 +310,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      *
      * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid.
      */
-    @Override public GridCacheEntryEx<K, V> entryEx(K key, long topVer) throws GridDhtInvalidPartitionException {
+    @Override public GridCacheEntryEx<K, V> entryEx(K key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException {
         return super.entryEx(key, topVer);
     }
 
@@ -328,7 +329,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @return DHT entry.
      * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid.
      */
-    public GridDhtCacheEntry<K, V> entryExx(K key, long topVer) throws GridDhtInvalidPartitionException {
+    public GridDhtCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException {
         return (GridDhtCacheEntry<K, V>)entryEx(key, topVer);
     }
 
@@ -344,7 +345,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @throws GridDhtInvalidPartitionException if entry does not belong to this node and
      *      {@code allowDetached} is {@code false}.
      */
-    public GridCacheEntryEx<K, V> entryExx(K key, long topVer, boolean allowDetached, boolean touch) {
+    public GridCacheEntryEx<K, V> entryExx(K key, AffinityTopologyVersion topVer, boolean allowDetached, boolean touch) {
         try {
             return allowDetached && !ctx.affinity().localNode(key, topVer) ?
                 new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) :
@@ -372,7 +373,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
         final boolean replicate = ctx.isDrEnabled();
 
-        final long topVer = ctx.affinity().affinityTopologyVersion();
+        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();
 
@@ -396,7 +397,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
         final boolean replicate = ctx.isDrEnabled();
 
-        final long topVer = ctx.affinity().affinityTopologyVersion();
+        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
 
@@ -426,14 +427,15 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         V val,
         GridCacheVersion ver,
         @Nullable IgniteBiPredicate<K, V> p,
-        long topVer,
+        AffinityTopologyVersion topVer,
         boolean replicate,
         @Nullable ExpiryPolicy plc) {
         if (p != null && !p.apply(key, val))
             return;
 
         try {
-            GridDhtLocalPartition<K, V> part = top.localPartition(ctx.affinity().partition(key), -1, true);
+            GridDhtLocalPartition<K, V> part = top.localPartition(ctx.affinity().partition(key),
+                AffinityTopologyVersion.NONE, true);
 
             // Reserve to make sure that partition does not get unloaded.
             if (part.reserve()) {
@@ -483,7 +485,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     @Override public int primarySize() {
         int sum = 0;
 
-        long topVer = ctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         for (GridDhtLocalPartition<K, V> p : topology().currentLocalPartitions()) {
             if (p.primary(topVer))
@@ -495,7 +497,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
     /**
      * This method is used internally. Use
-     * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, long, UUID, int, boolean, IgniteCacheExpiryPolicy, boolean)}
+     * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, AffinityTopologyVersion, UUID, int, boolean, IgniteCacheExpiryPolicy, boolean)}
      * method instead to retrieve DHT value.
      *
      * @param keys {@inheritDoc}
@@ -583,7 +585,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         LinkedHashMap<? extends K, Boolean> keys,
         boolean readThrough,
         boolean reload,
-        long topVer,
+        AffinityTopologyVersion topVer,
         @Nullable UUID subjId,
         int taskNameHash,
         boolean deserializePortable,
@@ -653,7 +655,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                     res.error(e);
                 }
 
-                res.invalidPartitions(fut.invalidPartitions(), ctx.discovery().topologyVersion());
+                res.invalidPartitions(fut.invalidPartitions(),
+                    new AffinityTopologyVersion(ctx.discovery().topologyVersion()));
 
                 try {
                     ctx.io().send(nodeId, res, ctx.ioPolicy());
@@ -682,7 +685,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
                     Map<ClusterNode, GridCacheTtlUpdateRequest<K, V>> reqMap = new HashMap<>();
 
-                    long topVer = ctx.discovery().topologyVersion();
+                    AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion());
 
                     for (Map.Entry<Object, IgniteBiTuple<byte[], GridCacheVersion>> e : entries.entrySet()) {
                         List<ClusterNode> nodes = ctx.affinity().nodes((K)e.getKey(), topVer);
@@ -793,7 +796,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 }
                 finally {
                     if (entry != null)
-                        cache.context().evicts().touch(entry, -1L);
+                        cache.context().evicts().touch(entry, AffinityTopologyVersion.NONE);
                 }
             }
             catch (IgniteCheckedException e) {
@@ -835,7 +838,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         /** {@inheritDoc} */
         @NotNull @Override public Iterator<Cache.Entry<K, V>> iterator() {
             final GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId,
-                ctx.discovery().topologyVersion(), false);
+                new AffinityTopologyVersion(ctx.discovery().topologyVersion()), false);
 
             Iterator<GridDhtCacheEntry<K, V>> partIt = part == null ? null : part.entries().iterator();
 
@@ -888,7 +891,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         /** {@inheritDoc} */
         @Override public int size() {
             GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId,
-                ctx.discovery().topologyVersion(), false);
+                new AffinityTopologyVersion(ctx.discovery().topologyVersion()), false);
 
             return part != null ? part.publicSize() : 0;
         }
@@ -911,7 +914,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion ver) {
         assert entry.isDht();
 
-        GridDhtLocalPartition<K, V> part = topology().localPartition(entry.partition(), -1, false);
+        GridDhtLocalPartition<K, V> part = topology().localPartition(entry.partition(), AffinityTopologyVersion.NONE,
+            false);
 
         // Do not remove entry on replica topology. Instead, add entry to removal queue.
         // It will be cleared eventually.
@@ -936,7 +940,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         if (primary && backup)
             return iterator(map.entries0().iterator(), !ctx.keepPortable());
         else {
-            final long topVer = ctx.affinity().affinityTopologyVersion();
+            final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
             final Iterator<GridDhtLocalPartition<K, V>> partIt = topology().currentLocalPartitions().iterator();
 
@@ -1081,7 +1085,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         private static final long serialVersionUID = 0L;
 
         /** Topology version. */
-        private long topVer;
+        private AffinityTopologyVersion topVer;
 
         /**
          * Empty constructor required by {@link Externalizable}.
@@ -1094,7 +1098,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
          * @param ctx Kernal context.
          * @param topVer Topology version.
          */
-        private MultiUpdateFuture(GridKernalContext ctx, long topVer) {
+        private MultiUpdateFuture(GridKernalContext ctx, @NotNull AffinityTopologyVersion topVer) {
             super(ctx);
 
             this.topVer = topVer;
@@ -1103,7 +1107,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         /**
          * @return Topology version.
          */
-        private long topologyVersion() {
+        private AffinityTopologyVersion topologyVersion() {
             return topVer;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index cf4f5df..3ceb180 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -65,7 +66,7 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
      * @param ttl Time to live.
      * @param hdrId Header id.
      */
-    public GridDhtCacheEntry(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val,
+    public GridDhtCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val,
         GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
         super(ctx, key, hash, val, next, ttl, hdrId);
 
@@ -153,7 +154,7 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
     @Nullable public GridCacheMvccCandidate<K> addDhtLocal(
         UUID nearNodeId,
         GridCacheVersion nearVer,
-        long topVer,
+        AffinityTopologyVersion topVer,
         long threadId,
         GridCacheVersion ver,
         long timeout,
@@ -299,9 +300,9 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
      * @throws GridCacheEntryRemovedException If entry has been removed.
      */
     @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext"})
-    @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> versionedValue(long topVer)
+    @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> versionedValue(AffinityTopologyVersion topVer)
         throws GridCacheEntryRemovedException {
-        if (isNew() || !valid(-1) || deletedUnlocked())
+        if (isNew() || !valid(AffinityTopologyVersion.NONE) || deletedUnlocked())
             return null;
         else {
             V val0 = null;
@@ -353,7 +354,7 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
     @SuppressWarnings("unchecked")
-    @Nullable public IgniteInternalFuture<Boolean> addReader(UUID nodeId, long msgId, long topVer)
+    @Nullable public IgniteInternalFuture<Boolean> addReader(UUID nodeId, long msgId, AffinityTopologyVersion topVer)
         throws GridCacheEntryRemovedException {
         // Don't add local node as reader.
         if (cctx.nodeId().equals(nodeId))

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 0be5b97..90dde96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -73,7 +74,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     private GridCacheVersion ver;
 
     /** Topology version .*/
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Transaction. */
     private IgniteTxLocalEx<K, V> tx;
@@ -129,7 +130,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         boolean readThrough,
         boolean reload,
         @Nullable IgniteTxLocalEx<K, V> tx,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         @Nullable UUID subjId,
         int taskNameHash,
         boolean deserializePortable,
@@ -261,7 +262,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
      * @return {@code True} if mapped.
      */
     private boolean map(K key, Collection<GridDhtLocalPartition> parts) {
-        GridDhtLocalPartition part = topVer > 0 ?
+        GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
             cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
             cache().topology().localPartition(key, false);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index af63307..7457de5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -517,7 +518,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti
      * @param topVer Topology version.
      * @return {@code True} if local node is primary for this partition.
      */
-    public boolean primary(long topVer) {
+    public boolean primary(AffinityTopologyVersion topVer) {
         return cctx.affinity().primary(cctx.localNode(), id, topVer);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index dba5ed2..10146f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -66,7 +67,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
     private GridCacheVersion nearLockVer;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Thread. */
     private long threadId;
@@ -154,7 +155,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
         GridCacheContext<K, V> cctx,
         UUID nearNodeId,
         GridCacheVersion nearLockVer,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         int cnt,
         boolean read,
         long timeout,
@@ -166,7 +167,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
 
         assert nearNodeId != null;
         assert nearLockVer != null;
-        assert topVer > 0;
+        assert topVer.topologyVersion() > 0;
 
         this.cctx = cctx;
         this.nearNodeId = nearNodeId;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index b4337f8..c7f5d85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -68,7 +69,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
     private byte[] ownedBytes;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Subject ID. */
     private UUID subjId;
@@ -120,7 +121,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
         IgniteUuid futId,
         IgniteUuid miniId,
         GridCacheVersion lockVer,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         boolean isInTx,
         boolean isRead,
         TransactionIsolation isolation,
@@ -194,7 +195,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -400,7 +401,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
                 writer.incrementState();
 
             case 30:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -486,7 +487,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
                 reader.incrementState();
 
             case 30:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d9a20ae..4f28334 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.jetbrains.annotations.*;
@@ -53,7 +54,7 @@ public interface GridDhtPartitionTopology<K, V> {
      *
      * @return Topology version.
      */
-    public long topologyVersion();
+    public AffinityTopologyVersion topologyVersion();
 
     /**
      * Gets a future that will be completed when partition exchange map for this
@@ -88,7 +89,7 @@ public interface GridDhtPartitionTopology<K, V> {
      * @throws GridDhtInvalidPartitionException If partition is evicted or absent and
      *      does not belong to this node.
      */
-    @Nullable public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create)
+    @Nullable public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create)
         throws GridDhtInvalidPartitionException;
 
     /**
@@ -127,7 +128,7 @@ public interface GridDhtPartitionTopology<K, V> {
      * @param topVer Topology version.
      * @return Collection of all nodes responsible for this partition with primary node being first.
      */
-    public Collection<ClusterNode> nodes(int p, long topVer);
+    public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer);
 
     /**
      * @param p Partition ID.
@@ -140,7 +141,7 @@ public interface GridDhtPartitionTopology<K, V> {
      * @param topVer Topology version.
      * @return Collection of all nodes who {@code own} this partition.
      */
-    public List<ClusterNode> owners(int p, long topVer);
+    public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer);
 
     /**
      * @param p Partition ID.
@@ -159,7 +160,7 @@ public interface GridDhtPartitionTopology<K, V> {
      * @param e Entry added to cache.
      * @return Local partition.
      */
-    public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e);
+    public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e);
 
     /**
      * @param e Entry removed from cache.


Mime
View raw message