ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [3/5] incubator-ignite git commit: IGNITE-313 Need to change affinity topology version from long to custom object
Date Wed, 25 Feb 2015 16:46:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index acf00eb..4af7534 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.util.*;
@@ -65,7 +66,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
     private GridDhtPartitionExchangeId lastExchangeId;
 
     /** */
-    private long topVer = -1;
+    private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
     /** A future that will be completed when topology with version topVer will be ready to use. */
     private GridDhtTopologyFuture topReadyFut;
@@ -154,7 +155,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
         lock.writeLock().lock();
 
         try {
-            assert exchId.topologyVersion() > topVer : "Invalid topology version [topVer=" + topVer +
+            assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
                 ", exchId=" + exchId + ']';
 
             topVer = exchId.topologyVersion();
@@ -167,11 +168,11 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
     }
 
     /** {@inheritDoc} */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         lock.readLock().lock();
 
         try {
-            assert topVer > 0;
+            assert topVer.topologyVersion() > 0;
 
             return topVer;
         }
@@ -205,14 +206,14 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
         lock.writeLock().lock();
 
         try {
-            assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" +
+            assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
                 topVer + ", exchId=" + exchId + ']';
 
             if (!exchId.isJoined())
                 removeNode(exchId.nodeId());
 
             // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldest(cctx, topVer);
+            ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion());
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -366,12 +367,12 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
 
         int num = cctx.affinity().partitions();
 
-        long topVer = exchId.topologyVersion();
+        AffinityTopologyVersion topVer = exchId.topologyVersion();
 
         lock.writeLock().lock();
 
         try {
-            assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" +
+            assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
                 topVer + ", exchId=" + exchId + ']';
 
             if (log.isDebugEnabled())
@@ -449,7 +450,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create)
+    @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create)
         throws GridDhtInvalidPartitionException {
         return localPartition(p, topVer, create, true);
     }
@@ -461,7 +462,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
      * @param updateSeq Update sequence.
      * @return Local partition.
      */
-    private GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create, boolean updateSeq) {
+    private GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean updateSeq) {
         while (true) {
             boolean belongs = cctx.affinity().localNode(p, topVer);
 
@@ -512,7 +513,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
 
     /** {@inheritDoc} */
     @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) {
-        return localPartition(cctx.affinity().partition(key), -1, create);
+        return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create);
     }
 
     /** {@inheritDoc} */
@@ -526,7 +527,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) {
+    @Override public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e) {
         /*
          * Make sure not to acquire any locks here as this method
          * may be called from sensitive synchronization blocks.
@@ -572,7 +573,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> nodes(int p, long topVer) {
+    @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
 
         lock.readLock().lock();
@@ -592,7 +593,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
                     if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) {
                         ClusterNode n = cctx.discovery().node(nodeId);
 
-                        if (n != null && (topVer < 0 || n.order() <= topVer)) {
+                        if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
                             if (nodes == null) {
                                 nodes = new ArrayList<>(affNodes.size() + 2);
 
@@ -619,8 +620,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
      * @param states Additional partition states.
      * @return List of nodes for the partition.
      */
-    private List<ClusterNode> nodes(int p, long topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null;
+    private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
+        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer.topologyVersion())) : null;
 
         lock.readLock().lock();
 
@@ -639,13 +640,13 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
             List<ClusterNode> nodes = new ArrayList<>(size);
 
             for (UUID id : nodeIds) {
-                if (topVer > 0 && !allIds.contains(id))
+                if (topVer.topologyVersion() > 0 && !allIds.contains(id))
                     continue;
 
                 if (hasState(p, id, state, states)) {
                     ClusterNode n = cctx.discovery().node(id);
 
-                    if (n != null && (topVer < 0 || n.order() <= topVer))
+                    if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
                         nodes.add(n);
                 }
             }
@@ -658,7 +659,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
     }
 
     /** {@inheritDoc} */
