ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [46/51] [abbrv] incubator-ignite git commit: Merge branch sprint-2 into ignite-45
Date Thu, 12 Mar 2015 05:36:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 02f799f,05500e3..23300be
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@@ -66,8 -65,15 +66,15 @@@ public class GridDhtCacheEntry extends 
       * @param ttl Time to live.
       * @param hdrId Header id.
       */
-     public GridDhtCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val,
-         GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+     public GridDhtCacheEntry(GridCacheContext ctx,
 -        long topVer,
++        AffinityTopologyVersion topVer,
+         KeyCacheObject key,
+         int hash,
+         CacheObject val,
+         GridCacheMapEntry next,
+         long ttl,
+         int hdrId)
+     {
          super(ctx, key, hash, val, next, ttl, hdrId);
  
          // Record this entry with partition.
@@@ -151,10 -157,10 +158,10 @@@
       * @throws GridCacheEntryRemovedException If entry has been removed.
       * @throws GridDistributedLockCancelledException If lock was cancelled.
       */
-     @Nullable public GridCacheMvccCandidate<K> addDhtLocal(
+     @Nullable public GridCacheMvccCandidate addDhtLocal(
          UUID nearNodeId,
          GridCacheVersion nearVer,
 -        long topVer,
 +        AffinityTopologyVersion topVer,
          long threadId,
          GridCacheVersion ver,
          long timeout,
@@@ -300,26 -306,14 +307,14 @@@
       * @throws GridCacheEntryRemovedException If entry has been removed.
       */
      @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext"})
-     @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> versionedValue(AffinityTopologyVersion topVer)
 -    @Nullable public synchronized IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue(long topVer)
++    @Nullable public synchronized IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue(AffinityTopologyVersion topVer)
          throws GridCacheEntryRemovedException {
 -        if (isNew() || !valid(-1) || deletedUnlocked())
 +        if (isNew() || !valid(AffinityTopologyVersion.NONE) || deletedUnlocked())
              return null;
          else {
-             V val0 = null;
-             byte[] valBytes0 = null;
- 
-             GridCacheValueBytes valBytesTuple = valueBytesUnlocked();
+             CacheObject val0 = valueBytesUnlocked();
  
-             if (!valBytesTuple.isNull()) {
-                 if (valBytesTuple.isPlain())
-                     val0 = (V)valBytesTuple.get();
-                 else
-                     valBytes0 = valBytesTuple.get();
-             }
-             else
-                 val0 = val;
- 
-             return F.t(ver, val0, valBytes0);
+             return F.t(ver, val0);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 90dde96,b957a80..76ae5a1
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@@ -74,13 -75,10 +76,10 @@@ public final class GridDhtGetFuture<K, 
      private GridCacheVersion ver;
  
      /** Topology version .*/
 -    private long topVer;
 +    private AffinityTopologyVersion topVer;
  
      /** Transaction. */
-     private IgniteTxLocalEx<K, V> tx;
- 
-     /** Logger. */
-     private IgniteLogger log;
+     private IgniteTxLocalEx tx;
  
      /** Retries because ownership changed. */
      private Collection<Integer> retries = new GridLeanSet<>();
@@@ -126,14 -113,13 +114,13 @@@
          GridCacheContext<K, V> cctx,
          long msgId,
          UUID reader,
-         LinkedHashMap<? extends K, Boolean> keys,
+         LinkedHashMap<KeyCacheObject, Boolean> keys,
          boolean readThrough,
          boolean reload,
-         @Nullable IgniteTxLocalEx<K, V> tx,
+         @Nullable IgniteTxLocalEx tx,
 -        long topVer,
 +        @NotNull AffinityTopologyVersion topVer,
          @Nullable UUID subjId,
          int taskNameHash,
-         boolean deserializePortable,
          @Nullable IgniteCacheExpiryPolicy expiryPlc,
          boolean skipVals
      ) {
@@@ -261,8 -238,8 +239,8 @@@
       * @param parts Parts to map.
       * @return {@code True} if mapped.
       */
-     private boolean map(K key, Collection<GridDhtLocalPartition> parts) {
+     private boolean map(KeyCacheObject key, Collection<GridDhtLocalPartition> parts) {
 -        GridDhtLocalPartition part = topVer > 0 ?
 +        GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
              cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
              cache().topology().localPartition(key, false);
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 4f28334,48d15aa..1e61223
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@@ -89,7 -88,7 +89,7 @@@ public interface GridDhtPartitionTopolo
       * @throws GridDhtInvalidPartitionException If partition is evicted or absent and
       *      does not belong to this node.
       */
-     @Nullable public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create)
 -    @Nullable public GridDhtLocalPartition localPartition(int p, long topVer, boolean create)
++    @Nullable public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create)
          throws GridDhtInvalidPartitionException;
  
      /**
@@@ -160,7 -159,7 +160,7 @@@
       * @param e Entry added to cache.
       * @return Local partition.
       */
-     public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e);
 -    public GridDhtLocalPartition onAdded(long topVer, GridDhtCacheEntry e);
++    public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e);
  
      /**
       * @param e Entry removed from cache.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 2c2911b,3ec1113..86cb805
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -249,11 -247,11 +249,11 @@@ class GridDhtPartitionTopologyImpl<K, V
              if (cctx.preloadEnabled()) {
                  for (int p = 0; p < num; p++) {
                      // If this is the first node in grid.
 -                    if (oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) {
 -                        assert exchId.isJoined();
 +                    if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) || exchId.isCacheAdded()) {
 +                        assert exchId.isJoined() || exchId.isCacheAdded();
  
                          try {
-                             GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, true, false);
+                             GridDhtLocalPartition locPart = localPartition(p, topVer, true, false);
  
                              assert locPart != null;
  
@@@ -451,7 -449,7 +451,7 @@@
      }
  
      /** {@inheritDoc} */
-     @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create)
 -    @Nullable @Override public GridDhtLocalPartition localPartition(int p, long topVer, boolean create)
++    @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create)
          throws GridDhtInvalidPartitionException {
          return localPartition(p, topVer, create, true);
      }
@@@ -463,7 -461,7 +463,7 @@@
       * @param updateSeq Update sequence.
       * @return Local partition.
       */
-     private GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean updateSeq) {
 -    private GridDhtLocalPartition localPartition(int p, long topVer, boolean create, boolean updateSeq) {
++    private GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean updateSeq) {
          while (true) {
              boolean belongs = cctx.affinity().localNode(p, topVer);
  
@@@ -513,8 -511,8 +513,8 @@@
      }
  
      /** {@inheritDoc} */
-     @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) {
+     @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
 -        return localPartition(cctx.affinity().partition(key), -1, create);
 +        return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create);
      }
  
      /** {@inheritDoc} */
@@@ -528,7 -526,7 +528,7 @@@
      }
  
      /** {@inheritDoc} */
