ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [42/50] [abbrv] incubator-ignite git commit: IGNITE-313 Need to change affinity topology version from long to custom object
Date Sat, 28 Feb 2015 02:22:05 GMT
IGNITE-313 Need to change affinity topology version from long to custom object


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4362085a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4362085a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4362085a

Branch: refs/heads/ignite-45
Commit: 4362085aac52105ad8106019f9b59660500675e4
Parents: 9a69903
Author: sevdokimov <sevdokimov@gridgain.com>
Authored: Wed Feb 25 19:45:29 2015 +0300
Committer: sevdokimov <sevdokimov@gridgain.com>
Committed: Wed Feb 25 19:45:29 2015 +0300

----------------------------------------------------------------------
 .../impl/ClientPartitionAffinitySelfTest.java   |   2 +-
 .../affinity/CacheAffinityFunctionContext.java  |   3 +-
 .../affinity/AffinityTopologyVersion.java       | 113 +++++++++++++++++++
 .../affinity/GridAffinityAssignment.java        |  12 +-
 .../affinity/GridAffinityAssignmentCache.java   |  70 ++++++------
 .../affinity/GridAffinityProcessor.java         |  27 ++---
 .../processors/affinity/GridAffinityUtils.java  |  10 +-
 .../GridCacheAffinityFunctionContextImpl.java   |   6 +-
 .../processors/cache/GridCacheAdapter.java      |  32 +++---
 .../cache/GridCacheAffinityManager.java         |  68 ++++++-----
 .../processors/cache/GridCacheAtomicFuture.java |   4 +-
 .../cache/GridCacheConcurrentMap.java           |  18 ++-
 .../processors/cache/GridCacheContext.java      |  11 +-
 .../processors/cache/GridCacheEntryEx.java      |  12 +-
 .../cache/GridCacheEvictionManager.java         |  37 +++---
 .../cache/GridCacheEvictionRequest.java         |  14 ++-
 .../processors/cache/GridCacheIoManager.java    |   2 +-
 .../processors/cache/GridCacheMapEntry.java     |  19 ++--
 .../cache/GridCacheMapEntryFactory.java         |   5 +-
 .../processors/cache/GridCacheMessage.java      |   5 +-
 .../cache/GridCacheMvccCandidate.java           |   7 +-
 .../processors/cache/GridCacheMvccManager.java  |  42 ++++---
 .../GridCachePartitionExchangeManager.java      |  28 +++--
 .../processors/cache/GridCachePreloader.java    |   3 +-
 .../cache/GridCachePreloaderAdapter.java        |   3 +-
 .../cache/GridCacheSharedContext.java           |   3 +-
 .../processors/cache/GridCacheSwapManager.java  |   9 +-
 .../processors/cache/GridCacheUtils.java        |  14 ++-
 .../cache/affinity/GridCacheAffinityImpl.java   |  13 ++-
 .../CacheDataStructuresManager.java             |  17 +--
 .../distributed/GridCacheTtlUpdateRequest.java  |  12 +-
 .../GridDistributedCacheAdapter.java            |  23 ++--
 .../GridDistributedTxRemoteAdapter.java         |   3 +-
 .../dht/GridClientPartitionTopology.java        |  47 ++++----
 .../dht/GridDhtAffinityAssignmentRequest.java   |  12 +-
 .../dht/GridDhtAffinityAssignmentResponse.java  |  13 ++-
 .../dht/GridDhtAssignmentFetchFuture.java       |   7 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  56 ++++-----
 .../distributed/dht/GridDhtCacheEntry.java      |  11 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |   7 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   3 +-
 .../distributed/dht/GridDhtLockFuture.java      |   7 +-
 .../distributed/dht/GridDhtLockRequest.java     |  11 +-
 .../dht/GridDhtPartitionTopology.java           |  11 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  51 +++++----
 .../distributed/dht/GridDhtTopologyFuture.java  |   3 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   7 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  11 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   5 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   5 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   3 +-
 .../dht/GridDhtTxPrepareRequest.java            |  11 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |   7 +-
 .../distributed/dht/GridNoStorageCacheMap.java  |   5 +-
 .../dht/GridPartitionedGetFuture.java           |  33 +++---
 .../dht/atomic/GridDhtAtomicCache.java          |  23 ++--
 .../dht/atomic/GridDhtAtomicCacheEntry.java     |   3 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   7 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  11 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  19 ++--
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  11 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  25 ++--
 .../colocated/GridDhtColocatedCacheEntry.java   |   3 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  23 ++--
 .../dht/preloader/GridDhtForceKeysFuture.java   |  11 +-
 .../dht/preloader/GridDhtForceKeysRequest.java  |  12 +-
 .../GridDhtPartitionDemandMessage.java          |  12 +-
 .../preloader/GridDhtPartitionDemandPool.java   |  21 ++--
 .../preloader/GridDhtPartitionExchangeId.java   |  26 +++--
 .../GridDhtPartitionsExchangeFuture.java        |  19 ++--
 .../preloader/GridDhtPartitionsFullMessage.java |  15 +--
 .../dht/preloader/GridDhtPreloader.java         |  17 +--
 .../preloader/GridDhtPreloaderAssignments.java  |   9 +-
 .../distributed/near/GridNearAtomicCache.java   |   3 +-
 .../distributed/near/GridNearCacheAdapter.java  |   9 +-
 .../distributed/near/GridNearCacheEntry.java    |  11 +-
 .../distributed/near/GridNearGetFuture.java     |  41 +++----
 .../distributed/near/GridNearGetRequest.java    |  12 +-
 .../distributed/near/GridNearGetResponse.java   |  12 +-
 .../distributed/near/GridNearLockFuture.java    |  17 +--
 .../distributed/near/GridNearLockRequest.java   |  13 ++-
 .../near/GridNearTransactionalCache.java        |  17 +--
 .../near/GridNearTxFinishFuture.java            |   3 +-
 .../near/GridNearTxFinishRequest.java           |  11 +-
 .../cache/distributed/near/GridNearTxLocal.java |   5 +-
 .../near/GridNearTxPrepareFuture.java           |  15 +--
 .../near/GridNearTxPrepareRequest.java          |  11 +-
 .../processors/cache/dr/GridCacheDrManager.java |   3 +-
 .../cache/dr/os/GridOsCacheDrManager.java       |   3 +-
 .../processors/cache/local/GridLocalCache.java  |   7 +-
 .../local/atomic/GridLocalAtomicCache.java      |   5 +-
 .../cache/query/GridCacheQueryManager.java      |   5 +-
 .../continuous/CacheContinuousQueryManager.java |   7 +-
 .../cache/transactions/IgniteInternalTx.java    |   5 +-
 .../cache/transactions/IgniteTxAdapter.java     |  19 ++--
 .../transactions/IgniteTxLocalAdapter.java      |   7 +-
 .../cache/transactions/IgniteTxManager.java     |  10 +-
 .../cache/version/GridCacheVersionManager.java  |   9 +-
 .../dataload/IgniteDataLoaderImpl.java          |   2 +-
 .../datastructures/GridCacheSetImpl.java        |   3 +-
 .../service/GridServiceProcessor.java           |   3 +-
 .../ignite/internal/visor/cache/VisorCache.java |   3 +-
 .../GridCachePartitionFairAffinitySelfTest.java |   9 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |   7 +-
 .../cache/GridCacheAffinityApiSelfTest.java     |  24 ++--
 .../GridCacheFinishPartitionsSelfTest.java      |  16 ++-
 ...GridCacheMixedPartitionExchangeSelfTest.java |  11 +-
 .../cache/GridCacheMultiUpdateLockSelfTest.java |   5 +-
 .../processors/cache/GridCacheTestEntryEx.java  |  22 ++--
 ...actQueueFailoverDataConsistencySelfTest.java |   4 +-
 ...dCachePartitionedQueueEntryMoveSelfTest.java |   3 +-
 .../GridCacheDhtPreloadDisabledSelfTest.java    |   3 +-
 .../distributed/dht/GridCacheDhtTestUtils.java  |   5 +-
 .../near/GridCacheNearMultiNodeSelfTest.java    |   3 +-
 .../near/GridCacheNearReadersSelfTest.java      |   5 +-
 .../GridCachePartitionedTxSalvageSelfTest.java  |   3 +-
 .../ignite/testframework/GridTestUtils.java     |   3 +-
 .../junits/common/GridCommonAbstractTest.java   |   3 +-
 .../processors/query/h2/IgniteH2Indexing.java   |   4 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   4 +-
 120 files changed, 968 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java