-    @Override public List<ClusterNode> owners(int p, long topVer) {
+    @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
         if (!cctx.preloadEnabled())
             return ownersAndMoving(p, topVer);
 
@@ -667,15 +668,15 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> owners(int p) {
-        return owners(p, -1);
+        return owners(p, AffinityTopologyVersion.NONE);
     }
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> moving(int p) {
         if (!cctx.preloadEnabled())
-            return ownersAndMoving(p, -1);
+            return ownersAndMoving(p, AffinityTopologyVersion.NONE);
 
-        return nodes(p, -1, MOVING);
+        return nodes(p, AffinityTopologyVersion.NONE, MOVING);
     }
 
     /**
@@ -683,7 +684,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
      * @param topVer Topology version.
      * @return List of nodes in state OWNING or MOVING.
      */
-    private List<ClusterNode> ownersAndMoving(int p, long topVer) {
+    private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) {
         return nodes(p, topVer, OWNING, MOVING);
     }
 
@@ -978,7 +979,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
         assert nodeId.equals(cctx.nodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion());
 
         // If this node became the oldest node.
         if (oldest.id().equals(cctx.nodeId())) {
@@ -1028,7 +1029,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
         assert nodeId != null;
         assert lock.writeLock().isHeldByCurrentThread();
 
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldest(cctx, topVer.topologyVersion());
 
         ClusterNode loc = cctx.localNode();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
index 2d2f431..704fadb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.affinity.*;
 
 /**
  * Future that implements a barrier after which dht topology is safe to use. Topology is considered to be
@@ -29,7 +30,7 @@ import org.apache.ignite.internal.managers.discovery.*;
  * When new new transaction is started, it will wait for this future before acquiring new locks on particular
  * topology version.
  */
-public interface GridDhtTopologyFuture extends IgniteInternalFuture<Long> {
+public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopologyVersion> {
     /**
      * Gets a topology snapshot for the topology version represented by the future. Note that by the time
      * partition exchange completes some nodes from the snapshot may leave the grid. One should use discovery

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 753f7e9..75de04d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
@@ -1229,7 +1230,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @throws IgniteCheckedException If failed.
      */
     private void map(UUID nodeId,
-        long topVer,
+        AffinityTopologyVersion topVer,
         GridCacheEntryEx<K,V> cached,
         Collection<UUID> readers,
         Map<ClusterNode, List<T2<K, byte[]>>> dhtMap,
@@ -1355,7 +1356,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                     if (cand == null)
                         cand = entry.candidate(dhtVer);
 
-                    long topVer = cand == null ? -1 : cand.topologyVersion();
+                    AffinityTopologyVersion topVer = cand == null
+                        ? AffinityTopologyVersion.NONE
+                        : cand.topologyVersion();
 
                     // Note that we obtain readers before lock is removed.
                     // Even in case if entry would be removed just after lock is removed,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 2835844..3daace5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -52,7 +53,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
     private boolean sysInvalidate;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Pending versions with order less than one for this message (needed for commit ordering). */
     @GridToStringInclude
@@ -103,7 +104,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
         UUID nearNodeId,
         IgniteUuid futId,
         IgniteUuid miniId,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         GridCacheVersion xidVer,
         GridCacheVersion commitVer,
         long threadId,
@@ -204,7 +205,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -281,7 +282,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -369,7 +370,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
                 reader.incrementState();
 
             case 26:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6aa159c..9f2ca20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -213,7 +214,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) {
+    @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, AffinityTopologyVersion topVer) {
         return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId());
     }
 
@@ -247,7 +248,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
 
     /** {@inheritDoc} */
     @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached,
-        IgniteTxEntry<K, V> entry, long topVer) {
+        IgniteTxEntry<K, V> entry, AffinityTopologyVersion topVer) {
         // Don't add local node as reader.
         if (!cctx.localNodeId().equals(nearNodeId)) {
             GridCacheContext<K, V> cacheCtx = cached.context();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 1c71f12..e856aa8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -142,7 +143,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
     @Nullable protected abstract IgniteInternalFuture<Boolean> addReader(long msgId,
         GridDhtCacheEntry<K, V> cached,
         IgniteTxEntry<K, V> entry,
-        long topVer);
+        AffinityTopologyVersion topVer);
 
     /**
      * @param commit Commit flag.
@@ -541,7 +542,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
         try {
             Set<K> skipped = null;
 
-            long topVer = topologyVersion();
+            AffinityTopologyVersion topVer = topologyVersion();
 
             GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index e3cdc6e..8ad578b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -1247,7 +1248,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                     }
                 }
 
-                long topVer = tx.topologyVersion();
+                AffinityTopologyVersion topVer = tx.topologyVersion();
 
                 boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 0216774..8d894ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -51,7 +52,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
     private IgniteUuid miniId;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Invalidate near entries flags. */
     private BitSet invalidateNearEntries;
@@ -112,7 +113,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
     public GridDhtTxPrepareRequest(
         IgniteUuid futId,
         IgniteUuid miniId,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         GridDhtTxLocalAdapter<K, V> tx,
         Collection<IgniteTxEntry<K, V>> dhtWrites,
         Collection<IgniteTxEntry<K, V>> nearWrites,
@@ -243,7 +244,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -395,7 +396,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
                 writer.incrementState();
 
             case 34:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -505,7 +506,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
                 reader.incrementState();
 
             case 34:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 506888b..b2c2300 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -85,7 +86,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
         IgniteUuid rmtFutId,
         UUID nodeId,
         long rmtThreadId,
-        long topVer,
+        AffinityTopologyVersion topVer,
         GridCacheVersion xidVer,
         GridCacheVersion commitVer,
         boolean sys,
@@ -145,7 +146,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
         UUID nodeId,
         GridCacheVersion nearXidVer,
         long rmtThreadId,
-        long topVer,
+        AffinityTopologyVersion topVer,
         GridCacheVersion xidVer,
         GridCacheVersion commitVer,
         boolean sys,
@@ -219,7 +220,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) {
+    @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, AffinityTopologyVersion topVer) {
         if (!cacheCtx.isDht() || !isNearEnabled(cacheCtx) || cctx.localNodeId().equals(nearNodeId))
             return false;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
index 8f78025..92d750a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -70,12 +71,12 @@ public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheMapEntry<K, V> putEntry(long topVer, K key, @Nullable V val, long ttl) {
+    @Override public GridCacheMapEntry<K, V> putEntry(AffinityTopologyVersion topVer, K key, @Nullable V val, long ttl) {
         throw new AssertionError();
     }
 
     /** {@inheritDoc} */
-    @Override public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(long topVer, K key, @Nullable V val,
+    @Override public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, K key, @Nullable V val,
         long ttl, boolean create) {
         if (create) {
             GridCacheMapEntry<K, V> entry = new GridDhtCacheEntry<>(ctx, topVer, key, hash(key.hashCode()), val,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 980389c..fb51c91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -63,7 +64,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     private Collection<? extends K> keys;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Reload flag. */
     private boolean reload;
@@ -128,7 +129,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     public GridPartitionedGetFuture(
         GridCacheContext<K, V> cctx,
         Collection<? extends K> keys,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         boolean readThrough,
         boolean reload,
         boolean forcePrimary,
@@ -165,7 +166,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      * Initializes future.
      */
     public void init() {
-        long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
 
         map(keys, Collections.<ClusterNode, LinkedHashMap<K, Boolean>>emptyMap(), topVer);
 
@@ -274,7 +275,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      * @param mapped Mappings to check for duplicates.
      * @param topVer Topology version on which keys should be mapped.
      */
-    private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, long topVer) {
+    private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, AffinityTopologyVersion topVer) {
         if (CU.affinityNodes(cctx, topVer).isEmpty()) {
             onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all partition nodes left the grid)."));
 
@@ -347,9 +348,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                             remapKeys.add(key);
                     }
 
-                    long updTopVer = ctx.discovery().topologyVersion();
+                    AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion());
 
-                    assert updTopVer > topVer : "Got invalid partitions for local node but topology version did " +
+                    assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
                         "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
                         ", invalidParts=" + invalidParts + ']';
 
@@ -416,7 +417,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      */
     @SuppressWarnings("ConstantConditions")
     private boolean map(K key, Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings, Map<K, V> locVals,
-        long topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped) {
+        AffinityTopologyVersion topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped) {
         GridDhtCacheAdapter<K, V> colocated = cache();
 
         boolean remote = false;
@@ -589,7 +590,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         private LinkedHashMap<K, Boolean> keys;
 
         /** Topology version on which this future was mapped. */
-        private long topVer;
+        private AffinityTopologyVersion topVer;
 
         /**
          * Empty constructor required for {@link Externalizable}.
@@ -603,7 +604,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
          * @param keys Keys.
          * @param topVer Topology version.
          */
-        MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, long topVer) {
+        MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, @NotNull AffinityTopologyVersion topVer) {
             super(cctx.kernalContext());
 
             this.node = node;
@@ -651,9 +652,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
             if (log.isDebugEnabled())
                 log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
 
-            long updTopVer = ctx.discovery().topologyVersion();
+            AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion());
 
-            assert updTopVer > topVer : "Got topology exception but topology version did " +
+            assert updTopVer.compareTo(topVer) > 0 : "Got topology exception but topology version did " +
                 "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
                 ", nodeId=" + node.id() + ']';
 
@@ -679,11 +680,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
 
             // Remap invalid partitions.
             if (!F.isEmpty(invalidParts)) {
-                long rmtTopVer = res.topologyVersion();
+                AffinityTopologyVersion rmtTopVer = res.topologyVersion();
 
-                assert rmtTopVer != 0;
+                assert !rmtTopVer.equals(AffinityTopologyVersion.ZERO);
 
-                if (rmtTopVer <= topVer) {
+                if (rmtTopVer.compareTo(topVer) <= 0) {
                     // Fail the whole get future.
                     onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
                         "invalid partitions but remote topology version does not differ from local) " +
@@ -697,12 +698,12 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                     log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']');
 
                 // Need to wait for next topology version to remap.
-                IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer);
+                IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
 
                 topFut.listenAsync(new CIX1<IgniteInternalFuture<Long>>() {
                     @SuppressWarnings("unchecked")
                     @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException {
-                        long topVer = fut.get();
+                        AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get());
 
                         // This will append new futures to compound list.
                         map(F.view(keys.keySet(),  new P1<K>() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9f9af31..78dce91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
@@ -119,7 +120,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override protected void init() {
         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
             /** {@inheritDoc} */
-            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
+            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash,
                 V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
                 return new GridDhtAtomicCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId);
             }
@@ -906,7 +907,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (keyCheck)
             validateCacheKeys(keys);
 
-        long topVer = ctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
 
@@ -1086,7 +1087,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 try {
                     // Do not check topology version for CLOCK versioning since
                     // partition exchange will wait for near update future.
-                    if (topology().topologyVersion() == req.topologyVersion() ||
+                    if (topology().topologyVersion().equals(req.topologyVersion()) ||
                         ctx.config().getAtomicWriteOrderMode() == CLOCK) {
                         ClusterNode node = ctx.discovery().node(nodeId);
 
@@ -1640,9 +1641,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         List<K> keys = req.keys();
 
-        long topVer = req.topologyVersion();
+        AffinityTopologyVersion topVer = req.topologyVersion();
 
-        boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+        boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer.topologyVersion());
 
         boolean readersOnly = false;
 
@@ -1871,9 +1872,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         assert req.conflictVersions() == null : "Cannot be called when there are conflict entries in the batch.";
 
-        long topVer = req.topologyVersion();
+        AffinityTopologyVersion topVer = req.topologyVersion();
 
-        boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+        boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer.topologyVersion());
 
         CacheStorePartialUpdateException storeErr = null;
 
@@ -2091,7 +2092,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *      locks are released.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private List<GridDhtCacheEntry<K, V>> lockEntries(List<K> keys, long topVer)
+    private List<GridDhtCacheEntry<K, V>> lockEntries(List<K> keys, AffinityTopologyVersion topVer)
         throws GridDhtInvalidPartitionException {
         if (keys.size() == 1) {
             K key = keys.get(0);
@@ -2174,7 +2175,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param locked Locked entries.
      * @param topVer Topology version.
      */
-    private void unlockEntries(Collection<GridDhtCacheEntry<K, V>> locked, long topVer) {
+    private void unlockEntries(Collection<GridDhtCacheEntry<K, V>> locked, AffinityTopologyVersion topVer) {
         // Process deleted entries before locks release.
         assert ctx.deferredDelete();
 
@@ -2350,9 +2351,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             if (updateReq.fastMap())
                 return null;
 
-            long topVer = updateReq.topologyVersion();
+            AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
-            Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
+            Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer.topologyVersion());
 
             // We are on primary node for some key.
             assert !nodes.isEmpty();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
index 2fa9922..ec63130 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -35,7 +36,7 @@ public class GridDhtAtomicCacheEntry<K, V> extends GridDhtCacheEntry<K, V> {
      * @param ttl Time to live.
      * @param hdrId Header id.
      */
-    public GridDhtAtomicCacheEntry(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val,
+    public GridDhtAtomicCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val,
         GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
         super(ctx, topVer, key, hash, val, next, ttl, hdrId);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 92fe74b..5f8240d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -195,7 +196,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
     }
 
     /** {@inheritDoc} */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return updateReq.topologyVersion();
     }
 
@@ -220,7 +221,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
         long ttl,
         long conflictExpireTime,
         @Nullable GridCacheVersion conflictVer) {
-        long topVer = updateReq.topologyVersion();
+        AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
         Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
 
@@ -285,7 +286,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
 
         keys.add(entry.key());
 
-        long topVer = updateReq.topologyVersion();
+        AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
         for (UUID nodeId : readers) {
             GridDhtAtomicUpdateRequest<K, V> updateReq = mappings.get(nodeId);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 91e83d2..685c466 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
@@ -53,7 +54,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
     private GridCacheVersion writeVer;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Keys to update. */
     @GridToStringInclude
@@ -173,7 +174,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         GridCacheVersion futVer,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         boolean forceTransformBackups,
         UUID subjId,
         int taskNameHash,
@@ -415,7 +416,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -778,7 +779,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -950,7 +951,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                 reader.incrementState();
 
             case 19:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index c3cc50a..8a3ca8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -113,7 +114,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
     private final ExpiryPolicy expiryPlc;
 
     /** Future map topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
 
     /** Optional filter. */
     private final IgnitePredicate<Cache.Entry<K, V>>[] filter;
@@ -261,7 +262,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
     }
 
     /** {@inheritDoc} */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -444,8 +445,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                 snapshot = fut.topologySnapshot();
             }
             else {
-                fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() {
-                    @Override public void apply(IgniteInternalFuture<Long> t) {
+                fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                         mapOnTopology(keys, remap, oldNodeId);
                     }
                 });
@@ -453,7 +454,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                 return;
             }
 
-            topVer = snapshot.topologyVersion();
+            topVer = new AffinityTopologyVersion(snapshot.topologyVersion());
 
             mapTime = U.currentTimeMillis();
 
@@ -498,7 +499,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         @Nullable UUID oldNodeId) {
         assert oldNodeId == null || remap;
 
-        long topVer = topSnapshot.topologyVersion();
+        AffinityTopologyVersion topVer = new AffinityTopologyVersion(topSnapshot.topologyVersion());
 
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
@@ -592,7 +593,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                 futVer,
                 fastMap,
                 updVer,
-                topSnapshot.topologyVersion(),
+                topVer,
                 syncMode,
                 op,
                 retval,
@@ -712,7 +713,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                             futVer,
                             fastMap,
                             updVer,
-                            topSnapshot.topologyVersion(),
+                            topVer,
                             syncMode,
                             op,
                             retval,
@@ -762,7 +763,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
      * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
      * @return Collection of nodes to which key is mapped.
      */
-    private Collection<ClusterNode> mapKey(K key, long topVer, boolean fastMap) {
+    private Collection<ClusterNode> mapKey(K key, AffinityTopologyVersion topVer, boolean fastMap) {
         GridCacheAffinityManager<K, V> affMgr = cctx.affinity();
 
         // If we can send updates in parallel - do it.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index b41e3a8..7457b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -63,7 +64,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
     private GridCacheVersion updateVer;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Write synchronization mode. */
     private CacheWriteSynchronizationMode syncMode;
@@ -166,7 +167,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
         GridCacheVersion futVer,
         boolean fastMap,
         @Nullable GridCacheVersion updateVer,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         CacheWriteSynchronizationMode syncMode,
         GridCacheOperation op,
         boolean retval,
@@ -255,7 +256,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -642,7 +643,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -812,7 +813,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
                 reader.incrementState();
 
             case 19:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index a59b6aa..5ab75d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -82,7 +83,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     @Override protected void init() {
         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
             /** {@inheritDoc} */
-            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
+            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash,
                 V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
                 return new GridDhtColocatedCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId);
             }
@@ -118,7 +119,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @throws GridDhtInvalidPartitionException If {@code allowDetached} is false and node is not primary
      *      for given key.
      */
-    public GridDistributedCacheEntry<K, V> entryExx(K key, long topVer, boolean allowDetached) {
+    public GridDistributedCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer, boolean allowDetached) {
         return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ?
             new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer);
     }
@@ -179,7 +180,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             });
         }
 
-        long topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+        AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
 
         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
 
@@ -199,7 +200,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheEntryEx<K, V> entryExSafe(K key, long topVer) {
+    @Override protected GridCacheEntryEx<K, V> entryExSafe(K key, AffinityTopologyVersion topVer) {
         try {
             return ctx.affinity().localNode(key, topVer) ? entryEx(key) : null;
         }
@@ -224,7 +225,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         boolean readThrough,
         boolean reload,
         boolean forcePrimary,
-        long topVer,
+        AffinityTopologyVersion topVer,
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
@@ -424,12 +425,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                 GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), key, null);
 
                 if (lock != null) {
-                    final long topVer = lock.topologyVersion();
+                    final AffinityTopologyVersion topVer = lock.topologyVersion();
 
-                    assert topVer > 0;
+                    assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
 
                     if (map == null) {
-                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer);
+                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer.topologyVersion());
 
                         keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size());
 
@@ -518,10 +519,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                 GridCacheMvccCandidate<K> lock = ctx.mvcc().removeExplicitLock(threadId, key, ver);
 
                 if (lock != null) {
-                    long topVer = lock.topologyVersion();
+                    AffinityTopologyVersion topVer = lock.topologyVersion();
 
                     if (map == null) {
-                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer);
+                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer.topologyVersion());
 
                         keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size());
 
@@ -596,7 +597,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable final GridNearTxLocal<K, V> tx,
         final long threadId,
         final GridCacheVersion ver,
-        final long topVer,
+        final AffinityTopologyVersion topVer,
         final Collection<K> keys,
         final boolean txRead,
         final long timeout,
@@ -669,7 +670,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable final GridNearTxLocal<K, V> tx,
         long threadId,
         final GridCacheVersion ver,
-        final long topVer,
+        final AffinityTopologyVersion topVer,
         final Collection<K> keys,
         final boolean txRead,
         final long timeout,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
index b19a2da..f2de9d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
 
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -35,7 +36,7 @@ public class GridDhtColocatedCacheEntry<K, V> extends GridDhtCacheEntry<K, V> {
      * @param ttl Time to live.
      * @param hdrId Header id.
      */
-    public GridDhtColocatedCacheEntry(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val,
+    public GridDhtColocatedCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val,
         GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
         super(ctx, topVer, key, hash, val, next, ttl, hdrId);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 21f6364..1e13dd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -295,7 +296,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                     false,
                     false);
 
-                cand.topologyVersion(topSnapshot.get().topologyVersion());
+                cand.topologyVersion(new AffinityTopologyVersion(topSnapshot.get().topologyVersion()));
             }
         }
         else {
@@ -314,7 +315,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                     false,
                     false);
 
-                cand.topologyVersion(topSnapshot.get().topologyVersion());
+                cand.topologyVersion(new AffinityTopologyVersion(topSnapshot.get().topologyVersion()));
             }
             else
                 cand = cand.reenter();
@@ -562,7 +563,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                     GridDiscoveryTopologySnapshot snapshot = fut.topologySnapshot();
 
                     if (tx != null) {
-                        tx.topologyVersion(snapshot.topologyVersion());
+                        tx.topologyVersion(new AffinityTopologyVersion(snapshot.topologyVersion()));
                         tx.topologySnapshot(snapshot);
                     }
 
@@ -573,8 +574,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                     markInitialized();
                 }
                 else {
-                    fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() {
-                        @Override public void apply(IgniteInternalFuture<Long> t) {
+                    fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                             mapOnTopology();
                         }
                     });
@@ -602,9 +603,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
             assert snapshot != null;
 
-            final long topVer = snapshot.topologyVersion();
+            final AffinityTopologyVersion topVer = new AffinityTopologyVersion(snapshot.topologyVersion());
 
-            assert topVer > 0;
+            assert topVer.topologyVersion() > 0;
 
             if (CU.affinityNodes(cctx, topVer).isEmpty()) {
                 onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all partition nodes left the grid)."));
@@ -870,7 +871,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @param topVer Topology version to lock on.
      * @param mappings Optional collection of mappings to proceed locking.
      */
-    private void lockLocally(final Collection<K> keys, long topVer,
+    private void lockLocally(final Collection<K> keys, AffinityTopologyVersion topVer,
         @Nullable final Deque<GridNearLockMapping<K, V>> mappings) {
         if (log.isDebugEnabled())
             log.debug("Before locally locking keys : " + keys);
@@ -947,7 +948,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @return {@code True} if all keys were mapped locally, {@code false} if full mapping should be performed.
      * @throws IgniteCheckedException If key cannot be added to mapping.
      */
-    private boolean mapAsPrimary(Collection<? extends K> keys, long topVer) throws IgniteCheckedException {
+    private boolean mapAsPrimary(Collection<? extends K> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException {
         // Assign keys to primary nodes.
         Collection<K> distributedKeys = new ArrayList<>(keys.size());
 
@@ -992,7 +993,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @return {@code True} if transaction accesses key that was explicitly locked before.
      * @throws IgniteCheckedException If lock is externally held and transaction is explicit.
      */
-    private boolean addLocalKey(K key, long topVer, Collection<K> distributedKeys) throws IgniteCheckedException {
+    private boolean addLocalKey(K key, AffinityTopologyVersion topVer, Collection<K> distributedKeys) throws IgniteCheckedException {
         GridDistributedCacheEntry<K, V> entry = cctx.colocated().entryExx(key, topVer, false);
 
         assert !entry.detached();
@@ -1022,7 +1023,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @throws IgniteCheckedException If mapping failed.
      */
     private GridNearLockMapping<K, V> map(K key, @Nullable GridNearLockMapping<K, V> mapping,
-        long topVer) throws IgniteCheckedException {
+        AffinityTopologyVersion topVer) throws IgniteCheckedException {
         assert mapping == null || mapping.node() != null;
 
         ClusterNode primary = cctx.affinity().primary(key, topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 170a0c0..dbf6146 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.util.*;
@@ -74,7 +75,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
     private AtomicInteger topCntr = new AtomicInteger(1);
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Future ID. */
     private IgniteUuid futId = IgniteUuid.randomUuid();
@@ -91,11 +92,11 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param keys Keys.
      * @param preloader Preloader.
      */
-    public GridDhtForceKeysFuture(GridCacheContext<K, V> cctx, long topVer, Collection<? extends K> keys,
-        GridDhtPreloader<K, V> preloader) {
+    public GridDhtForceKeysFuture(GridCacheContext<K, V> cctx, @NotNull AffinityTopologyVersion topVer,
+        Collection<? extends K> keys, GridDhtPreloader<K, V> preloader) {
         super(cctx.kernalContext());
 
-        assert topVer != 0 : topVer;
+        assert topVer.topologyVersion() != 0 : topVer;
         assert !F.isEmpty(keys) : keys;
 
         this.cctx = cctx;
@@ -495,7 +496,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
             for (GridCacheEntryInfo<K, V> info : res.forcedInfos()) {
                 int p = cctx.affinity().partition(info.key());
 
-                GridDhtLocalPartition<K, V> locPart = top.localPartition(p, -1, false);
+                GridDhtLocalPartition<K, V> locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);
 
                 if (locPart != null && locPart.state() == MOVING && locPart.reserve()) {
                     GridCacheEntryEx<K, V> entry = cctx.dht().entryEx(info.key());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
index 2a28062..a17c5b74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
@@ -54,7 +56,7 @@ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implem
     private Collection<K> keys;
 
     /** Topology version for which keys are requested. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /**
      * @param cacheId Cache ID.
@@ -68,7 +70,7 @@ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implem
         IgniteUuid futId,
         IgniteUuid miniId,
         Collection<K> keys,
-        long topVer
+        @NotNull AffinityTopologyVersion topVer
     ) {
         assert futId != null;
         assert miniId != null;
@@ -133,7 +135,7 @@ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implem
     /**
      * @return Topology version for which keys are requested.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -195,7 +197,7 @@ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implem
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -241,7 +243,7 @@ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implem
                 reader.incrementState();
 
             case 6:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 54a47d5..84b376a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
@@ -57,13 +59,13 @@ public class GridDhtPartitionDemandMessage<K, V> extends GridCacheMessage<K, V>
     private int workerId = -1;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /**
      * @param updateSeq Update sequence for this node.
      * @param topVer Topology version.
      */
-    GridDhtPartitionDemandMessage(long updateSeq, long topVer, int cacheId) {
+    GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) {
         assert updateSeq > 0;
 
         this.cacheId = cacheId;
@@ -168,7 +170,7 @@ public class GridDhtPartitionDemandMessage<K, V> extends GridCacheMessage<K, V>
     /**
      * @return Topology version for which demand message is sent.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -217,7 +219,7 @@ public class GridDhtPartitionDemandMessage<K, V> extends GridCacheMessage<K, V>
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -273,7 +275,7 @@ public class GridDhtPartitionDemandMessage<K, V> extends GridCacheMessage<K, V>
                 reader.incrementState();
 
             case 5:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 6a1f7a1..8f971a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.timeout.*;
@@ -208,8 +209,8 @@ public class GridDhtPartitionDemandPool<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Forcing preload event for future: " + exchFut);
 
-            exchFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() {
-                @Override public void apply(IgniteInternalFuture<Long> t) {
+            exchFut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                     cctx.shared().exchange().forcePreloadExchange(exchFut);
                 }
             });
@@ -292,7 +293,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param topVer Topology version.
      * @return Picked owners.
      */
-    private Collection<ClusterNode> pickedOwners(int p, long topVer) {
+    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
         Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
 
         int affCnt = affNodes.size();
@@ -318,7 +319,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param topVer Topology version.
      * @return Nodes owning this partition.
      */
-    private Collection<ClusterNode> remoteOwners(int p, long topVer) {
+    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
         return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
     }
 
@@ -357,8 +358,8 @@ public class GridDhtPartitionDemandPool<K, V> {
 
             obj = new GridTimeoutObjectAdapter(delay) {
                 @Override public void onTimeout() {
-                    exchFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() {
-                        @Override public void apply(IgniteInternalFuture<Long> f) {
+                    exchFut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
                             cctx.shared().exchange().forcePreloadExchange(exchFut);
                         }
                     });
@@ -481,7 +482,7 @@ public class GridDhtPartitionDemandPool<K, V> {
          * @return {@code False} if partition has become invalid during preloading.
          * @throws IgniteInterruptedCheckedException If interrupted.
          */
-        private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo<K, V> entry, long topVer)
+        private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo<K, V> entry, AffinityTopologyVersion topVer)
             throws IgniteCheckedException {
             try {
                 GridCacheEntryEx<K, V> cached = null;
@@ -569,7 +570,7 @@ public class GridDhtPartitionDemandPool<K, V> {
          * @throws ClusterTopologyCheckedException If node left.
          * @throws IgniteCheckedException If failed to send message.
          */
-        private Set<Integer> demandFromNode(ClusterNode node, final long topVer, GridDhtPartitionDemandMessage<K, V> d,
+        private Set<Integer> demandFromNode(ClusterNode node, final AffinityTopologyVersion topVer, GridDhtPartitionDemandMessage<K, V> d,
             GridDhtPartitionsExchangeFuture<K, V> exchFut) throws InterruptedException, IgniteCheckedException {
             GridDhtPartitionTopology<K, V> top = cctx.dht().topology();
 
@@ -978,13 +979,13 @@ public class GridDhtPartitionDemandPool<K, V> {
         int partCnt = cctx.affinity().partitions();
 
         assert exchFut.forcePreload() || exchFut.dummyReassign() ||
-            exchFut.exchangeId().topologyVersion() == top.topologyVersion() :
+            exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
             "Topology version mismatch [exchId=" + exchFut.exchangeId() +
                 ", topVer=" + top.topologyVersion() + ']';
 
         GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
 
-        long topVer = assigns.topologyVersion();
+        AffinityTopologyVersion topVer = assigns.topologyVersion();
 
         for (int p = 0; p < partCnt; p++) {
             if (cctx.shared().exchange().hasPendingExchange()) {


Mime
View raw message