ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [2/5] incubator-ignite git commit: IGNITE-313 Need to change affinity topology version from long to custom object
Date Wed, 25 Feb 2015 16:46:01 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
index 1823c49..71add90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
@@ -43,17 +45,17 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
     private int evt;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /**
      * @param nodeId Node ID.
      * @param evt Event.
      * @param topVer Topology version.
      */
-    public GridDhtPartitionExchangeId(UUID nodeId, int evt, long topVer) {
+    public GridDhtPartitionExchangeId(UUID nodeId, int evt, @NotNull AffinityTopologyVersion topVer) {
         assert nodeId != null;
         assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED;
-        assert topVer > 0;
+        assert topVer.topologyVersion() > 0;
 
         this.nodeId = nodeId;
         this.evt = evt;
@@ -84,7 +86,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
     /**
      * @return Order.
      */
-    public long topologyVersion() {
+    public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -105,14 +107,14 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeUuid(out, nodeId);
-        out.writeLong(topVer);
+        out.writeObject(topVer);
         out.writeInt(evt);
     }
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         nodeId = U.readUuid(in);
-        topVer = in.readLong();
+        topVer = (AffinityTopologyVersion)in.readObject();
         evt = in.readInt();
     }
 
@@ -121,7 +123,9 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
         if (o == this)
             return 0;
 
-        return topVer < o.topVer ? -1 : topVer == o.topVer ? 0 : 1;
+
+
+        return topVer.compareTo(o.topVer);
     }
 
     /** {@inheritDoc} */
@@ -129,7 +133,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
         int res = nodeId.hashCode();
 
         res = 31 * res + evt;
-        res = 31 * res + (int)(topVer ^ (topVer >>> 32));
+        res = 31 * res + topVer.hashCode();
 
         return res;
     }
@@ -141,7 +145,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
 
         GridDhtPartitionExchangeId id = (GridDhtPartitionExchangeId)o;
 
-        return evt == id.evt && topVer == id.topVer && nodeId.equals(id.nodeId);
+        return evt == id.evt && topVer.equals(id.topVer) && nodeId.equals(id.nodeId);
     }
 
     /** {@inheritDoc} */
@@ -169,7 +173,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -204,7 +208,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
                 reader.incrementState();
 
             case 2:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index db9bd08..81ab4bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -24,6 +24,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -47,7 +48,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 /**
  * Future for exchanging partition maps.
  */
-public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Long>
+public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<AffinityTopologyVersion>
     implements Comparable<GridDhtPartitionsExchangeFuture<K, V>>, GridDhtTopologyFuture {
     /** */
     private static final long serialVersionUID = 0L;
@@ -216,7 +217,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
         log = cctx.logger(getClass());
 
         // Grab all nodes with order of equal or less than last joined node.
-        oldestNode.set(CU.oldest(cctx, exchId.topologyVersion()));
+        oldestNode.set(CU.oldest(cctx, exchId.topologyVersion().topologyVersion()));
 
         assert oldestNode.get() != null;
 
@@ -432,7 +433,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
                 }
 
                 // Grab all alive remote nodes with order of equal or less than last joined node.
-                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, exchId.topologyVersion()));
+                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, exchId.topologyVersion().topologyVersion()));
 
                 rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
 
@@ -444,7 +445,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
                     // If received any messages, process them.
                     onReceive(m.getKey(), m.getValue());
 
-                long topVer = exchId.topologyVersion();
+                AffinityTopologyVersion topVer = exchId.topologyVersion();
 
                 for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
                     if (cacheCtx.isLocal())
@@ -485,7 +486,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
 
                     GridDhtPartitionTopology<K, V> top = cacheCtx.topology();
 
-                    assert topVer == top.topologyVersion() :
+                    assert topVer.equals(top.topologyVersion()) :
                         "Topology version is updated only in this class instances inside single ExchangeWorker thread.";
 
                     top.beforeExchange(exchId);
@@ -632,11 +633,11 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(Long res, Throwable err) {
+    @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) {
         if (err == null) {
             for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
                 if (!cacheCtx.isLocal())
-                    cacheCtx.affinity().cleanUpCache(res - 10);
+                    cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 10);
             }
         }
 
@@ -827,7 +828,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
         if (log.isDebugEnabled())
             log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
 