index 3a45615..fabb4f4 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java
@@ -345,7 +345,7 @@ public class ClientPartitionAffinitySelfTest extends GridCommonAbstractTest {
         int part = srvAff.partition(key);
 
         CacheAffinityFunctionContext ctx = new GridCacheAffinityFunctionContextImpl(new ArrayList<>(srvNodes),
-            null, null, 1, 0);
+            null, null, new AffinityTopologyVersion(1), 0);
 
         ClusterNode srvNode = F.first(srvAff.assignPartitions(ctx).get(part));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java
index ea5a0ec..fd1be95 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.cache.affinity;
 
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -59,7 +60,7 @@ public interface CacheAffinityFunctionContext {
      *
      * @return Current topology version number.
      */
-    public long currentTopologyVersion();
+    public AffinityTopologyVersion currentTopologyVersion();
 
     /**
      * Gets discovery event caused topology change.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
new file mode 100644
index 0000000..fc5f193
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+
+/**
+ *
+ */
+public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersion>, Externalizable {
+    /** */
+    public static final AffinityTopologyVersion NONE = new AffinityTopologyVersion(-1);
+
+    /** */
+    public static final AffinityTopologyVersion ZERO = new AffinityTopologyVersion(0);
+
+    /** */
+    private long topVer;
+
+    /**
+     * @param ver Version.
+     */
+    public AffinityTopologyVersion(long ver) {
+        topVer = ver;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public long topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @param topVer New topology version.
+     */
+    public void topologyVersion(long topVer) {
+        this.topVer = topVer;
+    }
+
+    /**
+     *
+     */
+    public AffinityTopologyVersion previous() {
+        return new AffinityTopologyVersion(topVer - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(AffinityTopologyVersion o) {
+        return Long.compare(topVer, o.topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (!(o instanceof AffinityTopologyVersion))
+            return false;
+
+        return topVer == ((AffinityTopologyVersion)o).topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return (int)topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        topVer = in.readLong();
+    }
+
+    /**
+     * @param msgWriter Message writer.
+     */
+    public boolean writeTo(MessageWriter msgWriter) {
+        return msgWriter.writeLong("topVer.idx", topVer);
+    }
+
+    /**
+     * @param msgReader Message reader.
+     */
+    public static AffinityTopologyVersion readFrom(MessageReader msgReader) {
+        long topVer = msgReader.readLong("topVer.idx");
+
+        return new AffinityTopologyVersion(topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(topVer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 673db6d..e9df8b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -31,7 +31,7 @@ class GridAffinityAssignment implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** Topology version. */
-    private final long topVer;
+    private final AffinityTopologyVersion topVer;
 
     /** Collection of calculated affinity nodes. */
     private List<List<ClusterNode>> assignment;
@@ -47,7 +47,7 @@ class GridAffinityAssignment implements Serializable {
      *
      * @param topVer Topology version.
      */
-    GridAffinityAssignment(long topVer) {
+    GridAffinityAssignment(AffinityTopologyVersion topVer) {
         this.topVer = topVer;
         primary = new HashMap<>();
         backup = new HashMap<>();
@@ -57,7 +57,7 @@ class GridAffinityAssignment implements Serializable {
      * @param topVer Topology version.
      * @param assignment Assignment.
      */
-    GridAffinityAssignment(long topVer, List<List<ClusterNode>> assignment) {
+    GridAffinityAssignment(AffinityTopologyVersion topVer, List<List<ClusterNode>> assignment) {
         this.topVer = topVer;
         this.assignment = assignment;
 
@@ -77,7 +77,7 @@ class GridAffinityAssignment implements Serializable {
     /**
      * @return Topology version.
      */
-    public long topologyVersion() {
+    public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -152,7 +152,7 @@ class GridAffinityAssignment implements Serializable {
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        return (int)(topVer ^ (topVer >>> 32));
+        return topVer.hashCode();
     }
 
     /** {@inheritDoc} */
@@ -164,7 +164,7 @@ class GridAffinityAssignment implements Serializable {
         if (o == null || getClass() != o.getClass())
             return false;
 
-        return topVer == ((GridAffinityAssignment)o).topVer;
+        return topVer.equals(((GridAffinityAssignment)o).topVer);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 9c12a17..ee6ee2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -60,7 +60,7 @@ public class GridAffinityAssignmentCache {
     private final CacheAffinityKeyMapper affMapper;
 
     /** Affinity calculation results cache: topology version => partition => nodes. */
-    private final ConcurrentMap<Long, GridAffinityAssignment> affCache;
+    private final ConcurrentMap<AffinityTopologyVersion, GridAffinityAssignment> affCache;
 
     /** Cache item corresponding to the head topology version. */
     private final AtomicReference<GridAffinityAssignment> head;
@@ -69,7 +69,7 @@ public class GridAffinityAssignmentCache {
     private final GridCacheContext ctx;
 
     /** Ready futures. */
-    private final ConcurrentMap<Long, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>();
 
     /** Log. */
     private IgniteLogger log;
@@ -103,7 +103,7 @@ public class GridAffinityAssignmentCache {
 
         partsCnt = aff.partitions();
         affCache = new ConcurrentLinkedHashMap<>();
-        head = new AtomicReference<>(new GridAffinityAssignment(-1));
+        head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
     }
 
     /**
@@ -113,14 +113,14 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment for topology version.
      */
-    public void initialize(long topVer, List<List<ClusterNode>> affAssignment) {
+    public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) {
         GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment);
 
         affCache.put(topVer, assignment);
         head.set(assignment);
 
-        for (Map.Entry<Long, AffinityReadyFuture> entry : readyFuts.entrySet()) {
-            if (entry.getKey() >= topVer)
+        for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+            if (entry.getKey().compareTo(topVer) >= 0)
                 entry.getValue().onDone(topVer);
         }
     }
@@ -146,12 +146,12 @@ public class GridAffinityAssignmentCache {
      * @return Affinity assignments.
      */
     @SuppressWarnings("IfMayBeConditional")
-    public List<List<ClusterNode>> calculate(long topVer, DiscoveryEvent discoEvt) {
+    public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
         if (log.isDebugEnabled())
             log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
                 ", discoEvt=" + discoEvt + ']');
 
-        GridAffinityAssignment prev = affCache.get(topVer - 1);
+        GridAffinityAssignment prev = affCache.get(topVer.previous());
 
         List<ClusterNode> sorted;
 
@@ -160,7 +160,7 @@ public class GridAffinityAssignmentCache {
             sorted = Collections.singletonList(ctx.localNode());
         else {
             // Resolve nodes snapshot for specified topology version.
-            Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer);
+            Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer.topologyVersion());
 
             sorted = sort(nodes);
         }
@@ -193,15 +193,15 @@ public class GridAffinityAssignmentCache {
         while (true) {
             GridAffinityAssignment headItem = head.get();
 
-            if (headItem.topologyVersion() >= topVer)
+            if (headItem.topologyVersion().compareTo(topVer) >= 0)
                 break;
 
             if (head.compareAndSet(headItem, updated))
                 break;
         }
 
-        for (Map.Entry<Long, AffinityReadyFuture> entry : readyFuts.entrySet()) {
-            if (entry.getKey() <= topVer) {
+        for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+            if (entry.getKey().compareTo(topVer) <= 0) {
                 if (log.isDebugEnabled())
                     log.debug("Completing topology ready future (calculated affinity) [locNodeId=" + ctx.localNodeId() +
                         ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
@@ -216,7 +216,7 @@ public class GridAffinityAssignmentCache {
     /**
      * @return Last calculated affinity version.
      */
-    public long lastVersion() {
+    public AffinityTopologyVersion lastVersion() {
         return head.get().topologyVersion();
     }
 
@@ -230,8 +230,8 @@ public class GridAffinityAssignmentCache {
             log.debug("Cleaning up cache for version [locNodeId=" + ctx.localNodeId() +
                 ", topVer=" + topVer + ']');
 
-        for (Iterator<Long> it = affCache.keySet().iterator(); it.hasNext(); )
-            if (it.next() < topVer)
+        for (Iterator<AffinityTopologyVersion> it = affCache.keySet().iterator(); it.hasNext(); )
+            if (it.next().topologyVersion() < topVer)
                 it.remove();
     }
 
@@ -239,7 +239,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @return Affinity assignment.
      */
-    public List<List<ClusterNode>> assignments(long topVer) {
+    public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) {
         GridAffinityAssignment aff = cachedAffinity(topVer);
 
         return aff.assignment();
@@ -251,10 +251,10 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version to await for.
      * @return Future that will be completed after affinity for topology version {@code topVer} is calculated.
      */
-    @Nullable public IgniteInternalFuture<Long> readyFuture(long topVer) {
+    @Nullable public IgniteInternalFuture<AffinityTopologyVersion> readyFuture(AffinityTopologyVersion topVer) {
         GridAffinityAssignment aff = head.get();
 
-        if (aff.topologyVersion() >= topVer) {
+        if (aff.topologyVersion().compareTo(topVer) >= 0) {
             if (log.isDebugEnabled())
                 log.debug("Returning finished future for readyFuture [head=" + aff.topologyVersion() +
                     ", topVer=" + topVer + ']');
@@ -262,12 +262,12 @@ public class GridAffinityAssignmentCache {
             return null;
         }
 
-        GridFutureAdapter<Long> fut = F.addIfAbsent(readyFuts, topVer,
+        GridFutureAdapter<AffinityTopologyVersion> fut = F.addIfAbsent(readyFuts, topVer,
             new AffinityReadyFuture(ctx.kernalContext(), topVer));
 
         aff = head.get();
 
-        if (aff.topologyVersion() >= topVer) {
+        if (aff.topologyVersion().compareTo(topVer) >= 0) {
             if (log.isDebugEnabled())
                 log.debug("Completing topology ready future right away [head=" + aff.topologyVersion() +
                     ", topVer=" + topVer + ']');
@@ -315,7 +315,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @return Affinity nodes.
      */
-    public List<ClusterNode> nodes(int part, long topVer) {
+    public List<ClusterNode> nodes(int part, AffinityTopologyVersion topVer) {
         // Resolve cached affinity nodes.
         return cachedAffinity(topVer).get(part);
     }
@@ -327,7 +327,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @return Primary partitions for specified node ID.
      */
-    public Set<Integer> primaryPartitions(UUID nodeId, long topVer) {
+    public Set<Integer> primaryPartitions(UUID nodeId, AffinityTopologyVersion topVer) {
         return cachedAffinity(topVer).primaryPartitions(nodeId);
     }
 
@@ -338,7 +338,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @return Backup partitions for specified node ID.
      */
-    public Set<Integer> backupPartitions(UUID nodeId, long topVer) {
+    public Set<Integer> backupPartitions(UUID nodeId, AffinityTopologyVersion topVer) {
         return cachedAffinity(topVer).backupPartitions(nodeId);
     }
 
@@ -348,17 +348,17 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @return Cached affinity.
      */
-    private GridAffinityAssignment cachedAffinity(long topVer) {
-        if (topVer == -1)
+    private GridAffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
+        if (topVer.equals(AffinityTopologyVersion.NONE))
             topVer = lastVersion();
         else
             awaitTopologyVersion(topVer);
 
-        assert topVer >= 0 : topVer;
+        assert topVer.topologyVersion() >= 0 : topVer;
 
         GridAffinityAssignment cache = head.get();
 
-        if (cache.topologyVersion() != topVer) {
+        if (!cache.topologyVersion().equals(topVer)) {
             cache = affCache.get(topVer);
 
             if (cache == null) {
@@ -368,7 +368,7 @@ public class GridAffinityAssignmentCache {
             }
         }
 
-        assert cache.topologyVersion() == topVer : "Invalid cached affinity: " + cache;
+        assert cache.topologyVersion().equals(topVer) : "Invalid cached affinity: " + cache;
 
         return cache;
     }
@@ -376,10 +376,10 @@ public class GridAffinityAssignmentCache {
     /**
      * @param topVer Topology version to wait.
      */
-    private void awaitTopologyVersion(long topVer) {
+    private void awaitTopologyVersion(AffinityTopologyVersion topVer) {
         GridAffinityAssignment aff = head.get();
 
-        if (aff.topologyVersion() >= topVer)
+        if (aff.topologyVersion().compareTo(topVer) >= 0)
             return;
 
         try {
@@ -387,7 +387,7 @@ public class GridAffinityAssignmentCache {
                 log.debug("Will wait for topology version [locNodeId=" + ctx.localNodeId() +
                 ", topVer=" + topVer + ']');
 
-            IgniteInternalFuture<Long> fut = readyFuture(topVer);
+            IgniteInternalFuture<AffinityTopologyVersion> fut = readyFuture(topVer);
 
             if (fut != null)
                 fut.get();
@@ -417,12 +417,12 @@ public class GridAffinityAssignmentCache {
     /**
      * Affinity ready future. Will remove itself from ready futures map.
      */
-    private class AffinityReadyFuture extends GridFutureAdapter<Long> {
+    private class AffinityReadyFuture extends GridFutureAdapter<AffinityTopologyVersion> {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** */
-        private long reqTopVer;
+        private AffinityTopologyVersion reqTopVer;
 
         /**
          * Empty constructor required by {@link Externalizable}.
@@ -434,14 +434,14 @@ public class GridAffinityAssignmentCache {
         /**
          * @param ctx Kernal context.
          */
-        private AffinityReadyFuture(GridKernalContext ctx, long reqTopVer) {
+        private AffinityReadyFuture(GridKernalContext ctx, AffinityTopologyVersion reqTopVer) {
             super(ctx);
 
             this.reqTopVer = reqTopVer;
         }
 
         /** {@inheritDoc} */
-        @Override public boolean onDone(Long res, @Nullable Throwable err) {
+        @Override public boolean onDone(AffinityTopologyVersion res, @Nullable Throwable err) {
             assert res != null || err != null;
 
             boolean done = super.onDone(res, err);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index d7d0391..58aad82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -85,7 +85,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
                 final Collection<AffinityAssignmentKey> rmv = new HashSet<>();
 
                 for (AffinityAssignmentKey key : affMap.keySet()) {
-                    if (!caches.contains(key.cacheName) || key.topVer < discoEvt.topologyVersion() - 10)
+                    if (!caches.contains(key.cacheName) || key.topVer.topologyVersion() < discoEvt.topologyVersion() - 10)
                         rmv.add(key);
                 }
 
@@ -167,7 +167,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      * @return Picked node.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key, long topVer) throws IgniteCheckedException {
+    @Nullable public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key, AffinityTopologyVersion topVer) throws IgniteCheckedException {
         Map<ClusterNode, Collection<K>> map = keysToNodes(cacheName, F.asList(key), topVer);
 
         return map != null ? F.first(map.keySet()) : null;
@@ -189,7 +189,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         if (U.hasCache(loc, cacheName) && ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL)
             return Collections.singletonList(loc);
 
-        long topVer = ctx.discovery().topologyVersion();
+        AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion());
 
         AffinityInfo affInfo = affinityCache(cacheName, topVer);
 
@@ -220,7 +220,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         if (key == null)
             return null;
 
-        AffinityInfo affInfo = affinityCache(cacheName, ctx.discovery().topologyVersion());
+        AffinityInfo affInfo = affinityCache(cacheName, new AffinityTopologyVersion(ctx.discovery().topologyVersion()));
 
         if (affInfo == null || affInfo.mapper == null)
             return null;
@@ -255,7 +255,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      */
     private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName,
         Collection<? extends K> keys) throws IgniteCheckedException {
-        return keysToNodes(cacheName, keys, ctx.discovery().topologyVersion());
+        return keysToNodes(cacheName, keys, new AffinityTopologyVersion(ctx.discovery().topologyVersion()));
     }
 
     /**
@@ -266,7 +266,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If failed.
      */
     private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName,
-        Collection<? extends K> keys, long topVer) throws IgniteCheckedException {
+        Collection<? extends K> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException {
         if (F.isEmpty(keys))
             return Collections.emptyMap();
 
@@ -286,7 +286,8 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     @SuppressWarnings("ErrorNotRethrown")
-    private AffinityInfo affinityCache(@Nullable final String cacheName, long topVer) throws IgniteCheckedException {
+    private AffinityInfo affinityCache(@Nullable final String cacheName, AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
         AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer);
 
         IgniteInternalFuture<AffinityInfo> fut = affMap.get(key);
@@ -406,7 +407,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      * @return Affinity cached function.
      * @throws IgniteCheckedException If either local or remote node cannot get deployment for affinity objects.
      */
-    private AffinityInfo affinityInfoFromNode(@Nullable String cacheName, long topVer, ClusterNode n)
+    private AffinityInfo affinityInfoFromNode(@Nullable String cacheName, AffinityTopologyVersion topVer, ClusterNode n)
         throws IgniteCheckedException {
         GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure()
             .callAsyncNoFailover(BALANCE, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/).get();
@@ -561,13 +562,13 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         private String cacheName;
 
         /** */
-        private long topVer;
+        private AffinityTopologyVersion topVer;
 
         /**
          * @param cacheName Cache name.
          * @param topVer Topology version.
          */
-        private AffinityAssignmentKey(String cacheName, long topVer) {
+        private AffinityAssignmentKey(String cacheName, @NotNull AffinityTopologyVersion topVer) {
             this.cacheName = cacheName;
             this.topVer = topVer;
         }
@@ -582,14 +583,14 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
 
             AffinityAssignmentKey that = (AffinityAssignmentKey)o;
 
-            return topVer == that.topVer && F.eq(cacheName, that.cacheName);
+            return topVer.equals(that.topVer) && F.eq(cacheName, that.cacheName);
         }
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
             int res = cacheName != null ? cacheName.hashCode() : 0;
 
-            res = 31 * res + (int)(topVer ^ (topVer >>> 32));
+            res = 31 * res + topVer.hashCode();
 
             return res;
         }
@@ -853,7 +854,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
          * @return Affinity info for current topology version.
          */
         private AffinityInfo cache() throws IgniteCheckedException {
-            return affinityCache(cacheName, topologyVersion());
+            return affinityCache(cacheName, new AffinityTopologyVersion(topologyVersion()));
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index a5e8349..33bc851 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -48,7 +48,7 @@ class GridAffinityUtils {
      * @return Affinity job.
      */
     static Callable<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> affinityJob(
-        String cacheName, long topVer) {
+        String cacheName, AffinityTopologyVersion topVer) {
         return new AffinityJob(cacheName, topVer);
     }
 
@@ -135,12 +135,12 @@ class GridAffinityUtils {
         private String cacheName;
 
         /** */
-        private long topVer;
+        private AffinityTopologyVersion topVer;
 
         /**
          * @param cacheName Cache name.
          */
-        private AffinityJob(@Nullable String cacheName, long topVer) {
+        private AffinityJob(@Nullable String cacheName, @NotNull AffinityTopologyVersion topVer) {
             this.cacheName = cacheName;
             this.topVer = topVer;
         }
@@ -175,13 +175,13 @@ class GridAffinityUtils {
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws IOException {
             U.writeString(out, cacheName);
-            out.writeLong(topVer);
+            out.writeObject(topVer);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
             cacheName = U.readString(in);
-            topVer = in.readLong();
+            topVer = (AffinityTopologyVersion)in.readObject();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
index c56355b..1d4a5cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
@@ -38,7 +38,7 @@ public class GridCacheAffinityFunctionContextImpl implements CacheAffinityFuncti
     private DiscoveryEvent discoEvt;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Number of backups to assign. */
     private int backups;
@@ -48,7 +48,7 @@ public class GridCacheAffinityFunctionContextImpl implements CacheAffinityFuncti
      * @param topVer Topology version.
      */
     public GridCacheAffinityFunctionContextImpl(List<ClusterNode> topSnapshot, List<List<ClusterNode>> prevAssignment,
-        DiscoveryEvent discoEvt, long topVer, int backups) {
+        DiscoveryEvent discoEvt, @NotNull AffinityTopologyVersion topVer, int backups) {
         this.topSnapshot = topSnapshot;
         this.prevAssignment = prevAssignment;
         this.discoEvt = discoEvt;
@@ -67,7 +67,7 @@ public class GridCacheAffinityFunctionContextImpl implements CacheAffinityFuncti
     }
 
     /** {@inheritDoc} */
-    @Override public long currentTopologyVersion() {
+    @Override public AffinityTopologyVersion currentTopologyVersion() {
         return topVer;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 12ea535..21f6137 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -25,6 +25,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.compute.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -750,7 +751,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
 
         // Swap and offheap are disabled for near cache.
         if (modes.primary || modes.backup) {
-            long topVer = ctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
             GridCacheSwapManager<K, V> swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
 
@@ -797,7 +798,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             V val = null;
 
             if (!ctx.isLocal()) {
-                long topVer = ctx.affinity().affinityTopologyVersion();
+                AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
                 int part = ctx.affinity().partition(key);
 
@@ -905,7 +906,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             GridCacheEntryEx<K, V> e = peekEx(key);
 
             if (e != null)
-                return e.peek(heap, offheap, swap, -1, plc);
+                return e.peek(heap, offheap, swap, AffinityTopologyVersion.NONE, plc);
         }
 
         if (offheap || swap) {
@@ -1277,7 +1278,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
      * @param key Entry key.
      * @return Entry (never {@code null}).
      */
-    public GridCacheEntryEx<K, V> entryEx(K key, long topVer) {
+    public GridCacheEntryEx<K, V> entryEx(K key, AffinityTopologyVersion topVer) {
         GridCacheEntryEx<K, V> e = entry0(key, topVer, true, false);
 
         assert e != null;
@@ -1292,7 +1293,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
      * @param touch Flag to touch created entry (only if entry was actually created).
      * @return Entry or <tt>null</tt>.
      */
-    @Nullable private GridCacheEntryEx<K, V> entry0(K key, long topVer, boolean create, boolean touch) {
+    @Nullable private GridCacheEntryEx<K, V> entry0(K key, AffinityTopologyVersion topVer, boolean create, boolean touch) {
         GridTriple<GridCacheMapEntry<K, V>> t = map.putEntryIfObsoleteOrAbsent(topVer, key, null,
             ctx.config().getDefaultTimeToLive(), create);
 
@@ -1767,7 +1768,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         @Nullable UUID subjId, String taskName) {
         ctx.denyOnFlag(READ);
 
-        final long topVer = ctx.affinity().affinityTopologyVersion();
+        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         if (!F.isEmpty(keys)) {
             final String uid = CU.uuid(); // Get meta UUID for this thread.
@@ -1915,7 +1916,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
      * @param key Key.
      * @return Entry.
      */
-    @Nullable protected GridCacheEntryEx<K, V> entryExSafe(K key, long topVer) {
+    @Nullable protected GridCacheEntryEx<K, V> entryExSafe(K key, AffinityTopologyVersion topVer) {
         return entryEx(key);
     }
 
@@ -2129,7 +2130,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             try {
                 assert keys != null;
 
-                final long topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+                final AffinityTopologyVersion topVer = tx == null
+                    ? ctx.affinity().affinityTopologyVersion()
+                    : tx.topologyVersion();
 
                 final Map<K, V> map = new GridLeanMap<>(keys.size());
 
@@ -3772,7 +3775,8 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             validateCacheKey(key);
 
         try {
-            GridCacheEntryEx<K, V> e = entry0(key, ctx.discovery().topologyVersion(), false, false);
+            GridCacheEntryEx<K, V> e = entry0(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion()),
+                false, false);
 
             if (e == null)
                 return false;
@@ -3868,7 +3872,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     @Override public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args)
         throws IgniteCheckedException {
         final boolean replicate = ctx.isDrEnabled();
-        final long topVer = ctx.affinity().affinityTopologyVersion();
+        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
 
@@ -3925,7 +3929,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         V val,
         GridCacheVersion ver,
         @Nullable IgniteBiPredicate<K, V> p,
-        long topVer,
+        AffinityTopologyVersion topVer,
         boolean replicate,
         long ttl) {
         if (p != null && !p.apply(key, val))
@@ -4081,7 +4085,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         throws IgniteCheckedException
     {
         final boolean replicate = ctx.isDrEnabled();
-        final long topVer = ctx.affinity().affinityTopologyVersion();
+        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();
 
@@ -4231,7 +4235,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
 
         // Swap and offheap are disabled for near cache.
         if (modes.primary || modes.backup) {
-            long topVer = ctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
             GridCacheSwapManager<K, V> swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
 
@@ -5159,7 +5163,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         if (keyCheck)
             validateCacheKey(key);
 
-        long topVer = ctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
         if (ctx.portableEnabled())
             key = (K)ctx.marshalToPortable(key);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index d3510e4..6cdfc6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -36,6 +36,9 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
     /** Factor for maximum number of attempts to calculate all partition affinity keys. */
     private static final int MAX_PARTITION_KEY_ATTEMPT_RATIO = 10;
 
+    /** */
+    private static final AffinityTopologyVersion TOP_FIRST = new AffinityTopologyVersion(1);
+
     /** Affinity cached function. */
     private GridAffinityAssignmentCache aff;
 
@@ -83,7 +86,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
     @Override protected void onKernalStart0() throws IgniteCheckedException {
         if (cctx.isLocal())
             // No discovery event needed for local affinity.
-            aff.calculate(1, null);
+            aff.calculate(TOP_FIRST, null);
     }
 
     /** {@inheritDoc} */
@@ -103,10 +106,21 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version to wait.
      * @return Affinity ready future.
      */
-    public IgniteInternalFuture<Long> affinityReadyFuture(long topVer) {
+    public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture(long topVer) {
+        return affinityReadyFuture(new AffinityTopologyVersion(topVer));
+    }
+
+    /**
+     * Gets affinity ready future, a future that will be completed after affinity with given
+     * topology version is calculated.
+     *
+     * @param topVer Topology version to wait.
+     * @return Affinity ready future.
+     */
+    public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture(AffinityTopologyVersion topVer) {
         assert !cctx.isLocal();
 
-        IgniteInternalFuture<Long> fut = aff.readyFuture(topVer);
+        IgniteInternalFuture<AffinityTopologyVersion> fut = aff.readyFuture(topVer);
 
         return fut != null ? fut : new GridFinishedFutureEx<>(topVer);
     }
@@ -118,7 +132,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version to wait.
      * @return Affinity ready future or {@code null}.
      */
-    @Nullable public IgniteInternalFuture<Long> affinityReadyFuturex(long topVer) {
+    @Nullable public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuturex(AffinityTopologyVersion topVer) {
         assert !cctx.isLocal();
 
         return aff.readyFuture(topVer);
@@ -141,7 +155,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment for this topology version.
      */
-    public void initializeAffinity(long topVer, List<List<ClusterNode>> affAssignment) {
+    public void initializeAffinity(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) {
         assert !cctx.isLocal();
 
         aff.initialize(topVer, affAssignment);
@@ -151,9 +165,9 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return Affinity assignments.
      */
-    public List<List<ClusterNode>> assignments(long topVer) {
+    public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
-            topVer = 1;
+            topVer = new AffinityTopologyVersion(1);
 
         return aff.assignments(topVer);
     }
@@ -164,7 +178,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version to calculate affinity for.
      * @param discoEvt Discovery event that causes this topology change.
      */
-    public List<List<ClusterNode>> calculateAffinity(long topVer, DiscoveryEvent discoEvt) {
+    public List<List<ClusterNode>> calculateAffinity(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
         assert !cctx.isLocal();
 
         return aff.calculate(topVer, discoEvt);
@@ -207,7 +221,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return Affinity nodes.
      */
-    public List<ClusterNode> nodes(K key, long topVer) {
+    public List<ClusterNode> nodes(K key, AffinityTopologyVersion topVer) {
         return nodes(partition(key), topVer);
     }
 
@@ -216,9 +230,9 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return Affinity nodes.
      */
-    public List<ClusterNode> nodes(int part, long topVer) {
+    public List<ClusterNode> nodes(int part, AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
-            topVer = 1;
+            topVer = new AffinityTopologyVersion(1);
 
         return aff.nodes(part, topVer);
     }
@@ -228,7 +242,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return Primary node for given key.
      */
-    @Nullable public ClusterNode primary(K key, long topVer) {
+    @Nullable public ClusterNode primary(K key, AffinityTopologyVersion topVer) {
         return primary(partition(key), topVer);
     }
 
@@ -237,7 +251,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return Primary node for given key.
      */
-    @Nullable public ClusterNode primary(int part, long topVer) {
+    @Nullable public ClusterNode primary(int part, AffinityTopologyVersion topVer) {
         List<ClusterNode> nodes = nodes(part, topVer);
 
         if (nodes.isEmpty())
@@ -252,7 +266,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return {@code True} if checked node is primary for given key.
      */
-    public boolean primary(ClusterNode n, K key, long topVer) {
+    public boolean primary(ClusterNode n, K key, AffinityTopologyVersion topVer) {
         return F.eq(primary(key, topVer), n);
     }
 
@@ -262,7 +276,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return {@code True} if checked node is primary for given key.
      */
-    public boolean primary(ClusterNode n, int part, long topVer) {
+    public boolean primary(ClusterNode n, int part, AffinityTopologyVersion topVer) {
         return F.eq(primary(part, topVer), n);
     }
 
@@ -271,7 +285,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return Backup nodes.
      */
-    public Collection<ClusterNode> backups(K key, long topVer) {
+    public Collection<ClusterNode> backups(K key, AffinityTopologyVersion topVer) {
         return backups(partition(key), topVer);
     }
 
@@ -280,7 +294,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return Backup nodes.
      */
-    public Collection<ClusterNode> backups(int part, long topVer) {
+    public Collection<ClusterNode> backups(int part, AffinityTopologyVersion topVer) {
         List<ClusterNode> nodes = nodes(part, topVer);
 
         assert !F.isEmpty(nodes);
@@ -296,7 +310,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return Nodes for the keys.
      */
-    public Collection<ClusterNode> remoteNodes(Iterable<? extends K> keys, long topVer) {
+    public Collection<ClusterNode> remoteNodes(Iterable<? extends K> keys, AffinityTopologyVersion topVer) {
         Collection<Collection<ClusterNode>> colcol = new GridLeanSet<>();
 
         for (K key : keys)
@@ -310,7 +324,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return {@code true} if given key belongs to local node.
      */
-    public boolean localNode(K key, long topVer) {
+    public boolean localNode(K key, AffinityTopologyVersion topVer) {
         return localNode(partition(key), topVer);
     }
 
@@ -319,7 +333,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return {@code true} if given partition belongs to local node.
      */
-    public boolean localNode(int part, long topVer) {
+    public boolean localNode(int part, AffinityTopologyVersion topVer) {
         assert part >= 0 : "Invalid partition: " + part;
 
         return nodes(part, topVer).contains(cctx.localNode());
@@ -331,7 +345,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return {@code true} if given partition belongs to specified node.
      */
-    public boolean belongs(ClusterNode node, int part, long topVer) {
+    public boolean belongs(ClusterNode node, int part, AffinityTopologyVersion topVer) {
         assert node != null;
         assert part >= 0 : "Invalid partition: " + part;
 
@@ -344,7 +358,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version.
      * @return {@code true} if given key belongs to specified node.
      */
-    public boolean belongs(ClusterNode node, K key, long topVer) {
+    public boolean belongs(ClusterNode node, K key, AffinityTopologyVersion topVer) {
         assert node != null;
 
         return belongs(node, partition(key), topVer);
@@ -355,9 +369,9 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version to calculate affinity.
      * @return Partitions for which given node is primary.
      */
-    public Set<Integer> primaryPartitions(UUID nodeId, long topVer) {
+    public Set<Integer> primaryPartitions(UUID nodeId, AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
-            topVer = 1;
+            topVer = new AffinityTopologyVersion(1);
 
         return aff.primaryPartitions(nodeId, topVer);
     }
@@ -367,9 +381,9 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param topVer Topology version to calculate affinity.
      * @return Partitions for which given node is backup.
      */
-    public Set<Integer> backupPartitions(UUID nodeId, long topVer) {
+    public Set<Integer> backupPartitions(UUID nodeId, AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
-            topVer = 1;
+            topVer = new AffinityTopologyVersion(1);
 
         return aff.backupPartitions(nodeId, topVer);
     }
@@ -377,7 +391,7 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
     /**
      * @return Affinity-ready topology version.
      */
-    public long affinityTopologyVersion() {
+    public AffinityTopologyVersion affinityTopologyVersion() {
         return aff.lastVersion();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index ec0999a..0c42eba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.internal.processors.affinity.*;
+
 import java.util.*;
 
 /**
@@ -31,7 +33,7 @@ public interface GridCacheAtomicFuture<K, R> extends GridCacheFuture<R> {
     /**
      * @return Future topology version.
      */
-    public long topologyVersion();
+    public AffinityTopologyVersion topologyVersion();
 
     /**
      * @return Future keys.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
index a169706..c1634d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -463,7 +464,7 @@ public class GridCacheConcurrentMap<K, V> {
      * @param ttl Time to live.
      * @return Cache entry for corresponding key-value pair.
      */
-    public GridCacheMapEntry<K, V> putEntry(long topVer, K key, @Nullable V val, long ttl) {
+    public GridCacheMapEntry<K, V> putEntry(AffinityTopologyVersion topVer, K key, @Nullable V val, long ttl) {
         assert key != null;
 
         checkWeakQueue();
@@ -482,7 +483,7 @@ public class GridCacheConcurrentMap<K, V> {
      * @return Triple where the first element is current entry associated with the key,
      *      the second is created entry and the third is doomed (all may be null).
      */
-    public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(long topVer, K key, @Nullable V val,
+    public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, K key, @Nullable V val,
         long ttl, boolean create) {
         assert key != null;
 
@@ -504,7 +505,7 @@ public class GridCacheConcurrentMap<K, V> {
      */
     public void putAll(Map<? extends K, ? extends V> m, long ttl) {
         for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
-            putEntry(-1, e.getKey(), e.getValue(), ttl);
+            putEntry(AffinityTopologyVersion.NONE, e.getKey(), e.getValue(), ttl);
     }
 
     /**
@@ -894,7 +895,7 @@ public class GridCacheConcurrentMap<K, V> {
          * @return Associated value.
          */
         @SuppressWarnings({"unchecked"})
-        GridCacheMapEntry<K, V> put(K key, int hash, @Nullable V val, long topVer, long ttl) {
+        GridCacheMapEntry<K, V> put(K key, int hash, @Nullable V val, AffinityTopologyVersion topVer, long ttl) {
             lock();
 
             try {
@@ -914,7 +915,7 @@ public class GridCacheConcurrentMap<K, V> {
          * @return Associated value.
          */
         @SuppressWarnings({"unchecked", "SynchronizationOnLocalVariableOrMethodParameter"})
-        private GridCacheMapEntry<K, V> put0(K key, int hash, V val, long topVer, long ttl) {
+        private GridCacheMapEntry<K, V> put0(K key, int hash, V val, AffinityTopologyVersion topVer, long ttl) {
             try {
                 SegmentHeader<K, V> hdr = this.hdr;
 
@@ -989,8 +990,8 @@ public class GridCacheConcurrentMap<K, V> {
          *      the second is created entry and the third is doomed (all may be null).
          */
         @SuppressWarnings( {"unchecked"})
-        GridTriple<GridCacheMapEntry<K, V>> putIfObsolete(K key, int hash, @Nullable V val, long topVer, long ttl,
-            boolean create) {
+        GridTriple<GridCacheMapEntry<K, V>> putIfObsolete(K key, int hash, @Nullable V val,
+            AffinityTopologyVersion topVer, long ttl, boolean create) {
             lock();
 
             try {
@@ -1872,9 +1873,6 @@ public class GridCacheConcurrentMap<K, V> {
         boolean containsValue(V v) {
             A.notNull(v, "value");
 
-            if (v == null)
-                return false;
-
             for (Iterator<V> it = valueIterator(); it.hasNext(); ) {
                 V v0 = it.next();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 3ec013c..c4367b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.managers.swapspace.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.datastructures.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
@@ -479,7 +480,8 @@ public class GridCacheContext<K, V> implements Externalizable {
         cache.map().incrementSize(e);
 
         if (isDht() || isColocated() || isDhtAtomic()) {
-            GridDhtLocalPartition<K, V> part = topology().localPartition(e.partition(), -1, false);
+            GridDhtLocalPartition<K, V> part = topology().localPartition(e.partition(), AffinityTopologyVersion.NONE,
+                false);
 
             if (part != null)
                 part.incrementPublicSize();
@@ -497,7 +499,8 @@ public class GridCacheContext<K, V> implements Externalizable {
         cache.map().decrementSize(e);
 
         if (isDht() || isColocated() || isDhtAtomic()) {
-            GridDhtLocalPartition<K, V> part = topology().localPartition(e.partition(), -1, false);
+            GridDhtLocalPartition<K, V> part = topology().localPartition(e.partition(), AffinityTopologyVersion.NONE,
+                false);
 
             if (part != null)
                 part.decrementPublicSize();
@@ -1506,10 +1509,10 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return {@code True} if mapped.
      * @throws GridCacheEntryRemovedException If reader for entry is removed.
      */
-    public boolean dhtMap(UUID nearNodeId, long topVer, GridDhtCacheEntry<K, V> entry, IgniteLogger log,
+    public boolean dhtMap(UUID nearNodeId, AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> entry, IgniteLogger log,
         Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtMap,
         @Nullable Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearMap) throws GridCacheEntryRemovedException {
-        assert topVer != -1;
+        assert !topVer.equals(AffinityTopologyVersion.NONE);
 
         Collection<ClusterNode> dhtNodes = dht().topology().nodes(entry.partition(), topVer);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 1ffed64..4bc1e63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.eviction.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -274,7 +275,7 @@ public interface GridCacheEntryEx<K, V> {
      *
      * @return Checks if value is valid.
      */
-    public boolean valid(long topVer);
+    public boolean valid(AffinityTopologyVersion topVer);
 
     /**
      * @return {@code True} if partition is in valid.
@@ -361,7 +362,7 @@ public interface GridCacheEntryEx<K, V> {
         long ttl,
         boolean evt,
         boolean metrics,
-        long topVer,
+        AffinityTopologyVersion topVer,
         IgnitePredicate<Cache.Entry<K, V>>[] filter,
         GridDrType drType,
         long drExpireTime,
@@ -397,7 +398,7 @@ public interface GridCacheEntryEx<K, V> {
         boolean retval,
         boolean evt,
         boolean metrics,
-        long topVer,
+        AffinityTopologyVersion topVer,
         IgnitePredicate<Cache.Entry<K, V>>[] filter,
         GridDrType drType,
         @Nullable GridCacheVersion explicitVer,
@@ -612,7 +613,7 @@ public interface GridCacheEntryEx<K, V> {
     @Nullable public V peek(boolean heap,
         boolean offheap,
         boolean swap,
-        long topVer,
+        AffinityTopologyVersion topVer,
         @Nullable IgniteCacheExpiryPolicy plc)
         throws GridCacheEntryRemovedException, IgniteCheckedException;
 
@@ -684,7 +685,8 @@ public interface GridCacheEntryEx<K, V> {
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
     public boolean initialValue(V val, @Nullable byte[] valBytes, GridCacheVersion ver, long ttl, long expireTime,
-        boolean preload, long topVer, GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException;
+        boolean preload, AffinityTopologyVersion topVer, GridDrType drType)
+        throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
      * Sets new value if current version is <tt>0</tt> using swap entry data.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 382eb61..1ade451 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -26,6 +26,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -347,10 +348,10 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
                 return;
             }
 
-            long topVer = lockTopology();
+            AffinityTopologyVersion topVer = lockTopology();
 
             try {
-                if (topVer != req.topologyVersion()) {
+                if (!topVer.equals(req.topologyVersion())) {
                     if (log.isDebugEnabled())
                         log.debug("Topology version is different [locTopVer=" + topVer +
                             ", rmtTopVer=" + req.topologyVersion() + ']');
@@ -498,7 +499,8 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
 
         if (!cctx.isNear()) {
             try {
-                GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, -1, false);
+                GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p,
+                    AffinityTopologyVersion.NONE, false);
 
                 assert part != null;
 
@@ -525,7 +527,8 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
 
         if (!cctx.isNear()) {
             try {
-                GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, -1, false);
+                GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, AffinityTopologyVersion.NONE,
+                    false);
 
                 if (part != null && part.reserve()) {
                     part.lock();
@@ -561,7 +564,8 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
 
         if (!cctx.isNear()) {
             try {
-                GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, -1, false);
+                GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, AffinityTopologyVersion.NONE,
+                    false);
 
                 if (part != null) {
                     part.unlock();
@@ -582,14 +586,14 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
      *
      * @return Topology version after lock.
      */
-    private long lockTopology() {
+    private AffinityTopologyVersion lockTopology() {
         if (!cctx.isNear()) {
             cctx.dht().topology().readLock();
 
             return cctx.dht().topology().topologyVersion();
         }
 
-        return 0;
+        return AffinityTopologyVersion.ZERO;
     }
 
     /**
@@ -738,7 +742,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param e Entry for eviction policy notification.
      * @param topVer Topology version.
      */
-    public void touch(GridCacheEntryEx<K, V> e, long topVer) {
+    public void touch(GridCacheEntryEx<K, V> e, AffinityTopologyVersion topVer) {
         if (e.detached() || e.isInternal())
             return;
 
@@ -1092,7 +1096,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
                                 }
 
                                 try {
-                                    long topVer = lockTopology();
+                                    AffinityTopologyVersion topVer = lockTopology();
 
                                     try {
                                         onFutureCompleted((EvictionFuture)f, topVer);
@@ -1124,7 +1128,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
      * @param fut Completed eviction future.
      * @param topVer Topology version on future complete.
      */
-    private void onFutureCompleted(EvictionFuture fut, long topVer) {
+    private void onFutureCompleted(EvictionFuture fut, AffinityTopologyVersion topVer) {
         if (!busyLock.enterBusy())
             return;
 
@@ -1151,7 +1155,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
             }
 
             // Check if topology version is different.
-            if (fut.topologyVersion() != topVer) {
+            if (!fut.topologyVersion().equals(topVer)) {
                 if (log.isDebugEnabled())
                     log.debug("Topology has changed, all entries will be touched: " + fut);
 
@@ -1264,7 +1268,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
      */
     @SuppressWarnings( {"IfMayBeConditional"})
     private IgniteBiTuple<Collection<ClusterNode>, Collection<ClusterNode>> remoteNodes(GridCacheEntryEx<K, V> entry,
-        long topVer)
+        AffinityTopologyVersion topVer)
         throws GridCacheEntryRemovedException {
         assert entry != null;
 
@@ -1422,7 +1426,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
                         if (!evts.isEmpty())
                             break;
 
-                        if (!cctx.affinity().primary(loc, it.next(), evt.topologyVersion()))
+                        if (!cctx.affinity().primary(loc, it.next(), new AffinityTopologyVersion(evt.topologyVersion())))
                             it.remove();
                     }
 
@@ -1434,7 +1438,8 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
                         if (!evts.isEmpty())
                             break;
 
-                        if (part.primary(evt.topologyVersion()) && primaryParts.add(part.id())) {
+                        if (part.primary(new AffinityTopologyVersion(evt.topologyVersion()))
+                            && primaryParts.add(part.id())) {
                             if (log.isDebugEnabled())
                                 log.debug("Touching partition entries: " + part);
 
@@ -1562,7 +1567,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
         private GridTimeoutObject timeoutObj;
 
         /** Topology version future is processed on. */
-        private long topVer;
+        private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
 
         /**
          * @param ctx Context.
@@ -1793,7 +1798,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
         /**
          * @return Topology version.
          */
-        long topologyVersion() {
+        AffinityTopologyVersion topologyVersion() {
             return topVer;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java
index 676144d..8f32c33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
@@ -50,7 +52,7 @@ public class GridCacheEvictionRequest<K, V> extends GridCacheMessage<K, V> imple
     private byte[] entriesBytes;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /**
      * Required by {@link Externalizable}.
@@ -65,10 +67,10 @@ public class GridCacheEvictionRequest<K, V> extends GridCacheMessage<K, V> imple
      * @param size Size.
      * @param topVer Topology version.
      */
-    GridCacheEvictionRequest(int cacheId, long futId, int size, long topVer) {
+    GridCacheEvictionRequest(int cacheId, long futId, int size, @NotNull AffinityTopologyVersion topVer) {
         assert futId > 0;
         assert size > 0;
-        assert topVer > 0;
+        assert topVer.topologyVersion() > 0;
 
         this.cacheId = cacheId;
         this.futId = futId;
@@ -116,7 +118,7 @@ public class GridCacheEvictionRequest<K, V> extends GridCacheMessage<K, V> imple
     /**
      * @return Topology version.
      */
-    @Override public long topologyVersion() {
+    @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -167,7 +169,7 @@ public class GridCacheEvictionRequest<K, V> extends GridCacheMessage<K, V> imple
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeLong("topVer", topVer))
+                if (!topVer.writeTo(writer))
                     return false;
 
                 writer.incrementState();
@@ -205,7 +207,7 @@ public class GridCacheEvictionRequest<K, V> extends GridCacheMessage<K, V> imple
                 reader.incrementState();
 
             case 5:
-                topVer = reader.readLong("topVer");
+                topVer = AffinityTopologyVersion.readFrom(reader);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 8cd7c4b..b504e21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -106,7 +106,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
             }
 
             long locTopVer = cctx.discovery().topologyVersion();
-            long rmtTopVer = cacheMsg.topologyVersion();
+            long rmtTopVer = cacheMsg.topologyVersion().topologyVersion();
 
             if (locTopVer < rmtTopVer) {
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index db7272b..ff337fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.eviction.*;
 import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.extras.*;
 import org.apache.ignite.internal.processors.cache.query.*;
@@ -398,7 +399,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public boolean valid(long topVer) {
+    @Override public boolean valid(AffinityTopologyVersion topVer) {
         return true;
     }
 
@@ -976,7 +977,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         long ttl,
         boolean evt,
         boolean metrics,
-        long topVer,
+        AffinityTopologyVersion topVer,
         IgnitePredicate<Cache.Entry<K, V>>[] filter,
         GridDrType drType,
         long drExpireTime,
@@ -1114,7 +1115,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         boolean retval,
         boolean evt,
         boolean metrics,
-        long topVer,
+        AffinityTopologyVersion topVer,
         IgnitePredicate<Cache.Entry<K, V>>[] filter,
         GridDrType drType,
         @Nullable GridCacheVersion explicitVer,
@@ -2728,7 +2729,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     @Nullable @Override public V peek(boolean heap,
         boolean offheap,
         boolean swap,
-        long topVer,
+        AffinityTopologyVersion topVer,
         @Nullable IgniteCacheExpiryPolicy expiryPlc)
         throws GridCacheEntryRemovedException, IgniteCheckedException
     {
@@ -2812,7 +2813,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException {
         assert tx == null || tx.local();
 
-        long topVer = tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion topVer = tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion();
 
         switch (mode) {
             case TX:
@@ -2864,7 +2865,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         synchronized (this) {
             checkObsolete();
 
-            if (isNew() || !valid(-1))
+            if (isNew() || !valid(AffinityTopologyVersion.NONE))
                 unswap(true, true);
 
             if (deletedUnlocked())
@@ -2928,7 +2929,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         if (peek != null)
             return peek;
 
-        long topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+        AffinityTopologyVersion topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
 
         return peekGlobal(failFast, topVer, filter, null);
     }
@@ -2958,7 +2959,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
      */
     @SuppressWarnings({"RedundantTypeArguments"})
     @Nullable private GridTuple<V> peekGlobal(boolean failFast,
-        long topVer,
+        AffinityTopologyVersion topVer,
         IgnitePredicate<Cache.Entry<K, V>>[] filter,
         @Nullable IgniteCacheExpiryPolicy expiryPlc
         )
@@ -3147,7 +3148,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         long ttl,
         long expireTime,
         boolean preload,
-        long topVer,
+        AffinityTopologyVersion topVer,
         GridDrType drType)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         if (cctx.isUnmarshalValues() && valBytes != null && val == null && isNewLocked())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
index ad7cf9e..73a0996 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.internal.processors.affinity.*;
 import org.jetbrains.annotations.*;
 
 /**
@@ -34,6 +35,6 @@ public interface GridCacheMapEntryFactory<K, V> {
      * @param hdrId Header id.
      * @return New cache entry.
      */
-    public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val,
-        @Nullable GridCacheMapEntry<K, V> next, long ttl, int hdrId);
+    public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash,
+        V val, @Nullable GridCacheMapEntry<K, V> next, long ttl, int hdrId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4362085a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 6b8689c..8165cc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -163,8 +164,8 @@ public abstract class GridCacheMessage<K, V> implements Message {
      *
      * @return Topology version.
      */
-    public long topologyVersion() {
-        return -1;
+    public AffinityTopologyVersion topologyVersion() {
+        return AffinityTopologyVersion.NONE;
     }
 
     /**


Mime
View raw message