-     @Override public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e) {
 -    @Override public GridDhtLocalPartition onAdded(long topVer, GridDhtCacheEntry e) {
++    @Override public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e) {
          /*
           * Make sure not to acquire any locks here as this method
           * may be called from sensitive synchronization blocks.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index d3fa4f9,a90e2e7..6498364
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@@ -499,8 -499,8 +500,8 @@@ public abstract class GridDhtTransactio
       * @param nodeId Node ID.
       * @param req Request.
       */
-     private void processNearLockRequest(UUID nodeId, GridNearLockRequest<K, V> req) {
+     private void processNearLockRequest(UUID nodeId, GridNearLockRequest req) {
 -        assert isAffinityNode(cacheCfg);
 +        assert ctx.affinityNode();
          assert nodeId != null;
          assert req != null;
  
@@@ -1213,8 -1209,8 +1210,8 @@@
       * @param req Request.
       */
      @SuppressWarnings({"RedundantTypeArguments", "TypeMayBeWeakened"})
-     private void processNearUnlockRequest(UUID nodeId, GridNearUnlockRequest<K, V> req) {
+     private void processNearUnlockRequest(UUID nodeId, GridNearUnlockRequest req) {
 -        assert isAffinityNode(cacheCfg);
 +        assert ctx.affinityNode();
          assert nodeId != null;
  
          removeLocks(nodeId, req.version(), req.keys(), true);
@@@ -1230,11 -1226,11 +1227,11 @@@
       * @throws IgniteCheckedException If failed.
       */
      private void map(UUID nodeId,
 -        long topVer,
 +        AffinityTopologyVersion topVer,
-         GridCacheEntryEx<K,V> cached,
+         GridCacheEntryEx cached,
          Collection<UUID> readers,
-         Map<ClusterNode, List<T2<K, byte[]>>> dhtMap,
-         Map<ClusterNode, List<T2<K, byte[]>>> nearMap)
+         Map<ClusterNode, List<KeyCacheObject>> dhtMap,
+         Map<ClusterNode, List<KeyCacheObject>> nearMap)
          throws IgniteCheckedException {
          Collection<ClusterNode> dhtNodes = ctx.dht().topology().nodes(cached.partition(), topVer);
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 6d6446f,a13233f..82979fc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@@ -18,7 -18,7 +18,8 @@@
  package org.apache.ignite.internal.processors.cache.distributed.dht;
  
  import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.affinity.*;
+ import org.apache.ignite.internal.managers.communication.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
  import org.apache.ignite.internal.processors.cache.transactions.*;
  import org.apache.ignite.internal.processors.cache.version.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 9f2ca20,57795d3..00e8e3e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@@ -20,7 -20,7 +20,8 @@@ package org.apache.ignite.internal.proc
  import org.apache.ignite.*;
  import org.apache.ignite.internal.*;
  import org.apache.ignite.internal.cluster.*;
 +import org.apache.ignite.internal.processors.affinity.*;
+ import org.apache.ignite.internal.managers.communication.*;
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
  import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@@ -214,7 -218,7 +219,7 @@@ public class GridDhtTxLocal extends Gri
      }
  
      /** {@inheritDoc} */
-     @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, AffinityTopologyVersion topVer) {
 -    @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, long topVer) {
++    @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, AffinityTopologyVersion topVer) {
          return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId());
      }
  
@@@ -247,11 -251,11 +252,11 @@@
      }
  
      /** {@inheritDoc} */
-     @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached,
-         IgniteTxEntry<K, V> entry, AffinityTopologyVersion topVer) {
+     @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry cached,
 -        IgniteTxEntry entry, long topVer) {
++        IgniteTxEntry entry, AffinityTopologyVersion topVer) {
          // Don't add local node as reader.
          if (!cctx.localNodeId().equals(nearNodeId)) {
-             GridCacheContext<K, V> cacheCtx = cached.context();
+             GridCacheContext cacheCtx = cached.context();
  
              while (true) {
                  try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index e856aa8,b493dd6..ab5286f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@@ -20,7 -20,7 +20,8 @@@ package org.apache.ignite.internal.proc
  import org.apache.ignite.*;
  import org.apache.ignite.cluster.*;
  import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.affinity.*;
+ import org.apache.ignite.internal.managers.communication.*;
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
  import org.apache.ignite.internal.processors.cache.transactions.*;
@@@ -141,9 -141,9 +142,9 @@@ public abstract class GridDhtTxLocalAda
       * @return {@code True} if reader was added as a result of this call.
       */
      @Nullable protected abstract IgniteInternalFuture<Boolean> addReader(long msgId,
-         GridDhtCacheEntry<K, V> cached,
-         IgniteTxEntry<K, V> entry,
+         GridDhtCacheEntry cached,
+         IgniteTxEntry entry,
 -        long topVer);
 +        AffinityTopologyVersion topVer);
  
      /**
       * @param commit Commit flag.
@@@ -540,11 -526,11 +527,11 @@@
          onePhaseCommit(onePhaseCommit);
  
          try {
-             Set<K> skipped = null;
+             Set<KeyCacheObject> skipped = null;
  
 -            long topVer = topologyVersion();
 +            AffinityTopologyVersion topVer = topologyVersion();
  
-             GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
+             GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
  
              // Enlist locks into transaction.
              for (int i = 0; i < entries.size(); i++) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index c913137,1ffe82e..3ba146f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@@ -113,10 -113,10 +114,10 @@@ public class GridDhtTxPrepareRequest ex
      public GridDhtTxPrepareRequest(
          IgniteUuid futId,
          IgniteUuid miniId,
-         @NotNull AffinityTopologyVersion topVer,
-         GridDhtTxLocalAdapter<K, V> tx,
-         Collection<IgniteTxEntry<K, V>> dhtWrites,
-         Collection<IgniteTxEntry<K, V>> nearWrites,
 -        long topVer,
++        AffinityTopologyVersion topVer,
+         GridDhtTxLocalAdapter tx,
+         Collection<IgniteTxEntry> dhtWrites,
+         Collection<IgniteTxEntry> nearWrites,
          IgniteTxKey grpLockKey,
          boolean partLock,
          Map<UUID, Collection<UUID>> txNodes,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index b2c2300,fe8b91f..e019f6e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@@ -18,7 -18,7 +18,8 @@@
  package org.apache.ignite.internal.processors.cache.distributed.dht;
  
  import org.apache.ignite.*;
+ import org.apache.ignite.internal.managers.communication.*;
 +import org.apache.ignite.internal.processors.affinity.*;
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
  import org.apache.ignite.internal.processors.cache.transactions.*;
@@@ -220,7 -222,7 +223,7 @@@ public class GridDhtTxRemote extends Gr
      }
  
      /** {@inheritDoc} */
-     @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, AffinityTopologyVersion topVer) {
 -    @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, long topVer) {
++    @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, AffinityTopologyVersion topVer) {
          if (!cacheCtx.isDht() || !isNearEnabled(cacheCtx) || cctx.localNodeId().equals(nearNodeId))
              return false;
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
index 92d750a,a818500..7bcc5c3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
@@@ -71,15 -70,19 +71,20 @@@ public class GridNoStorageCacheMap exte
      }
  
      /** {@inheritDoc} */
-     @Override public GridCacheMapEntry<K, V> putEntry(AffinityTopologyVersion topVer, K key, @Nullable V val, long ttl) {
 -    @Override public GridCacheMapEntry putEntry(long topVer, KeyCacheObject key, @Nullable CacheObject val, long ttl) {
++    @Override public GridCacheMapEntry putEntry(AffinityTopologyVersion topVer, KeyCacheObject key, @Nullable CacheObject val, long ttl) {
          throw new AssertionError();
      }
  
      /** {@inheritDoc} */
-     @Override public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, K key, @Nullable V val,
-         long ttl, boolean create) {
 -    @Override public GridTriple<GridCacheMapEntry> putEntryIfObsoleteOrAbsent(long topVer,
++    @Override public GridTriple<GridCacheMapEntry> putEntryIfObsoleteOrAbsent(
++        AffinityTopologyVersion topVer,
+         KeyCacheObject key,
+         @Nullable CacheObject val,
+         long ttl,
+         boolean create)
+     {
          if (create) {
-             GridCacheMapEntry<K, V> entry = new GridDhtCacheEntry<>(ctx, topVer, key, hash(key.hashCode()), val,
+             GridCacheMapEntry entry = new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val,
                  null, 0, 0);
  
              return new GridTriple<>(entry, null, null);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index fb51c91,bdbcd06..c64dc4a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@@ -61,10 -62,10 +63,10 @@@ public class GridPartitionedGetFuture<K
      private GridCacheContext<K, V> cctx;
  
      /** Keys. */
-     private Collection<? extends K> keys;
+     private Collection<KeyCacheObject> keys;
  
      /** Topology version. */
 -    private long topVer;
 +    private AffinityTopologyVersion topVer;
  
      /** Reload flag. */
      private boolean reload;
@@@ -128,8 -119,8 +120,8 @@@
       */
      public GridPartitionedGetFuture(
          GridCacheContext<K, V> cctx,
-         Collection<? extends K> keys,
-         @NotNull AffinityTopologyVersion topVer,
+         Collection<KeyCacheObject> keys,
 -        long topVer,
++        AffinityTopologyVersion topVer,
          boolean readThrough,
          boolean reload,
          boolean forcePrimary,
@@@ -166,9 -158,9 +159,9 @@@
       * Initializes future.
       */
      public void init() {
 -        long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
 +        AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
  
-         map(keys, Collections.<ClusterNode, LinkedHashMap<K, Boolean>>emptyMap(), topVer);
+         map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
  
          markInitialized();
      }
@@@ -275,9 -260,13 +261,14 @@@
       * @param mapped Mappings to check for duplicates.
       * @param topVer Topology version on which keys should be mapped.
       */
-     private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, AffinityTopologyVersion topVer) {
 -    private void map(Collection<KeyCacheObject> keys,
++    private void map(
++        Collection<KeyCacheObject> keys,
+         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
 -        long topVer)
 -    {
++        AffinityTopologyVersion topVer
++    ) {
          if (CU.affinityNodes(cctx, topVer).isEmpty()) {
-             onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all partition nodes left the grid)."));
+             onDone(new ClusterTopologyCheckedException("Failed to map keys for cache " +
+                 "(all partition nodes left the grid)."));
  
              return;
          }
@@@ -416,8 -396,10 +398,13 @@@
       * @return {@code True} if has remote nodes.
       */
      @SuppressWarnings("ConstantConditions")
-     private boolean map(K key, Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings, Map<K, V> locVals,
-         AffinityTopologyVersion topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped) {
 -    private boolean map(KeyCacheObject key,
 -        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, Map<K, V> locVals,
 -        long topVer,
 -        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped) {
++    private boolean map(
++        KeyCacheObject key,
++        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, 
++        Map<K, V> locVals,
++        AffinityTopologyVersion topVer,
++        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped
++    ) {
          GridDhtCacheAdapter<K, V> colocated = cache();
  
          boolean remote = false;
@@@ -587,26 -545,17 +550,17 @@@
  
          /** Keys. */
          @GridToStringInclude
-         private LinkedHashMap<K, Boolean> keys;
+         private LinkedHashMap<KeyCacheObject, Boolean> keys;
  
          /** Topology version on which this future was mapped. */
 -        private long topVer;
 +        private AffinityTopologyVersion topVer;
  
          /**
-          * Empty constructor required for {@link Externalizable}.
-          */
-         public MiniFuture() {
-             // No-op.
-         }
- 
-         /**
           * @param node Node.
           * @param keys Keys.
           * @param topVer Topology version.
           */
-         MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, @NotNull AffinityTopologyVersion topVer) {
-             super(cctx.kernalContext());
- 
 -        MiniFuture(ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, long topVer) {
++        MiniFuture(ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, AffinityTopologyVersion topVer) {
              this.node = node;
              this.keys = keys;
              this.topVer = topVer;
@@@ -652,9 -601,9 +606,9 @@@
              if (log.isDebugEnabled())
                  log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
  
-             AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion());
 -            long updTopVer = cctx.discovery().topologyVersion();
++            AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
  
 -            assert updTopVer > topVer : "Got topology exception but topology version did " +
 +            assert updTopVer.compareTo(topVer) > 0 : "Got topology exception but topology version did " +
                  "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
                  ", nodeId=" + node.id() + ']';
  
@@@ -698,16 -647,16 +652,16 @@@
                      log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']');
  
                  // Need to wait for next topology version to remap.
-                 IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
 -                IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer);
++                IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
  
-                 topFut.listenAsync(new CIX1<IgniteInternalFuture<Long>>() {
+                 topFut.listen(new CIX1<IgniteInternalFuture<Long>>() {
                      @SuppressWarnings("unchecked")
                      @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException {
 -                        long topVer = fut.get();
 +                        AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get());
  
                          // This will append new futures to compound list.
-                         map(F.view(keys.keySet(),  new P1<K>() {
-                             @Override public boolean apply(K key) {
+                         map(F.view(keys.keySet(),  new P1<KeyCacheObject>() {
+                             @Override public boolean apply(KeyCacheObject key) {
                                  return invalidParts.contains(cctx.affinity().partition(key));
                              }
                          }), F.t(node, keys), topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index ddf4229,4474432..c8ff3e2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -118,11 -117,18 +118,18 @@@ public class GridDhtAtomicCache<K, V> e
  
      /** {@inheritDoc} */
      @Override protected void init() {
-         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
+         map.setEntryFactory(new GridCacheMapEntryFactory() {
              /** {@inheritDoc} */
-             @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash,
-                 V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
-                 return new GridDhtAtomicCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId);
+             @Override public GridCacheMapEntry create(GridCacheContext ctx,
 -                long topVer,
++                AffinityTopologyVersion topVer,
+                 KeyCacheObject key,
+                 int hash,
+                 CacheObject val,
+                 GridCacheMapEntry next,
+                 long ttl,
+                 int hdrId)
+             {
+                 return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val, next, ttl, hdrId);
              }
          });
  
@@@ -439,7 -446,8 +445,7 @@@
              null,
              true,
              true,
-             ctx.equalsPeekArray(oldVal));
 -            null,
+             ctx.equalsValArray(oldVal));
      }
  
      /** {@inheritDoc} */
@@@ -458,7 -466,8 +464,7 @@@
              null,
              false,
              false,
-             filter);
 -            null,
+             filter).chain(RET2NULL);
      }
  
      /** {@inheritDoc} */
@@@ -662,8 -669,9 +665,8 @@@
              args,
              null,
              null,
-             true,
+             false,
              false,
 -            null,
              null);
  
          return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
@@@ -705,8 -713,9 +708,8 @@@
              args,
              null,
              null,
-             true,
+             false,
              false,
 -            null,
              null);
      }
  
@@@ -734,8 -743,9 +737,8 @@@
              args,
              null,
              null,
-             true,
+             false,
              false,
 -            null,
              null);
      }
  
@@@ -757,11 -768,12 +760,11 @@@
          @Nullable final Map<? extends K, ? extends V> map,
          @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
          @Nullable Object[] invokeArgs,
-         @Nullable final Map<? extends K, GridCacheDrInfo<V>> conflictPutMap,
-         @Nullable final Map<? extends K, GridCacheVersion> conflictRmvMap,
+         @Nullable final Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
+         @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
          final boolean retval,
          final boolean rawRetval,
-         @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter
 -        @Nullable GridCacheEntryEx cached,
+         @Nullable final CacheEntryPredicate[] filter
      ) {
          if (map != null && keyCheck)
              validateCacheKeys(map.keySet());
@@@ -885,15 -897,7 +888,7 @@@
          boolean deserializePortable,
          @Nullable ExpiryPolicy expiryPlc,
          boolean skipVals) {
-         ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
- 
-         if (F.isEmpty(keys))
-             return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
- 
-         if (keyCheck)
-             validateCacheKeys(keys);
- 
 -        long topVer = ctx.affinity().affinityTopologyVersion();
 +        AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
  
          final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
  
@@@ -1622,14 -1640,14 +1631,14 @@@
          String taskName,
          @Nullable IgniteCacheExpiryPolicy expiry
      ) throws GridCacheEntryRemovedException {
-         GridCacheReturn<Object> retVal = null;
-         Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted = null;
+         GridCacheReturn retVal = null;
+         Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
  
-         List<K> keys = req.keys();
+         List<KeyCacheObject> keys = req.keys();
  
 -        long topVer = req.topologyVersion();
 +        AffinityTopologyVersion topVer = req.topologyVersion();
  
 -        boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
 +        boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer.topologyVersion());
  
          boolean readersOnly = false;
  
@@@ -2078,10 -2086,10 +2077,10 @@@
       *      locks are released.
       */
      @SuppressWarnings("ForLoopReplaceableByForEach")
-     private List<GridDhtCacheEntry<K, V>> lockEntries(List<K> keys, AffinityTopologyVersion topVer)
 -    private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, long topVer)
++    private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, AffinityTopologyVersion topVer)
          throws GridDhtInvalidPartitionException {
          if (keys.size() == 1) {
-             K key = keys.get(0);
+             KeyCacheObject key = keys.get(0);
  
              while (true) {
                  try {
@@@ -2161,7 -2169,7 +2160,7 @@@
       * @param locked Locked entries.
       * @param topVer Topology version.
       */
-     private void unlockEntries(Collection<GridDhtCacheEntry<K, V>> locked, AffinityTopologyVersion topVer) {
 -    private void unlockEntries(Collection<GridDhtCacheEntry> locked, long topVer) {
++    private void unlockEntries(Collection<GridDhtCacheEntry> locked, AffinityTopologyVersion topVer) {
          // Process deleted entries before locks release.
          assert ctx.deferredDelete();
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
index ec63130,76f8a40..c918e1e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
@@@ -36,8 -35,15 +36,15 @@@ public class GridDhtAtomicCacheEntry ex
       * @param ttl Time to live.
       * @param hdrId Header id.
       */
-     public GridDhtAtomicCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val,
-         GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+     public GridDhtAtomicCacheEntry(GridCacheContext ctx,
 -        long topVer,
++        AffinityTopologyVersion topVer,
+         KeyCacheObject key,
+         int hash,
+         CacheObject val,
+         GridCacheMapEntry next,
+         long ttl,
+         int hdrId)
+     {
          super(ctx, topVer, key, hash, val, next, ttl, hdrId);
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5f8240d,f72665f..6dea845
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@@ -286,13 -271,13 +272,13 @@@ public class GridDhtAtomicUpdateFuture 
  
          keys.add(entry.key());
  
 -        long topVer = updateReq.topologyVersion();
 +        AffinityTopologyVersion topVer = updateReq.topologyVersion();
  
          for (UUID nodeId : readers) {
-             GridDhtAtomicUpdateRequest<K, V> updateReq = mappings.get(nodeId);
+             GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
  
              if (updateReq == null) {
-                 ClusterNode node = ctx.discovery().node(nodeId);
+                 ClusterNode node = cctx.discovery().node(nodeId);
  
                  // Node left the grid.
                  if (node == null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 3cd4fea,ebf26b2..77e07a4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@@ -110,10 -105,10 +105,10 @@@ public class GridNearAtomicUpdateFutur
      private final ExpiryPolicy expiryPlc;
  
      /** Future map topology version. */
 -    private long topVer;
 +    private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
  
      /** Optional filter. */
-     private final IgnitePredicate<Cache.Entry<K, V>>[] filter;
+     private final CacheEntryPredicate[] filter;
  
      /** Write synchronization mode. */
      private final CacheWriteSynchronizationMode syncMode;
@@@ -420,10 -408,10 +408,10 @@@
       * @param remap Boolean flag indicating if this is partial future remap.
       * @param oldNodeId Old node ID if remap.
       */
-     private void mapOnTopology(final Collection<? extends K> keys, final boolean remap, final UUID oldNodeId) {
+     private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) {
          cache.topology().readLock();
  
 -        GridDiscoveryTopologySnapshot snapshot = null;
 +        AffinityTopologyVersion topVer = null;
  
          try {
              GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
@@@ -435,10 -421,12 +423,10 @@@
                      // Assign future version in topology read lock before first exception may be thrown.
                      futVer = cctx.versions().next(topVer);
  
 -                // We are holding topology read lock and current topology is ready, we can start mapping.
 -                snapshot = fut.topologySnapshot();
              }
              else {
-                 fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 -                fut.listen(new CI1<IgniteInternalFuture<Long>>() {
 -                    @Override public void apply(IgniteInternalFuture<Long> t) {
++                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 +                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                          mapOnTopology(keys, remap, oldNodeId);
                      }
                  });
@@@ -477,9 -473,8 +465,9 @@@
       * @param remap Flag indicating if this is partial remap for this future.
       * @param oldNodeId Old node ID if was remap.
       */
 -    private void map0(GridDiscoveryTopologySnapshot topSnapshot,
 +    private void map0(
 +        AffinityTopologyVersion topVer,
-         Collection<? extends K> keys,
+         Collection<?> keys,
          boolean remap,
          @Nullable UUID oldNodeId) {
          assert oldNodeId == null || remap;
@@@ -746,8 -744,8 +735,12 @@@
       * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
       * @return Collection of nodes to which key is mapped.
       */
-     private Collection<ClusterNode> mapKey(K key, AffinityTopologyVersion topVer, boolean fastMap) {
-         GridCacheAffinityManager<K, V> affMgr = cctx.affinity();
 -    private Collection<ClusterNode> mapKey(KeyCacheObject key, long topVer, boolean fastMap) {
++    private Collection<ClusterNode> mapKey(
++        KeyCacheObject key,
++        AffinityTopologyVersion topVer, 
++        boolean fastMap
++    ) {
+         GridCacheAffinityManager affMgr = cctx.affinity();
  
          // If we can send updates in parallel - do it.
          return fastMap ?

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index a478876,01ccb53..ee0439f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@@ -81,11 -79,18 +80,18 @@@ public class GridDhtColocatedCache<K, V
  
      /** {@inheritDoc} */
      @Override protected void init() {
-         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
+         map.setEntryFactory(new GridCacheMapEntryFactory() {
              /** {@inheritDoc} */
-             @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash,
-                 V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
-                 return new GridDhtColocatedCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId);
+             @Override public GridCacheMapEntry create(GridCacheContext ctx,
 -                long topVer,
++                AffinityTopologyVersion topVer,
+                 KeyCacheObject key,
+                 int hash,
+                 CacheObject val,
+                 GridCacheMapEntry next,
+                 long ttl,
+                 int hdrId)
+             {
+                 return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val, next, ttl, hdrId);
              }
          });
      }
@@@ -119,9 -124,12 +125,13 @@@
       * @throws GridDhtInvalidPartitionException If {@code allowDetached} is false and node is not primary
       *      for given key.
       */
-     public GridDistributedCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer, boolean allowDetached) {
 -    public GridDistributedCacheEntry entryExx(KeyCacheObject key,
 -        long topVer,
 -        boolean allowDetached)
 -    {
++    public GridDistributedCacheEntry entryExx(
++        KeyCacheObject key,
++        AffinityTopologyVersion topVer,
++        boolean allowDetached
++    ) {
          return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ?
-             new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer);
+             new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer);
      }
  
      /** {@inheritDoc} */
@@@ -200,7 -220,7 +222,10 @@@
      }
  
      /** {@inheritDoc} */
-     @Override protected GridCacheEntryEx<K, V> entryExSafe(K key, AffinityTopologyVersion topVer) {
 -    @Override protected GridCacheEntryEx entryExSafe(KeyCacheObject key, long topVer) {
++    @Override protected GridCacheEntryEx entryExSafe(
++        KeyCacheObject key,
++        AffinityTopologyVersion topVer
++    ) {
          try {
              return ctx.affinity().localNode(key, topVer) ? entryEx(key) : null;
          }
@@@ -410,27 -421,28 +426,28 @@@
  
              int keyCnt = -1;
  
-             Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null;
+             Map<ClusterNode, GridNearUnlockRequest> map = null;
  
-             Collection<K> locKeys = new ArrayList<>();
+             Collection<KeyCacheObject> locKeys = new ArrayList<>();
  
              for (K key : keys) {
-                 GridDistributedCacheEntry<K, V> entry = peekExx(key);
+                 KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
  
-                 Cache.Entry<K, V> Entry = entry == null ? entry(key) : entry.wrapLazyValue();
+                 GridDistributedCacheEntry entry = peekExx(cacheKey);
  
-                 if (!ctx.isAll(Entry, filter))
+                 if (!ctx.isAll(entry, filter))
                      break; // While.
  
-                 GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), key, null);
+                 GridCacheMvccCandidate lock =
+                     ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), cacheKey, null);
  
                  if (lock != null) {
 -                    final long topVer = lock.topologyVersion();
 +                    final AffinityTopologyVersion topVer = lock.topologyVersion();
  
 -                    assert topVer > 0;
 +                    assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
  
                      if (map == null) {
 -                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer);
 +                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer.topologyVersion());
  
                          keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size());
  
@@@ -511,18 -523,18 +528,18 @@@
          try {
              int keyCnt = -1;
  
-             Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null;
+             Map<ClusterNode, GridNearUnlockRequest> map = null;
  
-             Collection<K> locKeys = new LinkedList<>();
+             Collection<KeyCacheObject> locKeys = new LinkedList<>();
  
-             for (K key : keys) {
-                 GridCacheMvccCandidate<K> lock = ctx.mvcc().removeExplicitLock(threadId, key, ver);
+             for (KeyCacheObject key : keys) {
+                 GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, key, ver);
  
                  if (lock != null) {
 -                    long topVer = lock.topologyVersion();
 +                    AffinityTopologyVersion topVer = lock.topologyVersion();
  
                      if (map == null) {
 -                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer);
 +                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer.topologyVersion());
  
                          keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size());
  
@@@ -594,11 -606,11 +611,11 @@@
       */
      IgniteInternalFuture<Exception> lockAllAsync(
          final GridCacheContext<K, V> cacheCtx,
-         @Nullable final GridNearTxLocal<K, V> tx,
+         @Nullable final GridNearTxLocal tx,
          final long threadId,
          final GridCacheVersion ver,
-         final AffinityTopologyVersion topVer,
-         final Collection<K> keys,
 -        final long topVer,
++        AffinityTopologyVersion topVer,
+         final Collection<KeyCacheObject> keys,
          final boolean txRead,
          final long timeout,
          final long accessTtl,
@@@ -667,11 -679,11 +684,11 @@@
       */
      private IgniteInternalFuture<Exception> lockAllAsync0(
          GridCacheContext<K, V> cacheCtx,
-         @Nullable final GridNearTxLocal<K, V> tx,
+         @Nullable final GridNearTxLocal tx,
          long threadId,
          final GridCacheVersion ver,
-         final AffinityTopologyVersion topVer,
-         final Collection<K> keys,
 -        final long topVer,
++        AffinityTopologyVersion topVer,
+         final Collection<KeyCacheObject> keys,
          final boolean txRead,
          final long timeout,
          final long accessTtl,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
index f2de9d9,c95e2e2..1ccc0da
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
@@@ -36,8 -35,15 +36,15 @@@ public class GridDhtColocatedCacheEntr
       * @param ttl Time to live.
       * @param hdrId Header id.
       */
-     public GridDhtColocatedCacheEntry(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, V val,
-         GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+     public GridDhtColocatedCacheEntry(GridCacheContext ctx,
 -        long topVer,
++        AffinityTopologyVersion topVer,
+         KeyCacheObject key,
+         int hash,
+         CacheObject val,
+         GridCacheMapEntry next,
+         long ttl,
 -        int hdrId)
 -    {
++        int hdrId
++    ) {
          super(ctx, topVer, key, hash, val, next, ttl, hdrId);
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 1e13dd1,08358d5..cb889f2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@@ -574,8 -563,8 +564,8 @@@ public final class GridDhtColocatedLock
                      markInitialized();
                  }
                  else {
-                     fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 -                    fut.listen(new CI1<IgniteInternalFuture<Long>>() {
 -                        @Override public void apply(IgniteInternalFuture<Long> t) {
++                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 +                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                              mapOnTopology();
                          }
                      });
@@@ -871,8 -859,8 +860,11 @@@
       * @param topVer Topology version to lock on.
       * @param mappings Optional collection of mappings to proceed locking.
       */
-     private void lockLocally(final Collection<K> keys, AffinityTopologyVersion topVer,
-         @Nullable final Deque<GridNearLockMapping<K, V>> mappings) {
 -    private void lockLocally(final Collection<KeyCacheObject> keys, long topVer,
 -        @Nullable final Deque<GridNearLockMapping> mappings) {
++    private void lockLocally(
++        final Collection<KeyCacheObject> keys,
++        AffinityTopologyVersion topVer,
++        @Nullable final Deque<GridNearLockMapping> mappings
++    ) {
          if (log.isDebugEnabled())
              log.debug("Before locally locking keys : " + keys);
  
@@@ -948,14 -934,14 +938,14 @@@
       * @return {@code True} if all keys were mapped locally, {@code false} if full mapping should be performed.
       * @throws IgniteCheckedException If key cannot be added to mapping.
       */
-     private boolean mapAsPrimary(Collection<? extends K> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException {
 -    private boolean mapAsPrimary(Collection<KeyCacheObject> keys, long topVer) throws IgniteCheckedException {
++    private boolean mapAsPrimary(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException {
          // Assign keys to primary nodes.
-         Collection<K> distributedKeys = new ArrayList<>(keys.size());
+         Collection<KeyCacheObject> distributedKeys = new ArrayList<>(keys.size());
  
-         for (K key : keys) {
+         for (KeyCacheObject key : keys) {
              if (!cctx.affinity().primary(cctx.localNode(), key, topVer)) {
                  // Remove explicit locks added so far.
-                 for (K k : keys)
+                 for (KeyCacheObject k : keys)
                      cctx.mvcc().removeExplicitLock(threadId, k, lockVer);
  
                  return false;
@@@ -993,8 -979,9 +983,12 @@@
       * @return {@code True} if transaction accesses key that was explicitly locked before.
       * @throws IgniteCheckedException If lock is externally held and transaction is explicit.
       */
-     private boolean addLocalKey(K key, AffinityTopologyVersion topVer, Collection<K> distributedKeys) throws IgniteCheckedException {
-         GridDistributedCacheEntry<K, V> entry = cctx.colocated().entryExx(key, topVer, false);
 -    private boolean addLocalKey(KeyCacheObject key, long topVer, Collection<KeyCacheObject> distributedKeys)
 -        throws IgniteCheckedException {
++    private boolean addLocalKey(
++        KeyCacheObject key,
++        AffinityTopologyVersion topVer, 
++        Collection<KeyCacheObject> distributedKeys
++    ) throws IgniteCheckedException {
+         GridDistributedCacheEntry entry = cctx.colocated().entryExx(key, topVer, false);
  
          assert !entry.detached();
  
@@@ -1022,8 -1009,8 +1016,11 @@@
       * @return Near lock mapping.
       * @throws IgniteCheckedException If mapping failed.
       */
-     private GridNearLockMapping<K, V> map(K key, @Nullable GridNearLockMapping<K, V> mapping,
-         AffinityTopologyVersion topVer) throws IgniteCheckedException {
 -    private GridNearLockMapping map(KeyCacheObject key, @Nullable GridNearLockMapping mapping,
 -        long topVer) throws IgniteCheckedException {
++    private GridNearLockMapping map(
++        KeyCacheObject key, 
++        @Nullable GridNearLockMapping mapping,
++        AffinityTopologyVersion topVer
++    ) throws IgniteCheckedException {
          assert mapping == null || mapping.node() != null;
  
          ClusterNode primary = cctx.affinity().primary(key, topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index dbf6146,bec3bf0..d53f445
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@@ -92,11 -90,13 +91,13 @@@ public final class GridDhtForceKeysFutu
       * @param keys Keys.
       * @param preloader Preloader.
       */
-     public GridDhtForceKeysFuture(GridCacheContext<K, V> cctx, @NotNull AffinityTopologyVersion topVer,
-         Collection<? extends K> keys, GridDhtPreloader<K, V> preloader) {
-         super(cctx.kernalContext());
- 
+     public GridDhtForceKeysFuture(
+         GridCacheContext<K, V> cctx,
 -        long topVer,
++        AffinityTopologyVersion topVer,
+         Collection<KeyCacheObject> keys,
+         GridDhtPreloader<K, V> preloader
+     ) {
 -        assert topVer != 0 : topVer;
 +        assert topVer.topologyVersion() != 0 : topVer;
          assert !F.isEmpty(keys) : keys;
  
          this.cctx = cctx;
@@@ -493,13 -476,13 +477,13 @@@
  
              boolean replicate = cctx.isDrEnabled();
  
-             for (GridCacheEntryInfo<K, V> info : res.forcedInfos()) {
+             for (GridCacheEntryInfo info : res.forcedInfos()) {
                  int p = cctx.affinity().partition(info.key());
  
-                 GridDhtLocalPartition<K, V> locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);
 -                GridDhtLocalPartition locPart = top.localPartition(p, -1, false);
++                GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);
  
                  if (locPart != null && locPart.state() == MOVING && locPart.reserve()) {
-                     GridCacheEntryEx<K, V> entry = cctx.dht().entryEx(info.key());
+                     GridCacheEntryEx entry = cctx.dht().entryEx(info.key());
  
                      try {
                          if (entry.initialValue(

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
index fa6cd29,925a05d..29823f9
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
@@@ -46,17 -44,13 +46,13 @@@ public class GridDhtForceKeysRequest ex
      /** Mini-future ID. */
      private IgniteUuid miniId;
  
-     /** Serialized keys. */
-     @GridDirectCollection(byte[].class)
-     private Collection<byte[]> keyBytes;
- 
      /** Keys to request. */
      @GridToStringInclude
-     @GridDirectTransient
-     private Collection<K> keys;
+     @GridDirectCollection(KeyCacheObject.class)
+     private Collection<KeyCacheObject> keys;
  
      /** Topology version for which keys are requested. */
 -    private long topVer;
 +    private AffinityTopologyVersion topVer;
  
      /**
       * @param cacheId Cache ID.
@@@ -69,8 -63,8 +65,8 @@@
          int cacheId,
          IgniteUuid futId,
          IgniteUuid miniId,
-         Collection<K> keys,
-         @NotNull AffinityTopologyVersion topVer
+         Collection<KeyCacheObject> keys,
 -        long topVer
++        AffinityTopologyVersion topVer
      ) {
          assert futId != null;
          assert miniId != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 8f971a4,149929d..144ed7a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@@ -209,8 -208,8 +209,8 @@@ public class GridDhtPartitionDemandPool
              if (log.isDebugEnabled())
                  log.debug("Forcing preload event for future: " + exchFut);
  
-             exchFut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 -            exchFut.listen(new CI1<IgniteInternalFuture<Long>>() {
 -                @Override public void apply(IgniteInternalFuture<Long> t) {
++            exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 +                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                      cctx.shared().exchange().forcePreloadExchange(exchFut);
                  }
              });
@@@ -358,8 -357,8 +358,8 @@@
  
              obj = new GridTimeoutObjectAdapter(delay) {
                  @Override public void onTimeout() {
-                     exchFut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 -                    exchFut.listen(new CI1<IgniteInternalFuture<Long>>() {
 -                        @Override public void apply(IgniteInternalFuture<Long> f) {
++                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 +                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
                              cctx.shared().exchange().forcePreloadExchange(exchFut);
                          }
                      });
@@@ -482,10 -481,10 +482,14 @@@
           * @return {@code False} if partition has become invalid during preloading.
           * @throws IgniteInterruptedCheckedException If interrupted.
           */
-         private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo<K, V> entry, AffinityTopologyVersion topVer)
 -        private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo entry, long topVer)
--            throws IgniteCheckedException {
++        private boolean preloadEntry(
++            ClusterNode pick, 
++            int p, 
++            GridCacheEntryInfo entry, 
++            AffinityTopologyVersion topVer
++        ) throws IgniteCheckedException {
              try {
-                 GridCacheEntryEx<K, V> cached = null;
+                 GridCacheEntryEx cached = null;
  
                  try {
                      cached = cctx.dht().entryEx(entry.key());
@@@ -570,9 -568,9 +573,13 @@@
           * @throws ClusterTopologyCheckedException If node left.
           * @throws IgniteCheckedException If failed to send message.
           */
-         private Set<Integer> demandFromNode(ClusterNode node, final AffinityTopologyVersion topVer, GridDhtPartitionDemandMessage<K, V> d,
-             GridDhtPartitionsExchangeFuture<K, V> exchFut) throws InterruptedException, IgniteCheckedException {
-             GridDhtPartitionTopology<K, V> top = cctx.dht().topology();
 -        private Set<Integer> demandFromNode(ClusterNode node, final long topVer, GridDhtPartitionDemandMessage d,
 -            GridDhtPartitionsExchangeFuture exchFut) throws InterruptedException, IgniteCheckedException {
++        private Set<Integer> demandFromNode(
++            ClusterNode node, 
++            final AffinityTopologyVersion topVer, 
++            GridDhtPartitionDemandMessage d,
++            GridDhtPartitionsExchangeFuture exchFut
++        ) throws InterruptedException, IgniteCheckedException {
+             GridDhtPartitionTopology top = cctx.dht().topology();
  
              cntr++;
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 236ec6c,cdd153f..43f5566
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -48,8 -46,8 +47,8 @@@ import static org.apache.ignite.interna
  /**
   * Future for exchanging partition maps.
   */
- public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<AffinityTopologyVersion>
-     implements Comparable<GridDhtPartitionsExchangeFuture<K, V>>, GridDhtTopologyFuture {
 -public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<Long>
++public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
+     implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture {
      /** */
      private static final long serialVersionUID = 0L;
  
@@@ -200,16 -193,8 +197,12 @@@
       * @param busyLock Busy lock.
       * @param exchId Exchange ID.
       */
 -    public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, ReadWriteLock busyLock,
 -        GridDhtPartitionExchangeId exchId) {
 +    public GridDhtPartitionsExchangeFuture(
-         GridCacheSharedContext<K, V> cctx,
++        GridCacheSharedContext cctx,
 +        ReadWriteLock busyLock,
 +        GridDhtPartitionExchangeId exchId,
 +        Collection<DynamicCacheChangeRequest> reqs
 +    ) {
-         super(cctx.kernalContext());
- 
-         syncNotify(true);
- 
          assert busyLock != null;
          assert exchId != null;
  
@@@ -462,9 -423,9 +441,9 @@@
                      // If received any messages, process them.
                      onReceive(m.getKey(), m.getValue());
  
 -                long topVer = exchId.topologyVersion();
 +                AffinityTopologyVersion topVer = exchId.topologyVersion();
  
-                 for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
+                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                      if (cacheCtx.isLocal())
                          continue;
  
@@@ -487,10 -448,7 +466,10 @@@
                  if (log.isDebugEnabled())
                      log.debug("After waiting for partition release future: " + this);
  
 +                if (!F.isEmpty(reqs))
 +                    stopCaches();
 +
-                 for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
+                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                      if (cacheCtx.isLocal())
                          continue;
  
@@@ -504,9 -462,9 +483,9 @@@
                      // Process queued undeploys prior to sending/spreading map.
                      cacheCtx.preloader().unwindUndeploys();
  
-                     GridDhtPartitionTopology<K, V> top = cacheCtx.topology();
+                     GridDhtPartitionTopology top = cacheCtx.topology();
  
 -                    assert topVer == top.topologyVersion() :
 +                    assert topVer.equals(top.topologyVersion()) :
                          "Topology version is updated only in this class instances inside single ExchangeWorker thread.";
  
                      top.beforeExchange(exchId);
@@@ -680,24 -612,11 +660,24 @@@
      }
  
      /** {@inheritDoc} */
 -    @Override public boolean onDone(Long res, Throwable err) {
 -        if (err == null) {
 -            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
 +    @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) {
-         for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
++        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
 +            if (err == null) {
                  if (!cacheCtx.isLocal())
 -                    cacheCtx.affinity().cleanUpCache(res - 10);
 +                    cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 10);
 +            }
 +
 +            if (!F.isEmpty(reqs)) {
 +                for (DynamicCacheChangeRequest req : reqs) {
 +                    if (F.eq(cacheCtx.name(), req.cacheName())) {
 +                        if (req.isStart())
 +                            cacheCtx.preloader().onInitialExchangeComplete(err);
 +                        else if (req.isClientStart()) {
-                             if (req.clientNodeId().equals(ctx.localNodeId()))
++                            if (req.clientNodeId().equals(cacheCtx.localNodeId()))
 +                                cacheCtx.preloader().onInitialExchangeComplete(err);
 +                        }
 +                    }
 +                }
              }
          }
  
@@@ -893,9 -807,9 +873,9 @@@
          if (log.isDebugEnabled())
              log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
  
 -        assert exchId.topologyVersion() == msg.topologyVersion();
 +        assert exchId.topologyVersion().equals(msg.topologyVersion());
  
-         initFut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() {
+         initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
              @Override public void apply(IgniteInternalFuture<Boolean> t) {
                  assert msg.lastVersion() != null;
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0a74d0c,c171f97..c698f6f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@@ -71,7 -71,7 +72,7 @@@ public class GridDhtPreloader<K, V> ext
      private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
  
      /** Pending affinity assignment futures. */
-     private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture<K, V>> pendingAssignmentFetchFuts =
 -    private ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
++    private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
          new ConcurrentHashMap8<>();
  
      /** Discovery listener. */
@@@ -277,8 -277,8 +278,8 @@@
       * @param topVer Requested topology version.
       * @param fut Future to add.
       */
-     public void addDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture<K, V> fut) {
-         GridDhtAssignmentFetchFuture<K, V> old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut);
 -    public void addDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture fut) {
++    public void addDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture fut) {
+         GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut);
  
          assert old == null : "More than one thread is trying to fetch partition assignments: " + topVer;
      }
@@@ -287,7 -287,7 +288,7 @@@
       * @param topVer Requested topology version.
       * @param fut Future to remove.
       */
-     public void removeDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture<K, V> fut) {
 -    public void removeDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture fut) {
++    public void removeDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture fut) {
          boolean rmv = pendingAssignmentFetchFuts.remove(topVer, fut);
  
          assert rmv : "Failed to remove assignment fetch future: " + topVer;
@@@ -346,10 -346,10 +347,10 @@@
                  msg.futureId(),
                  msg.miniId());
  
-             for (K k : msg.keys()) {
+             for (KeyCacheObject k : msg.keys()) {
                  int p = cctx.affinity().partition(k);
  
-                 GridDhtLocalPartition<K, V> locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);
 -                GridDhtLocalPartition locPart = top.localPartition(p, -1, false);
++                GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);
  
                  // If this node is no longer an owner.
                  if (locPart == null && !top.owners(p).contains(loc))
@@@ -423,14 -423,14 +424,14 @@@
       * @param req Request.
       */
      private void processAffinityAssignmentRequest(final ClusterNode node,
-         final GridDhtAffinityAssignmentRequest<K, V> req) {
+         final GridDhtAffinityAssignmentRequest req) {
 -        final long topVer = req.topologyVersion();
 +        final AffinityTopologyVersion topVer = req.topologyVersion();
  
          if (log.isDebugEnabled())
              log.debug("Processing affinity assignment request [node=" + node + ", req=" + req + ']');
  
-         cctx.affinity().affinityReadyFuture(req.topologyVersion()).listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 -        cctx.affinity().affinityReadyFuture(req.topologyVersion()).listen(new CI1<IgniteInternalFuture<Long>>() {
 -            @Override public void apply(IgniteInternalFuture<Long> fut) {
++        cctx.affinity().affinityReadyFuture(req.topologyVersion()).listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 +            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                  if (log.isDebugEnabled())
                      log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
                          ", node=" + node + ']');
@@@ -489,7 -489,7 +490,7 @@@
       * @return Future for request.
       */
      @SuppressWarnings( {"unchecked", "RedundantCast"})
-     @Override public GridDhtFuture<Object> request(Collection<? extends K> keys, AffinityTopologyVersion topVer) {
 -    @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, long topVer) {
++    @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
          final GridDhtForceKeysFuture<K, V> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
  
          IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 76ea0a3,dda2115..369fc68
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@@ -34,18 -33,18 +34,18 @@@ public class GridDhtPreloaderAssignment
  
      /** Exchange future. */
      @GridToStringExclude
-     private final GridDhtPartitionsExchangeFuture<K, V> exchFut;
+     private final GridDhtPartitionsExchangeFuture exchFut;
  
      /** Last join order. */
 -    private final long topVer;
 +    private final AffinityTopologyVersion topVer;
  
      /**
       * @param exchFut Exchange future.
       * @param topVer Last join order.
       */
-     public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture<K, V> exchFut, AffinityTopologyVersion topVer) {
 -    public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, long topVer) {
++    public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, AffinityTopologyVersion topVer) {
          assert exchFut != null;
 -        assert topVer > 0;
 +        assert topVer.topologyVersion() > 0;
  
          this.exchFut = exchFut;
          this.topVer = topVer;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 7bf4cc5,c889cf1..adc9d7b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@@ -209,9 -202,9 +203,9 @@@ public class GridNearAtomicCache<K, V> 
      ) throws IgniteCheckedException {
          try {
              while (true) {
-                 GridCacheEntryEx<K, V> entry = null;
+                 GridCacheEntryEx entry = null;
  
 -                long topVer = ctx.affinity().affinityTopologyVersion();
 +                AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
  
                  try {
                      entry = entryEx(key, topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 3ab2cf0,1687cb5..933fad4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@@ -67,13 -67,19 +67,21 @@@ public abstract class GridNearCacheAdap
  
      /** {@inheritDoc} */
      @Override protected void init() {
-         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
+         map.setEntryFactory(new GridCacheMapEntryFactory() {
              /** {@inheritDoc} */
-             @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash,
-                 V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
 -            @Override public GridCacheMapEntry create(GridCacheContext ctx,
 -                long topVer, KeyCacheObject key,
++            @Override public GridCacheMapEntry create(
++                GridCacheContext ctx,
++                AffinityTopologyVersion topVer, 
++                KeyCacheObject key,
+                 int hash,
+                 CacheObject val,
+                 GridCacheMapEntry next,
+                 long ttl,
 -                int hdrId)
 -            {
++                int hdrId
++            ) {
                  // Can't hold any locks here - this method is invoked when
                  // holding write-lock on the whole cache map.
-                 return new GridNearCacheEntry<>(ctx, key, hash, val, next, ttl, hdrId);
+                 return new GridNearCacheEntry(ctx, key, hash, val, next, ttl, hdrId);
              }
          });
      }
@@@ -113,8 -119,8 +121,8 @@@
      }
  
      /** {@inheritDoc} */
-     @Override public GridCacheEntryEx<K, V> entryEx(K key, AffinityTopologyVersion topVer) {
-         GridNearCacheEntry<K, V> entry = null;
 -    @Override public GridCacheEntryEx entryEx(KeyCacheObject key, long topVer) {
++    @Override public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) {
+         GridNearCacheEntry entry = null;
  
          while (true) {
              try {
@@@ -136,8 -142,8 +144,8 @@@
       * @param topVer Topology version.
       * @return Entry.
       */
-     public GridNearCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer) {
-         return (GridNearCacheEntry<K, V>)entryEx(key, topVer);
 -    public GridNearCacheEntry entryExx(KeyCacheObject key, long topVer) {
++    public GridNearCacheEntry entryExx(KeyCacheObject key, AffinityTopologyVersion topVer) {
+         return (GridNearCacheEntry)entryEx(key, topVer);
      }
  
      /**
@@@ -372,8 -334,8 +336,8 @@@
  
      /** {@inheritDoc} */
      @Override public Set<Cache.Entry<K, V>> primaryEntrySet(
-         @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) {
+         @Nullable final CacheEntryPredicate... filter) {
 -        final long topVer = ctx.affinity().affinityTopologyVersion();
 +        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
  
          Collection<Cache.Entry<K, V>> entries =
              F.flatCollections(


Mime
View raw message