-        assert exchId.topologyVersion() == msg.topologyVersion();
+        assert exchId.topologyVersion().equals(msg.topologyVersion());
 
         initFut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() {
             @Override public void apply(IgniteInternalFuture<Boolean> t) {
@@ -915,7 +916,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
 
                             boolean set = false;
 
-                            ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion());
+                            ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion().topologyVersion());
 
                             // If local node is now oldest.
                             if (newOldest.id().equals(cctx.localNodeId())) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 70fe7fd..b7cbf5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -46,7 +47,7 @@ public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstrac
     private byte[] partsBytes;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /**
      * Required by {@link Externalizable}.
@@ -60,11 +61,11 @@ public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstrac
      * @param lastVer Last version.
      */
     public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer,
-        long topVer) {
+        @NotNull AffinityTopologyVersion topVer) {
         super(id, lastVer);
 
         assert parts != null;
-        assert id == null || topVer == id.topologyVersion();
+        assert id == null || topVer.equals(id.topologyVersion());
 
         this.topVer = topVer;
     }
@@ -96,14 +97,14 @@ public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstrac
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
     /**
      * @param topVer Topology version.
      */
-    public void topologyVersion(long topVer) {
+    public void topologyVersion(AffinityTopologyVersion topVer) {
         this.topVer = topVer;
     }
 
@@ -137,7 +138,7 @@ public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstrac
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -167,7 +168,7 @@ public class GridDhtPartitionsFullMessage<K, V> extends GridDhtPartitionsAbstrac
                 reader.incrementState();
 
             case 6:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0821431..0a74d0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -23,6 +23,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.util.*;
@@ -70,7 +71,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
 
     /** Pending affinity assignment futures. */
-    private ConcurrentMap<Long, GridDhtAssignmentFetchFuture<K, V>> pendingAssignmentFetchFuts =
+    private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture<K, V>> pendingAssignmentFetchFuts =
         new ConcurrentHashMap8<>();
 
     /** Discovery listener. */
@@ -276,7 +277,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
      * @param topVer Requested topology version.
      * @param fut Future to add.
      */
-    public void addDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture<K, V> fut) {
+    public void addDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture<K, V> fut) {
         GridDhtAssignmentFetchFuture<K, V> old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut);
 
         assert old == null : "More than one thread is trying to fetch partition assignments: " + topVer;
@@ -286,7 +287,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
      * @param topVer Requested topology version.
      * @param fut Future to remove.
      */
-    public void removeDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture<K, V> fut) {
+    public void removeDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture<K, V> fut) {
         boolean rmv = pendingAssignmentFetchFuts.remove(topVer, fut);
 
         assert rmv : "Failed to remove assignment fetch future: " + topVer;
@@ -348,7 +349,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
             for (K k : msg.keys()) {
                 int p = cctx.affinity().partition(k);
 
-                GridDhtLocalPartition<K, V> locPart = top.localPartition(p, -1, false);
+                GridDhtLocalPartition<K, V> locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);
 
                 // If this node is no longer an owner.
                 if (locPart == null && !top.owners(p).contains(loc))
@@ -423,13 +424,13 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
      */
     private void processAffinityAssignmentRequest(final ClusterNode node,
         final GridDhtAffinityAssignmentRequest<K, V> req) {
-        final long topVer = req.topologyVersion();
+        final AffinityTopologyVersion topVer = req.topologyVersion();
 
         if (log.isDebugEnabled())
             log.debug("Processing affinity assignment request [node=" + node + ", req=" + req + ']');
 
-        cctx.affinity().affinityReadyFuture(req.topologyVersion()).listenAsync(new CI1<IgniteInternalFuture<Long>>() {
-            @Override public void apply(IgniteInternalFuture<Long> fut) {
+        cctx.affinity().affinityReadyFuture(req.topologyVersion()).listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                 if (log.isDebugEnabled())
                     log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
                         ", node=" + node + ']');
@@ -488,7 +489,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
      * @return Future for request.
      */
     @SuppressWarnings( {"unchecked", "RedundantCast"})
-    @Override public GridDhtFuture<Object> request(Collection<? extends K> keys, long topVer) {
+    @Override public GridDhtFuture<Object> request(Collection<? extends K> keys, AffinityTopologyVersion topVer) {
         final GridDhtForceKeysFuture<K, V> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
 
         IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index fafec8d..76ea0a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
@@ -36,15 +37,15 @@ public class GridDhtPreloaderAssignments<K, V> extends
     private final GridDhtPartitionsExchangeFuture<K, V> exchFut;
 
     /** Last join order. */
-    private final long topVer;
+    private final AffinityTopologyVersion topVer;
 
     /**
      * @param exchFut Exchange future.
      * @param topVer Last join order.
      */
-    public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture<K, V> exchFut, long topVer) {
+    public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture<K, V> exchFut, AffinityTopologyVersion topVer) {
         assert exchFut != null;
-        assert topVer > 0;
+        assert topVer.topologyVersion() > 0;
 
         this.exchFut = exchFut;
         this.topVer = topVer;
@@ -60,7 +61,7 @@ public class GridDhtPreloaderAssignments<K, V> extends
     /**
      * @return Topology version.
      */
-    long topologyVersion() {
+    AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 22403ef..7bf4cc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
@@ -210,7 +211,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
             while (true) {
                 GridCacheEntryEx<K, V> entry = null;
 
-                long topVer = ctx.affinity().affinityTopologyVersion();
+                AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
                 try {
                     entry = entryEx(key, topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 94d2e96..79f740c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -69,7 +70,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
     @Override protected void init() {
         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
             /** {@inheritDoc} */
-            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
+            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash,
                 V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
                 // Can't hold any locks here - this method is invoked when
                 // holding write-lock on the whole cache map.
@@ -113,7 +114,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheEntryEx<K, V> entryEx(K key, long topVer) {
+    @Override public GridCacheEntryEx<K, V> entryEx(K key, AffinityTopologyVersion topVer) {
         GridNearCacheEntry<K, V> entry = null;
 
         while (true) {
@@ -136,7 +137,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
      * @param topVer Topology version.
      * @return Entry.
      */
-    public GridNearCacheEntry<K, V> entryExx(K key, long topVer) {
+    public GridNearCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer) {
         return (GridNearCacheEntry<K, V>)entryEx(key, topVer);
     }
 
@@ -373,7 +374,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
     /** {@inheritDoc} */
     @Override public Set<Cache.Entry<K, V>> primaryEntrySet(
         @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) {
-        final long topVer = ctx.affinity().affinityTopologyVersion();
+        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         Collection<Cache.Entry<K, V>> entries =
             F.flatCollections(

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 4d86a85..fc548fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -27,10 +28,8 @@ import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
-import javax.cache.*;
 import java.util.*;
 
 import static org.apache.ignite.events.EventType.*;
@@ -85,8 +84,8 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean valid(long topVer) {
-        assert topVer > 0 : "Topology version is invalid: " + topVer;
+    @Override public boolean valid(AffinityTopologyVersion topVer) {
+        assert topVer.topologyVersion() > 0 : "Topology version is invalid: " + topVer;
 
         UUID primaryNodeId = this.primaryNodeId;
 
@@ -116,7 +115,7 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
      * @return {@code True} if this entry was initialized by this call.
      * @throws GridCacheEntryRemovedException If this entry is obsolete.
      */
-    public boolean initializeFromDht(long topVer) throws GridCacheEntryRemovedException {
+    public boolean initializeFromDht(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException {
         while (true) {
             GridDhtCacheEntry<K, V> entry = cctx.near().dht().peekExx(key);
 
@@ -345,7 +344,7 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
     @SuppressWarnings({"RedundantTypeArguments"})
     public boolean loadedValue(@Nullable IgniteInternalTx tx, UUID primaryNodeId, V val, byte[] valBytes,
         GridCacheVersion ver, GridCacheVersion dhtVer, @Nullable GridCacheVersion expVer, long ttl, long expireTime,
-        boolean evt, long topVer, UUID subjId)
+        boolean evt, AffinityTopologyVersion topVer, UUID subjId)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 9628f3b..1a7a568 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -168,7 +169,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      * Initializes future.
      */
     public void init() {
-        long topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+        AffinityTopologyVersion topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
 
         map(keys, Collections.<ClusterNode, LinkedHashMap<K, Boolean>>emptyMap(), topVer);
 
@@ -278,7 +279,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      */
     private void map(Collection<? extends K> keys,
         Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped,
-        final long topVer) {
+        final AffinityTopologyVersion topVer) {
         Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
 
         if (affNodes.isEmpty()) {
@@ -347,9 +348,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                             remapKeys.add(key);
                     }
 
-                    long updTopVer = ctx.discovery().topologyVersion();
+                    AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion());
 
-                    assert updTopVer > topVer : "Got invalid partitions for local node but topology version did " +
+                    assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
                         "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
                         ", invalidParts=" + invalidParts + ']';
 
@@ -421,7 +422,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      * @return Map.
      */
     private Map<K, GridCacheVersion> map(K key, Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings,
-        long topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, Map<K, GridCacheVersion> savedVers) {
+        AffinityTopologyVersion topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, Map<K, GridCacheVersion> savedVers) {
         final GridNearCacheAdapter<K, V> near = cache();
 
         // Allow to get cached value from the local node.
@@ -603,7 +604,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         Collection<K> keys,
         Collection<GridCacheEntryInfo<K, V>> infos,
         Map<K, GridCacheVersion> savedVers,
-        long topVer) {
+        AffinityTopologyVersion topVer) {
         boolean empty = F.isEmpty(keys);
 
         Map<K, V> map = empty ? Collections.<K, V>emptyMap() : new GridLeanMap<K, V>(keys.size());
@@ -688,7 +689,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         private Map<K, GridCacheVersion> savedVers;
 
         /** Topology version on which this future was mapped. */
-        private long topVer;
+        private AffinityTopologyVersion topVer;
 
         /**
          * Empty constructor required for {@link Externalizable}.
@@ -703,7 +704,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
          * @param savedVers Saved entry versions.
          * @param topVer Topology version.
          */
-        MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, Map<K, GridCacheVersion> savedVers, long topVer) {
+        MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, Map<K, GridCacheVersion> savedVers,
+            @NotNull AffinityTopologyVersion topVer) {
             super(cctx.kernalContext());
 
             this.node = node;
@@ -751,9 +753,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             if (log.isDebugEnabled())
                 log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
 
-            long updTopVer = ctx.discovery().topologyVersion();
+            AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion());
 
-            if (updTopVer > topVer) {
+            if (updTopVer.compareTo(topVer) > 0) {
                 // Remap.
                 map(keys.keySet(), F.t(node, keys), updTopVer);
 
@@ -762,7 +764,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             else {
                 final RemapTimeoutObject timeout = new RemapTimeoutObject(ctx.config().getNetworkTimeout(), topVer, e);
 
-                ctx.discovery().topologyFuture(topVer + 1).listenAsync(new CI1<IgniteInternalFuture<Long>>() {
+                ctx.discovery().topologyFuture(topVer.topologyVersion() + 1).listenAsync(new CI1<IgniteInternalFuture<Long>>() {
                     @Override public void apply(IgniteInternalFuture<Long> longIgniteFuture) {
                         if (timeout.finish()) {
                             ctx.timeout().removeTimeoutObject(timeout);
@@ -794,11 +796,11 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
 
             // Remap invalid partitions.
             if (!F.isEmpty(invalidParts)) {
-                long rmtTopVer = res.topologyVersion();
+                AffinityTopologyVersion rmtTopVer = res.topologyVersion();
 
-                assert rmtTopVer != 0;
+                assert rmtTopVer.topologyVersion() != 0;
 
-                if (rmtTopVer <= topVer) {
+                if (rmtTopVer.compareTo(topVer) <= 0) {
                     // Fail the whole get future.
                     onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
                         "invalid partitions but remote topology version does not differ from local) " +
@@ -812,7 +814,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']');
 
                 // Need to wait for next topology version to remap.
-                IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer);
+                IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
 
                 topFut.listenAsync(new CIX1<IgniteInternalFuture<Long>>() {
                     @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException {
@@ -823,7 +825,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                             @Override public boolean apply(K key) {
                                 return invalidParts.contains(cctx.affinity().partition(key));
                             }
-                        }), F.t(node, keys), readyTopVer);
+                        }), F.t(node, keys), new AffinityTopologyVersion(readyTopVer));
 
                         // It is critical to call onDone after adding futures to compound list.
                         onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
@@ -847,7 +849,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             private AtomicBoolean finished = new AtomicBoolean();
 
             /** Topology version to wait. */
-            private long topVer;
+            private AffinityTopologyVersion topVer;
 
             /** Exception cause. */
             private IgniteCheckedException e;
@@ -856,7 +858,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
              * @param timeout Timeout.
              * @param topVer Topology version timeout was created on.
              */
-            private RemapTimeoutObject(long timeout, long topVer, IgniteCheckedException e) {
+            private RemapTimeoutObject(long timeout, @NotNull AffinityTopologyVersion topVer, IgniteCheckedException e) {
                 super(timeout);
 
                 this.topVer = topVer;
@@ -867,7 +869,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             @Override public void onTimeout() {
                 if (finish())
                     // Fail the whole get future.
-                    onDone(new IgniteCheckedException("Failed to wait for topology version to change: " + (topVer + 1), e));
+                    onDone(new IgniteCheckedException("Failed to wait for topology version to change: "
+                        + (topVer.topologyVersion() + 1), e));
                 // else remap happened concurrently.
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 06b3a61..30126f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
@@ -68,7 +70,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
     private LinkedHashMap<byte[], Boolean> keyBytes;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Subject ID. */
     private UUID subjId;
@@ -109,7 +111,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
         LinkedHashMap<K, Boolean> keys,
         boolean readThrough,
         boolean reload,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         UUID subjId,
         int taskNameHash,
         long accessTtl,
@@ -201,7 +203,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -308,7 +310,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -408,7 +410,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
                 reader.incrementState();
 
             case 12:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 6bbbc0f..10b7291 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
@@ -62,7 +64,7 @@ public class GridNearGetResponse<K, V> extends GridCacheMessage<K, V> implements
     private Collection<Integer> invalidParts = new GridLeanSet<>();
 
     /** Topology version if invalid partitions is not empty. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Error. */
     @GridDirectTransient
@@ -144,7 +146,7 @@ public class GridNearGetResponse<K, V> extends GridCacheMessage<K, V> implements
      * @param invalidParts Partitions to retry due to ownership shift.
      * @param topVer Topology version.
      */
-    public void invalidPartitions(Collection<Integer> invalidParts, long topVer) {
+    public void invalidPartitions(Collection<Integer> invalidParts, @NotNull AffinityTopologyVersion topVer) {
         this.invalidParts = invalidParts;
         this.topVer = topVer;
     }
@@ -152,7 +154,7 @@ public class GridNearGetResponse<K, V> extends GridCacheMessage<K, V> implements
     /**
      * @return Topology version if this response has invalid partitions.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -245,7 +247,7 @@ public class GridNearGetResponse<K, V> extends GridCacheMessage<K, V> implements
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -313,7 +315,7 @@ public class GridNearGetResponse<K, V> extends GridCacheMessage<K, V> implements
                 reader.incrementState();
 
             case 8:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 210772d..bab1691 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -296,7 +297,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * @return Lock candidate.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    @Nullable private GridCacheMvccCandidate<K> addEntry(long topVer, GridNearCacheEntry<K, V> entry, UUID dhtNodeId)
+    @Nullable private GridCacheMvccCandidate<K> addEntry(AffinityTopologyVersion topVer, GridNearCacheEntry<K, V> entry, UUID dhtNodeId)
         throws GridCacheEntryRemovedException {
         // Check if lock acquisition is timed out.
         if (timedOut)
@@ -690,7 +691,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                     GridDiscoveryTopologySnapshot snapshot = fut.topologySnapshot();
 
                     if (tx != null) {
-                        tx.topologyVersion(snapshot.topologyVersion());
+                        tx.topologyVersion(new AffinityTopologyVersion(snapshot.topologyVersion()));
                         tx.topologySnapshot(snapshot);
                     }
 
@@ -701,8 +702,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                     markInitialized();
                 }
                 else {
-                    fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() {
-                        @Override public void apply(IgniteInternalFuture<Long> t) {
+                    fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                             mapOnTopology();
                         }
                     });
@@ -730,9 +731,9 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
 
             assert snapshot != null;
 
-            long topVer = snapshot.topologyVersion();
+            AffinityTopologyVersion topVer = new AffinityTopologyVersion(snapshot.topologyVersion());
 
-            assert topVer > 0;
+            assert topVer.topologyVersion() > 0;
 
             if (CU.affinityNodes(cctx, topVer).isEmpty()) {
                 onDone(new ClusterTopologyCheckedException("Failed to map keys for near-only cache (all " +
@@ -1151,7 +1152,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * @throws IgniteCheckedException If mapping for key failed.
      */
     private GridNearLockMapping<K, V> map(K key, @Nullable GridNearLockMapping<K, V> mapping,
-        long topVer) throws IgniteCheckedException {
+        AffinityTopologyVersion topVer) throws IgniteCheckedException {
         assert mapping == null || mapping.node() != null;
 
         ClusterNode primary = cctx.affinity().primary(key, topVer);
@@ -1345,7 +1346,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
 
                 int i = 0;
 
-                long topVer = topSnapshot.get().topologyVersion();
+                AffinityTopologyVersion topVer = new AffinityTopologyVersion(topSnapshot.get().topologyVersion());
 
                 for (K k : keys) {
                     while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index d1ae174..e8c071b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -43,7 +44,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
     private static final long serialVersionUID = 0L;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Mini future ID. */
     private IgniteUuid miniId;
@@ -115,7 +116,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
      */
     public GridNearLockRequest(
         int cacheId,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         UUID nodeId,
         long threadId,
         IgniteUuid futId,
@@ -153,7 +154,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
             grpLockKey,
             partLock);
 
-        assert topVer > 0;
+        assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
 
         this.topVer = topVer;
         this.implicitTx = implicitTx;
@@ -169,7 +170,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -403,7 +404,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
                 writer.incrementState();
 
             case 33:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -513,7 +514,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
                 reader.incrementState();
 
             case 33:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index d6ec9dd..e772114 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -183,7 +184,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
         List<K> keys = req.nearKeys();
 
         if (keys != null) {
-            long topVer = ctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
             for (K key : keys) {
                 while (true) {
@@ -440,7 +441,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
      * @param topVer Topology version.
      * @return {@code True} if entry is locally mapped as a primary or back up node.
      */
-    protected boolean isNearLocallyMapped(GridCacheEntryEx<K, V> e, long topVer) {
+    protected boolean isNearLocallyMapped(GridCacheEntryEx<K, V> e, AffinityTopologyVersion topVer) {
         return ctx.affinity().belongs(ctx.localNode(), e.key(), topVer);
     }
 
@@ -451,7 +452,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
      * @param topVer Topology version.
      * @return {@code True} if attempt was made to evict the entry.
      */
-    protected boolean evictNearEntry(GridCacheEntryEx<K, V> e, GridCacheVersion obsoleteVer, long topVer) {
+    protected boolean evictNearEntry(GridCacheEntryEx<K, V> e, GridCacheVersion obsoleteVer, AffinityTopologyVersion topVer) {
         assert e != null;
         assert obsoleteVer != null;
 
@@ -490,7 +491,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                     try {
                         GridCacheMvccCandidate<K> cand = entry.candidate(ctx.nodeId(), Thread.currentThread().getId());
 
-                        long topVer = -1;
+                        AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
                         if (cand != null) {
                             assert cand.nearLocal() : "Got non-near-local candidate in near cache: " + cand;
@@ -498,7 +499,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                             ver = cand.version();
 
                             if (map == null) {
-                                Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion());
+                                Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion().topologyVersion());
 
                                 if (F.isEmpty(affNodes))
                                     return;
@@ -550,9 +551,9 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                             }
                         }
 
-                        assert topVer != -1 || cand == null;
+                        assert !topVer.equals(AffinityTopologyVersion.NONE) || cand == null;
 
-                        if (topVer == -1)
+                        if (topVer.equals(AffinityTopologyVersion.NONE))
                             topVer = ctx.affinity().affinityTopologyVersion();
 
                         ctx.evicts().touch(entry, topVer);
@@ -616,7 +617,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
 
                             if (cand != null) {
                                 if (map == null) {
-                                    Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion());
+                                    Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion().topologyVersion());
 
                                     if (F.isEmpty(affNodes))
                                         return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index f3811c6..85a40e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -222,7 +223,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
             if (super.onDone(tx, th != null ? th : err)) {
                 if (error() instanceof IgniteTxHeuristicCheckedException) {
-                    long topVer = this.tx.topologyVersion();
+                    AffinityTopologyVersion topVer = this.tx.topologyVersion();
 
                     for (IgniteTxEntry<K, V> e : this.tx.writeMap().values()) {
                         GridCacheContext<K, V> cacheCtx = e.context();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index f29cfea..fc843df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -45,7 +46,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
     private boolean storeEnabled;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Subject ID. */
     private UUID subjId;
@@ -86,7 +87,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
         boolean syncRollback,
         boolean explicitLock,
         boolean storeEnabled,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         GridCacheVersion baseVer,
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers,
@@ -148,7 +149,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -198,7 +199,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -260,7 +261,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
                 reader.incrementState();
 
             case 24:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 06e4767..1ee19de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -183,7 +184,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached,
-        IgniteTxEntry<K, V> entry, long topVer) {
+        IgniteTxEntry<K, V> entry, AffinityTopologyVersion topVer) {
         // We are in near transaction, do not add local node as reader.
         return null;
     }
@@ -1140,7 +1141,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key, long topVer) {
+    @Override protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key, AffinityTopologyVersion topVer) {
         if (cacheCtx.isColocated()) {
             IgniteTxEntry<K, V> txEntry = entry(key);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 18fda47..05a53ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
@@ -338,7 +339,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
                         GridDiscoveryTopologySnapshot snapshot = topFut.topologySnapshot();
 
-                        tx.topologyVersion(snapshot.topologyVersion());
+                        tx.topologyVersion(new AffinityTopologyVersion(snapshot.topologyVersion()));
                         tx.topologySnapshot(snapshot);
 
                         // Make sure to add future before calling prepare.
@@ -364,8 +365,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
                 else {
                     topFut.syncNotify(false);
 
-                    topFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() {
-                        @Override public void apply(IgniteInternalFuture<Long> t) {
+                    topFut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                             prepare();
                         }
                     });
@@ -463,9 +464,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
         assert snapshot != null;
 
-        long topVer = snapshot.topologyVersion();
+        AffinityTopologyVersion topVer = new AffinityTopologyVersion(snapshot.topologyVersion());
 
-        assert topVer > 0;
+        assert topVer.topologyVersion() > 0;
 
         txMapping = new GridDhtTxMapping<>();
 
@@ -548,7 +549,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
     private void preparePessimistic() {
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping<K, V>> mappings = new HashMap<>();
 
-        long topVer = tx.topologyVersion();
+        AffinityTopologyVersion topVer = tx.topologyVersion();
 
         txMapping = new GridDhtTxMapping<>();
 
@@ -752,7 +753,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
      */
     private GridDistributedTxMapping<K, V> map(
         IgniteTxEntry<K, V> entry,
-        long topVer,
+        AffinityTopologyVersion topVer,
         GridDistributedTxMapping<K, V> cur,
         boolean waitLock
     ) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index ca82996..4df0bb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -49,7 +50,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
     private boolean near;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** {@code True} if this last prepare request for node. */
     private boolean last;
@@ -95,7 +96,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
      */
     public GridNearTxPrepareRequest(
         IgniteUuid futId,
-        long topVer,
+        @NotNull AffinityTopologyVersion topVer,
         IgniteInternalTx<K, V> tx,
         Collection<IgniteTxEntry<K, V>> reads,
         Collection<IgniteTxEntry<K, V>> writes,
@@ -199,7 +200,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -312,7 +313,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
                 writer.incrementState();
 
             case 32:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -406,7 +407,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
                 reader.incrementState();
 
             case 32:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
index 85831a8..58d3b0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.dr;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.dr.*;
@@ -66,7 +67,7 @@ public interface GridCacheDrManager<K, V> extends GridCacheManager<K, V> {
      * @param left {@code True} if exchange has been caused by node leave.
      * @throws IgniteCheckedException If failed.
      */
-    public void beforeExchange(long topVer, boolean left) throws IgniteCheckedException;
+    public void beforeExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException;
 
     /**
      * @return {@code True} is DR is enabled.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
index 49f617b..516aa17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.dr.os;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.dr.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -81,7 +82,7 @@ public class GridOsCacheDrManager<K, V> implements GridCacheDrManager<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(long topVer, boolean left) throws IgniteCheckedException {
+    @Override public void beforeExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 53b216c..49e893b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.local;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -73,8 +74,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
     @Override protected void init() {
         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
             /** {@inheritDoc} */
-            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
-                V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer,
+                K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
                 return new GridLocalCacheEntry<>(ctx, key, hash, val, next, ttl, hdrId);
             }
         });
@@ -178,7 +179,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
     /** {@inheritDoc} */
     @Override public void unlockAll(Collection<? extends K> keys,
         IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
-        long topVer = ctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         for (K key : keys) {
             GridLocalCacheEntry<K, V> entry = peekExx(key);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index c3da493..cac086c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.local.atomic;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.local.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -75,7 +76,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     /** {@inheritDoc} */
     @Override protected void init() {
         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
-            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
+            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash,
                 V val, @Nullable GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
                 return new GridLocalCacheEntry<K, V>(ctx, key, hash, val, next, ttl, hdrId);
             }
@@ -1531,7 +1532,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         for (GridCacheEntryEx<K, V> entry : locked)
             UNSAFE.monitorExit(entry);
 
-        long topVer = ctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         for (GridCacheEntryEx<K, V> entry : locked)
             ctx.evicts().touch(entry, topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 362077f..ba27368 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -1263,7 +1264,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 Collection<Object> data = new ArrayList<>(pageSize);
 
-                long topVer = cctx.affinity().affinityTopologyVersion();
+                AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
 
                 final boolean statsEnabled = cctx.config().isStatisticsEnabled();
 
@@ -1869,7 +1870,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 return new IgniteBiPredicate<K, V>() {
                     @Override public boolean apply(K k, V v) {
-                        return cache.context().affinity().primary(ctx.discovery().localNode(), k, -1);
+                        return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE);
                     }
                 };
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 0643c0e..239b223 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -152,7 +153,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
 
         boolean initialized = false;
 
-        boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1);
+        boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE);
         boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
         for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) {
@@ -203,8 +204,8 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
         if (F.isEmpty(lsnrCol))
             return;
 
-        if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, -1)) {
-            boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1);
+        if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE)) {
+            boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE);
             boolean recordIgniteEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
             boolean initialized = false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index a06a558..40df361 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.timeout.*;
@@ -246,7 +247,7 @@ public interface IgniteInternalTx<K, V> extends AutoCloseable, GridTimeoutObject
     /**
      * @return Last recorded topology version.
      */
-    public long topologyVersion();
+    public AffinityTopologyVersion topologyVersion();
 
     /**
      * @return Flag indicating whether transaction is implicit with only one key.
@@ -266,7 +267,7 @@ public interface IgniteInternalTx<K, V> extends AutoCloseable, GridTimeoutObject
      * @param topVer Topology version.
      * @return Recorded topology version.
      */
-    public long topologyVersion(long topVer);
+    public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer);
 
     /**
      * @return {@code True} if transaction is empty.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index abdb99c..0fa7806 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -184,7 +185,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
     private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>();
 
     /** Topology version. */
-    private AtomicLong topVer = new AtomicLong(-1);
+    private AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
 
     /** Mutex. */
     private final Lock lock = new ReentrantLock();
@@ -361,7 +362,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
      *
      * @return Flag indicating whether near cache should be updated.
      */
-    protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) {
+    protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, AffinityTopologyVersion topVer) {
         return false;
     }
 
@@ -488,18 +489,18 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public long topologyVersion() {
-        long res = topVer.get();
+    @Override public AffinityTopologyVersion topologyVersion() {
+        AffinityTopologyVersion res = topVer.get();
 
-        if (res == -1)
+        if (res.equals(AffinityTopologyVersion.NONE))
             return cctx.exchange().topologyVersion();
 
         return res;
     }
 
     /** {@inheritDoc} */
-    @Override public long topologyVersion(long topVer) {
-        this.topVer.compareAndSet(-1, topVer);
+    @Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) {
+        this.topVer.compareAndSet(AffinityTopologyVersion.NONE, topVer);
 
         return this.topVer.get();
     }
@@ -1707,7 +1708,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
-        @Override public long topologyVersion() {
+        @Override public AffinityTopologyVersion topologyVersion() {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }
 
@@ -1717,7 +1718,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
-        @Override public long topologyVersion(long topVer) {
+        @Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }
 


Mime
View raw message