ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [30/34] incubator-ignite git commit: # ignite-51
Date Wed, 25 Feb 2015 14:07:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 99f5128..08804ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -75,7 +75,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
     private final GridCacheOperation op;
 
     /** Keys */
-    private Collection<Object> keys;
+    private Collection<?> keys;
 
     /** Values. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -86,7 +86,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
 
     /** Conflict put values. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    private Collection<GridCacheDrInfo<Object>> conflictPutVals;
+    private Collection<GridCacheDrInfo<?>> conflictPutVals;
 
     /** Conflict remove values. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -184,10 +184,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
         GridDhtAtomicCache cache,
         CacheWriteSynchronizationMode syncMode,
         GridCacheOperation op,
-        Collection<Object> keys,
+        Collection<?> keys,
         @Nullable Collection<?> vals,
         @Nullable Object[] invokeArgs,
-        @Nullable Collection<GridCacheDrInfo<Object>> conflictPutVals,
+        @Nullable Collection<GridCacheDrInfo<?>> conflictPutVals,
         @Nullable Collection<GridCacheVersion> conflictRmvVals,
         final boolean retval,
         final boolean rawRetval,
@@ -258,7 +258,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<Object> keys() {
+    @Override public Collection<?> keys() {
         return keys;
     }
 
@@ -526,7 +526,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
             }
             else if (conflictPutVals != null) {
                 // Conflict PUT.
-                GridCacheDrInfo<Object> conflictPutVal =  F.first(conflictPutVals);
+                GridCacheDrInfo<?> conflictPutVal =  F.first(conflictPutVals);
 
                 val = conflictPutVal.value();
                 conflictVer = conflictPutVal.version();
@@ -609,7 +609,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
         if (vals != null)
             it = vals.iterator();
 
-        Iterator<GridCacheDrInfo<Object>> conflictPutValsIt = null;
+        Iterator<GridCacheDrInfo<?>> conflictPutValsIt = null;
 
         if (conflictPutVals != null)
             conflictPutValsIt = conflictPutVals.iterator();
@@ -657,7 +657,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
                     }
                 }
                 else if (conflictPutVals != null) {
-                    GridCacheDrInfo<Object> conflictPutVal =  conflictPutValsIt.next();
+                    GridCacheDrInfo<?> conflictPutVal =  conflictPutValsIt.next();
 
                     val = conflictPutVal.value();
                     conflictVer = conflictPutVal.version();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 0d3e996..f3560d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -378,6 +378,26 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
         err.addSuppressed(e);
     }
 
+    /**
+     * Adds keys to collection of failed keys.
+     *
+     * @param keys Key to add.
+     * @param e Error cause.
+     * @param ctx Context.
+     */
+    public synchronized void addFailedKeys(Collection<Object> keys, Throwable e, GridCacheContext ctx) {
+        if (failedKeys == null)
+            failedKeys = new ArrayList<>(keys.size());
+
+        for (Object key : keys)
+            failedKeys.add(ctx.toCacheKeyObject(key));
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+        err.addSuppressed(e);
+    }
+
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 47d4ffe..3c09c56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -69,7 +69,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param ctx Cache context.
      * @param map Cache map.
      */
-    public GridDhtColocatedCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) {
+    public GridDhtColocatedCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) {
         super(ctx, map);
     }
 
@@ -80,11 +80,18 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
     /** {@inheritDoc} */
     @Override protected void init() {
-        map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
+        map.setEntryFactory(new GridCacheMapEntryFactory() {
             /** {@inheritDoc} */
-            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
-                V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
-                return new GridDhtColocatedCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId);
+            @Override public GridCacheMapEntry create(GridCacheContext ctx,
+                long topVer,
+                KeyCacheObject key,
+                int hash,
+                CacheObject val,
+                GridCacheMapEntry next,
+                long ttl,
+                int hdrId)
+            {
+                return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val, next, ttl, hdrId);
             }
         });
     }
@@ -93,14 +100,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse<K, V>>() {
-            @Override public void apply(UUID nodeId, GridNearGetResponse<K, V> res) {
+        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
+            @Override public void apply(UUID nodeId, GridNearGetResponse res) {
                 processGetResponse(nodeId, res);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse<K, V>>() {
-            @Override public void apply(UUID nodeId, GridNearLockResponse<K, V> res) {
+        ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() {
+            @Override public void apply(UUID nodeId, GridNearLockResponse res) {
                 processLockResponse(nodeId, res);
             }
         });
@@ -118,9 +125,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @throws GridDhtInvalidPartitionException If {@code allowDetached} is false and node is not primary
      *      for given key.
      */
-    public GridDistributedCacheEntry<K, V> entryExx(K key, long topVer, boolean allowDetached) {
+    public GridDistributedCacheEntry entryExx(KeyCacheObject key,
+        long topVer,
+        boolean allowDetached)
+    {
         return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ?
-            new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer);
+            new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer);
     }
 
     /** {@inheritDoc} */
@@ -144,12 +154,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
     /** {@inheritDoc} */
     @Override public boolean isLocked(K key) {
-        return ctx.mvcc().isLockedByThread(key, -1);
+        KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
+
+        return ctx.mvcc().isLockedByThread(cacheKey, -1);
     }
 
     /** {@inheritDoc} */
     @Override public boolean isLockedByThread(K key) {
-        return ctx.mvcc().isLockedByThread(key, Thread.currentThread().getId());
+        KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
+
+        return ctx.mvcc().isLockedByThread(cacheKey, Thread.currentThread().getId());
     }
 
     /** {@inheritDoc} */
@@ -169,12 +183,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
 
-        IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx();
+        IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx();
 
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
-                @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter<K, V> tx) {
-                    return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, entry, deserializePortable, skipVals));
+                @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
+                    return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals));
                 }
             });
         }
@@ -199,7 +213,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheEntryEx entryExSafe(K key, long topVer) {
+    @Override protected GridCacheEntryEx entryExSafe(KeyCacheObject key, long topVer) {
         try {
             return ctx.affinity().localNode(key, topVer) ? entryEx(key) : null;
         }
@@ -253,15 +267,17 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                 GridCacheEntryEx entry = null;
 
+                KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
+
                 while (true) {
                     try {
-                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
+                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey);
 
                         // If our DHT cache do has value, then we peek it.
                         if (entry != null) {
                             boolean isNew = entry.isNewLocked();
 
-                            V v = entry.innerGet(null,
+                            CacheObject v = entry.innerGet(null,
                                 /*swap*/true,
                                 /*read-through*/false,
                                 /*fail-fast*/true,
@@ -279,15 +295,17 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                 GridCacheVersion obsoleteVer = context().versions().next();
 
                                 if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
-                                    removeIfObsolete(key);
+                                    removeIfObsolete(cacheKey);
 
                                 success = false;
                             }
                             else {
+                                Object val = v.value(ctx);
+
                                 if (ctx.portableEnabled() && !skipVals)
-                                    v = (V)ctx.unwrapPortableIfNeeded(v, !deserializePortable);
+                                    val = ctx.unwrapPortableIfNeeded(val, !deserializePortable);
 
-                                locVals.put(key, (V)CU.skipValue(v, skipVals));
+                                locVals.put(key, (V)CU.skipValue(val, skipVals));
                             }
                         }
                         else
@@ -359,7 +377,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     @Override public IgniteInternalFuture<Boolean> lockAllAsync(
         Collection<? extends K> keys,
         long timeout,
-        @Nullable IgniteTxLocalEx<K, V> tx,
+        @Nullable IgniteTxLocalEx tx,
         boolean isInvalidate,
         boolean isRead,
         boolean retval,
@@ -369,7 +387,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     ) {
         assert tx == null || tx instanceof GridNearTxLocal;
 
-        GridNearTxLocal<K, V> txx = (GridNearTxLocal<K, V>)tx;
+        GridNearTxLocal txx = (GridNearTxLocal)tx;
 
         GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
             keys,
@@ -409,19 +427,23 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
             int keyCnt = -1;
 
-            Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null;
+            Map<ClusterNode, GridNearUnlockRequest> map = null;
 
-            Collection<K> locKeys = new ArrayList<>();
+            Collection<KeyCacheObject> locKeys = new ArrayList<>();
 
             for (K key : keys) {
-                GridDistributedCacheEntry<K, V> entry = peekExx(key);
+                // TODO IGNITE-51.
+                KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
+
+                GridDistributedCacheEntry entry = peekExx(cacheKey);
 
-                Cache.Entry<K, V> Entry = entry == null ? entry(key) : entry.wrapLazyValue();
+                Cache.Entry<K, V> Entry = entry == null ? entry(key) : entry.<K, V>wrapLazyValue();
 
                 if (!ctx.isAll(Entry, filter))
                     break; // While.
 
-                GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), key, null);
+                GridCacheMvccCandidate lock =
+                    ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), cacheKey, null);
 
                 if (lock != null) {
                     final long topVer = lock.topologyVersion();
@@ -448,20 +470,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                 "then they need to be unlocked separately): " + keys);
 
                         if (!primary.isLocal()) {
-                            GridNearUnlockRequest<K, V> req = map.get(primary);
+                            GridNearUnlockRequest req = map.get(primary);
 
                             if (req == null) {
-                                map.put(primary, req = new GridNearUnlockRequest<>(ctx.cacheId(), keyCnt));
+                                map.put(primary, req = new GridNearUnlockRequest(ctx.cacheId(), keyCnt));
 
                                 req.version(ver);
                             }
 
                             byte[] keyBytes = entry != null ? entry.getOrMarshalKeyBytes() : CU.marshal(ctx.shared(), key);
 
-                            req.addKey(key, keyBytes, ctx);
+                            req.addKey(cacheKey, keyBytes, ctx);
                         }
                         else
-                            locKeys.add(key);
+                            locKeys.add(cacheKey);
 
                         if (log.isDebugEnabled())
                             log.debug("Removed lock (will distribute): " + lock);
@@ -478,10 +500,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             if (!locKeys.isEmpty())
                 removeLocks(ctx.localNodeId(), ver, locKeys, true);
 
-            for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) {
+            for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
                 ClusterNode n = mapping.getKey();
 
-                GridDistributedUnlockRequest<K, V> req = mapping.getValue();
+                GridDistributedUnlockRequest req = mapping.getValue();
 
                 assert !n.isLocal();
 
@@ -510,12 +532,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         try {
             int keyCnt = -1;
 
-            Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null;
+            Map<ClusterNode, GridNearUnlockRequest> map = null;
 
-            Collection<K> locKeys = new LinkedList<>();
+            Collection<KeyCacheObject> locKeys = new LinkedList<>();
 
             for (K key : keys) {
-                GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, key, ver);
+                // TODO IGNITE-51.
+                KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
+
+                GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, cacheKey, ver);
 
                 if (lock != null) {
                     long topVer = lock.topologyVersion();
@@ -532,22 +557,22 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                     if (!primary.isLocal()) {
                         // Send request to remove from remote nodes.
-                        GridNearUnlockRequest<K, V> req = map.get(primary);
+                        GridNearUnlockRequest req = map.get(primary);
 
                         if (req == null) {
-                            map.put(primary, req = new GridNearUnlockRequest<>(ctx.cacheId(), keyCnt));
+                            map.put(primary, req = new GridNearUnlockRequest(ctx.cacheId(), keyCnt));
 
                             req.version(ver);
                         }
 
-                        GridCacheEntryEx entry = peekEx(key);
+                        GridCacheEntryEx entry = peekEx(cacheKey);
 
                         byte[] keyBytes = entry != null ? entry.getOrMarshalKeyBytes() : CU.marshal(ctx.shared(), key);
 
-                        req.addKey(key, keyBytes, ctx);
+                        req.addKey(cacheKey, keyBytes, ctx);
                     }
                     else
-                        locKeys.add(key);
+                        locKeys.add(cacheKey);
                 }
             }
 
@@ -560,10 +585,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
             Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
 
-            for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) {
+            for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
                 ClusterNode n = mapping.getKey();
 
-                GridDistributedUnlockRequest<K, V> req = mapping.getValue();
+                GridDistributedUnlockRequest req = mapping.getValue();
 
                 if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys())) {
                     req.completedVersions(committed, rolledback);
@@ -593,7 +618,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      */
     IgniteInternalFuture<Exception> lockAllAsync(
         final GridCacheContext<K, V> cacheCtx,
-        @Nullable final GridNearTxLocal<K, V> tx,
+        @Nullable final GridNearTxLocal tx,
         final long threadId,
         final GridCacheVersion ver,
         final long topVer,
@@ -605,7 +630,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     ) {
         assert keys != null;
 
-        IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
+        // TODO IGNITE-51.
+        // IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
+        IgniteInternalFuture<Object> keyFut = null;
 
         // Prevent embedded future creation if possible.
         if (keyFut.isDone()) {
@@ -666,7 +693,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      */
     private IgniteInternalFuture<Exception> lockAllAsync0(
         GridCacheContext<K, V> cacheCtx,
-        @Nullable final GridNearTxLocal<K, V> tx,
+        @Nullable final GridNearTxLocal tx,
         long threadId,
         final GridCacheVersion ver,
         final long topVer,
@@ -700,8 +727,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                 if (timedout)
                     break;
 
+                // TODO IGNITE-51.
+                KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
+
                 while (true) {
-                    GridDhtCacheEntry<K, V> entry = entryExx(key, topVer);
+                    GridDhtCacheEntry entry = entryExx(cacheKey, topVer);
 
                     try {
                         fut.addEntry(key == null ? null : entry);
@@ -751,7 +781,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             if (log.isDebugEnabled())
                 log.debug("Performing colocated lock [tx=" + tx + ", keys=" + keys + ']');
 
-            IgniteInternalFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx,
+            IgniteInternalFuture<GridCacheReturn<Object>> txFut = tx.lockAllAsync(cacheCtx,
                 keys,
                 tx.implicit(),
                 txRead,
@@ -760,8 +790,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             return new GridDhtEmbeddedFuture<>(
                 ctx.kernalContext(),
                 txFut,
-                new C2<GridCacheReturn<V>, Exception, Exception>() {
-                    @Override public Exception apply(GridCacheReturn<V> ret,
+                new C2<GridCacheReturn<Object>, Exception, Exception>() {
+                    @Override public Exception apply(GridCacheReturn<Object> ret,
                         Exception e) {
                         if (e != null)
                             e = U.unwrap(e);
@@ -778,7 +808,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param nodeId Sender ID.
      * @param res Response.
      */
-    private void processGetResponse(UUID nodeId, GridNearGetResponse<K, V> res) {
+    private void processGetResponse(UUID nodeId, GridNearGetResponse res) {
         GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
             res.version(), res.futureId());
 
@@ -796,7 +826,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processLockResponse(UUID nodeId, GridNearLockResponse<K, V> res) {
+    private void processLockResponse(UUID nodeId, GridNearLockResponse res) {
         assert nodeId != null;
         assert res != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 722b4e1..ee1e34a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -99,14 +99,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
     /** Transaction. */
     @GridToStringExclude
-    private GridNearTxLocal<K, V> tx;
+    private GridNearTxLocal tx;
 
     /** Topology snapshot to operate on. */
     private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot =
         new AtomicReference<>();
 
     /** Map of current values. */
-    private Map<K, GridTuple3<GridCacheVersion, V, byte[]>> valMap;
+    private Map<KeyCacheObject, GridTuple3<GridCacheVersion, CacheObject, byte[]>> valMap;
 
     /** Trackable flag (here may be non-volatile). */
     private boolean trackable;
@@ -134,7 +134,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
     public GridDhtColocatedLockFuture(
         GridCacheContext<K, V> cctx,
         Collection<? extends K> keys,
-        @Nullable GridNearTxLocal<K, V> tx,
+        @Nullable GridNearTxLocal tx,
         boolean read,
         boolean retval,
         long timeout,
@@ -264,13 +264,13 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      *      implicit transaction accesses locked entry.
      * @throws IgniteCheckedException If failed to add entry due to external locking.
      */
-    @Nullable private GridCacheMvccCandidate addEntry(GridDistributedCacheEntry<K, V> entry) throws IgniteCheckedException {
+    @Nullable private GridCacheMvccCandidate addEntry(GridDistributedCacheEntry entry) throws IgniteCheckedException {
         GridCacheMvccCandidate cand = cctx.mvcc().explicitLock(threadId, entry.key());
 
         if (inTx()) {
             IgniteTxEntry txEntry = tx.entry(entry.txKey());
 
-            txEntry.cached(entry, txEntry.keyBytes());
+            txEntry.cached(entry, null);
 
             if (cand != null) {
                 if (!tx.implicit())
@@ -281,7 +281,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             }
             else {
                 // Check transaction entries (corresponding tx entries must be enlisted in transaction).
-                cand = new GridCacheMvccCandidate<>(entry,
+                cand = new GridCacheMvccCandidate(entry,
                     cctx.localNodeId(),
                     null,
                     null,
@@ -300,7 +300,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         }
         else {
             if (cand == null) {
-                cand = new GridCacheMvccCandidate<>(entry,
+                cand = new GridCacheMvccCandidate(entry,
                     cctx.localNodeId(),
                     null,
                     null,
@@ -399,7 +399,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @param nodeId Sender.
      * @param res Result.
      */
-    void onResult(UUID nodeId, GridNearLockResponse<K, V> res) {
+    void onResult(UUID nodeId, GridNearLockResponse res) {
         if (!isDone()) {
             if (log.isDebugEnabled())
                 log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" +
@@ -535,7 +535,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             // Continue mapping on the same topology version as it was before.
             topSnapshot.compareAndSet(null, snapshot);
 
-            map(keys);
+            // TODO IGNITE-51.
+            // map(keys);
 
             markInitialized();
 
@@ -568,7 +569,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
                     topSnapshot.compareAndSet(null, snapshot);
 
-                    map(keys);
+                    // TODO IGNITE-51.
+                    // map(keys);
 
                     markInitialized();
                 }
@@ -590,13 +592,85 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
     }
 
     /**
+     * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to
+     * remote primary node.
+     *
+     * @param mappings Queue of mappings.
+     * @throws IgniteCheckedException If mapping can not be completed.
+     */
+    private void proceedMapping(final Deque<GridNearLockMapping> mappings)
+        throws IgniteCheckedException {
+        GridNearLockMapping map = mappings.poll();
+
+        // If there are no more mappings to process, complete the future.
+        if (map == null)
+            return;
+
+        final GridNearLockRequest req = map.request();
+        final Collection<KeyCacheObject> mappedKeys = map.distributedKeys();
+        final ClusterNode node = map.node();
+
+        if (filter != null && filter.length != 0)
+            req.filter((IgnitePredicate[])filter, cctx);
+
+        if (node.isLocal())
+            lockLocally(mappedKeys, req.topologyVersion(), mappings);
+        else {
+            final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings);
+
+            req.miniId(fut.futureId());
+
+            add(fut); // Append new future.
+
+            IgniteInternalFuture<?> txSync = null;
+
+            if (inTx())
+                txSync = cctx.tm().awaitFinishAckAsync(node.id(), tx.threadId());
+
+            if (txSync == null || txSync.isDone()) {
+                try {
+                    if (log.isDebugEnabled())
+                        log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
+
+                    cctx.io().send(node, req, cctx.ioPolicy());
+                }
+                catch (ClusterTopologyCheckedException ex) {
+                    assert fut != null;
+
+                    fut.onResult(ex);
+                }
+            }
+            else {
+                txSync.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> t) {
+                        try {
+                            if (log.isDebugEnabled())
+                                log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
+
+                            cctx.io().send(node, req, cctx.ioPolicy());
+                        }
+                        catch (ClusterTopologyCheckedException ex) {
+                            assert fut != null;
+
+                            fut.onResult(ex);
+                        }
+                        catch (IgniteCheckedException e) {
+                            onError(e);
+                        }
+                    }
+                });
+            }
+        }
+    }
+
+    /**
      * Maps keys to nodes. Note that we can not simply group keys by nodes and send lock request as
      * such approach does not preserve order of lock acquisition. Instead, keys are split in continuous
      * groups belonging to one primary node and locks for these groups are acquired sequentially.
      *
      * @param keys Keys.
      */
-    private void map(Collection<? extends K> keys) {
+    private void map(Collection<KeyCacheObject> keys) {
         try {
             GridDiscoveryTopologySnapshot snapshot = topSnapshot.get();
 
@@ -616,13 +690,13 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             if (mapAsPrimary(keys, topVer))
                 return;
 
-            ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings = new ConcurrentLinkedDeque8<>();
+            ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
 
             // Assign keys to primary nodes.
-            GridNearLockMapping<K, V> map = null;
+            GridNearLockMapping map = null;
 
-            for (K key : keys) {
-                GridNearLockMapping<K, V> updated = map(key, map, topVer);
+            for (KeyCacheObject key : keys) {
+                GridNearLockMapping updated = map(key, map, topVer);
 
                 // If new mapping was created, add to collection.
                 if (updated != map) {
@@ -648,27 +722,27 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             boolean hasRmtNodes = false;
 
             // Create mini futures.
-            for (Iterator<GridNearLockMapping<K, V>> iter = mappings.iterator(); iter.hasNext(); ) {
-                GridNearLockMapping<K, V> mapping = iter.next();
+            for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
+                GridNearLockMapping mapping = iter.next();
 
                 ClusterNode node = mapping.node();
-                Collection<K> mappedKeys = mapping.mappedKeys();
+                Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
 
                 boolean loc = node.equals(cctx.localNode());
 
                 assert !mappedKeys.isEmpty();
 
-                GridNearLockRequest<K, V> req = null;
+                GridNearLockRequest req = null;
 
-                Collection<K> distributedKeys = new ArrayList<>(mappedKeys.size());
+                Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
 
-                for (K key : mappedKeys) {
+                for (KeyCacheObject key : mappedKeys) {
                     boolean explicit;
 
-                    IgniteTxKey<K> txKey = cctx.txKey(key);
+                    IgniteTxKey txKey = cctx.txKey(key);
 
                     while (true) {
-                        GridDistributedCacheEntry<K, V> entry = null;
+                        GridDistributedCacheEntry entry = null;
 
                         try {
                             entry = cctx.colocated().entryExx(key, topVer, true);
@@ -687,8 +761,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                             GridCacheMvccCandidate cand = addEntry(entry);
 
                             // Will either return value from dht cache or null if this is a miss.
-                            GridTuple3<GridCacheVersion, V, byte[]> val = entry.detached() ? null :
-                                ((GridDhtCacheEntry<K, V>)entry).versionedValue(topVer);
+                            GridTuple3<GridCacheVersion, CacheObject, byte[]> val = entry.detached() ? null :
+                                ((GridDhtCacheEntry)entry).versionedValue(topVer);
 
                             GridCacheVersion dhtVer = null;
 
@@ -700,7 +774,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
                             if (cand != null && !cand.reentry()) {
                                 if (req == null) {
-                                    req = new GridNearLockRequest<>(
+                                    req = new GridNearLockRequest(
                                         cctx.cacheId(),
                                         topVer,
                                         cctx.nodeId(),
@@ -792,86 +866,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
     }
 
     /**
-     * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to
-     * remote primary node.
-     *
-     * @param mappings Queue of mappings.
-     * @throws IgniteCheckedException If mapping can not be completed.
-     */
-    private void proceedMapping(final Deque<GridNearLockMapping<K, V>> mappings)
-        throws IgniteCheckedException {
-        GridNearLockMapping<K, V> map = mappings.poll();
-
-        // If there are no more mappings to process, complete the future.
-        if (map == null)
-            return;
-
-        final GridNearLockRequest<K, V> req = map.request();
-        final Collection<K> mappedKeys = map.distributedKeys();
-        final ClusterNode node = map.node();
-
-        if (filter != null && filter.length != 0)
-            req.filter(filter, cctx);
-
-        if (node.isLocal())
-            lockLocally(mappedKeys, req.topologyVersion(), mappings);
-        else {
-            final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings);
-
-            req.miniId(fut.futureId());
-
-            add(fut); // Append new future.
-
-            IgniteInternalFuture<?> txSync = null;
-
-            if (inTx())
-                txSync = cctx.tm().awaitFinishAckAsync(node.id(), tx.threadId());
-
-            if (txSync == null || txSync.isDone()) {
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
-
-                    cctx.io().send(node, req, cctx.ioPolicy());
-                }
-                catch (ClusterTopologyCheckedException ex) {
-                    assert fut != null;
-
-                    fut.onResult(ex);
-                }
-            }
-            else {
-                txSync.listenAsync(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> t) {
-                        try {
-                            if (log.isDebugEnabled())
-                                log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
-
-                            cctx.io().send(node, req, cctx.ioPolicy());
-                        }
-                        catch (ClusterTopologyCheckedException ex) {
-                            assert fut != null;
-
-                            fut.onResult(ex);
-                        }
-                        catch (IgniteCheckedException e) {
-                            onError(e);
-                        }
-                    }
-                });
-            }
-        }
-    }
-
-    /**
      * Locks given keys directly through dht cache.
      *
      * @param keys Collection of keys.
      * @param topVer Topology version to lock on.
      * @param mappings Optional collection of mappings to proceed locking.
      */
-    private void lockLocally(final Collection<K> keys, long topVer,
-        @Nullable final Deque<GridNearLockMapping<K, V>> mappings) {
+    private void lockLocally(final Collection<KeyCacheObject> keys, long topVer,
+        @Nullable final Deque<GridNearLockMapping> mappings) {
         if (log.isDebugEnabled())
             log.debug("Before locally locking keys : " + keys);
 
@@ -880,7 +882,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             threadId,
             lockVer,
             topVer,
-            keys,
+            // TODO IGNITE-51.
+            // keys,
+            null,
             read,
             timeout,
             accessTtl,
@@ -913,11 +917,11 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                             ", mappedKeys=" + keys + ", fut=" + GridDhtColocatedLockFuture.this + ']');
 
                     if (inTx()) {
-                        for (K key : keys)
+                        for (KeyCacheObject key : keys)
                             tx.entry(cctx.txKey(key)).markLocked();
                     }
                     else {
-                        for (K key : keys)
+                        for (KeyCacheObject key : keys)
                             cctx.mvcc().markExplicitOwner(key, threadId);
                     }
 
@@ -947,14 +951,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @return {@code True} if all keys were mapped locally, {@code false} if full mapping should be performed.
      * @throws IgniteCheckedException If key cannot be added to mapping.
      */
-    private boolean mapAsPrimary(Collection<? extends K> keys, long topVer) throws IgniteCheckedException {
+    private boolean mapAsPrimary(Collection<KeyCacheObject> keys, long topVer) throws IgniteCheckedException {
         // Assign keys to primary nodes.
-        Collection<K> distributedKeys = new ArrayList<>(keys.size());
+        Collection<KeyCacheObject> distributedKeys = new ArrayList<>(keys.size());
 
-        for (K key : keys) {
+        for (KeyCacheObject key : keys) {
             if (!cctx.affinity().primary(cctx.localNode(), key, topVer)) {
                 // Remove explicit locks added so far.
-                for (K k : keys)
+                for (KeyCacheObject k : keys)
                     cctx.mvcc().removeExplicitLock(threadId, k, lockVer);
 
                 return false;
@@ -973,7 +977,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
         if (!distributedKeys.isEmpty()) {
             if (tx != null) {
-                for (K key : distributedKeys)
+                for (KeyCacheObject key : distributedKeys)
                     tx.addKeyMapping(cctx.txKey(key), cctx.localNode());
             }
 
@@ -992,8 +996,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @return {@code True} if transaction accesses key that was explicitly locked before.
      * @throws IgniteCheckedException If lock is externally held and transaction is explicit.
      */
-    private boolean addLocalKey(K key, long topVer, Collection<K> distributedKeys) throws IgniteCheckedException {
-        GridDistributedCacheEntry<K, V> entry = cctx.colocated().entryExx(key, topVer, false);
+    private boolean addLocalKey(KeyCacheObject key, long topVer, Collection<KeyCacheObject> distributedKeys)
+        throws IgniteCheckedException {
+        GridDistributedCacheEntry entry = cctx.colocated().entryExx(key, topVer, false);
 
         assert !entry.detached();
 
@@ -1021,7 +1026,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @return Near lock mapping.
      * @throws IgniteCheckedException If mapping failed.
      */
-    private GridNearLockMapping<K, V> map(K key, @Nullable GridNearLockMapping<K, V> mapping,
+    private GridNearLockMapping map(KeyCacheObject key, @Nullable GridNearLockMapping mapping,
         long topVer) throws IgniteCheckedException {
         assert mapping == null || mapping.node() != null;
 
@@ -1036,7 +1041,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                 " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']');
 
         if (mapping == null || !primary.id().equals(mapping.node().id()))
-            mapping = new GridNearLockMapping<>(primary, key);
+            mapping = new GridNearLockMapping(primary, key);
         else
             mapping.addKey(key);
 
@@ -1097,11 +1102,11 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
         /** Keys. */
         @GridToStringInclude
-        private Collection<K> keys;
+        private Collection<KeyCacheObject> keys;
 
         /** Mappings to proceed. */
         @GridToStringExclude
-        private Deque<GridNearLockMapping<K, V>> mappings;
+        private Deque<GridNearLockMapping> mappings;
 
         /** */
         private AtomicBoolean rcvRes = new AtomicBoolean(false);
@@ -1118,8 +1123,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
          * @param keys Keys.
          * @param mappings Mappings to proceed.
          */
-        MiniFuture(ClusterNode node, Collection<K> keys,
-            Deque<GridNearLockMapping<K, V>> mappings) {
+        MiniFuture(ClusterNode node,
+            Collection<KeyCacheObject> keys,
+            Deque<GridNearLockMapping> mappings) {
             super(cctx.kernalContext());
 
             this.node = node;
@@ -1144,7 +1150,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         /**
          * @return Keys.
          */
-        public Collection<K> keys() {
+        public Collection<KeyCacheObject> keys() {
             return keys;
         }
 
@@ -1188,7 +1194,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         /**
          * @param res Result callback.
          */
-        void onResult(GridNearLockResponse<K, V> res) {
+        void onResult(GridNearLockResponse res) {
             if (rcvRes.compareAndSet(false, true)) {
                 if (res.error() != null) {
                     if (log.isDebugEnabled())
@@ -1206,10 +1212,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
                 int i = 0;
 
-                for (K k : keys) {
-                    GridTuple3<GridCacheVersion, V, byte[]> oldValTup = valMap.get(k);
+                for (KeyCacheObject k : keys) {
+                    GridTuple3<GridCacheVersion, CacheObject, byte[]> oldValTup = valMap.get(k);
 
-                    V newVal = res.value(i);
+                    CacheObject newVal = res.value(i);
                     byte[] newBytes = res.valueBytes(i);
 
                     GridCacheVersion dhtVer = res.dhtVersion(i);
@@ -1232,7 +1238,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
                         txEntry.markLocked();
 
-                        GridDhtDetachedCacheEntry<K, V> entry = (GridDhtDetachedCacheEntry<K, V>)txEntry.cached();
+                        GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
 
                         try {
                             if (res.dhtVersion(i) == null) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index b1c135a..66efe48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -65,7 +65,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
     private IgniteLogger log;
 
     /** Keys to request. */
-    private Collection<? extends K> keys;
+    private Collection<KeyCacheObject> keys;
 
     /** Keys for which local node is no longer primary. */
     private Collection<Integer> invalidParts = new GridLeanSet<>();
@@ -91,7 +91,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param keys Keys.
      * @param preloader Preloader.
      */
-    public GridDhtForceKeysFuture(GridCacheContext<K, V> cctx, long topVer, Collection<? extends K> keys,
+    public GridDhtForceKeysFuture(GridCacheContext<K, V> cctx, long topVer, Collection<KeyCacheObject> keys,
         GridDhtPreloader<K, V> preloader) {
         super(cctx.kernalContext());
 
@@ -213,14 +213,14 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param exc Exclude nodes.
      * @return {@code True} if some mapping was added.
      */
-    private boolean map(Iterable<? extends K> keys, Collection<ClusterNode> exc) {
-        Map<ClusterNode, Set<K>> mappings = new HashMap<>();
+    private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc) {
+        Map<ClusterNode, Set<KeyCacheObject>> mappings = new HashMap<>();
 
         ClusterNode loc = cctx.localNode();
 
         int curTopVer = topCntr.get();
 
-        for (K key : keys)
+        for (KeyCacheObject key : keys)
             map(key, mappings, exc);
 
         if (isDone())
@@ -234,9 +234,9 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
             trackable = true;
 
             // Create mini futures.
-            for (Map.Entry<ClusterNode, Set<K>> mapped : mappings.entrySet()) {
+            for (Map.Entry<ClusterNode, Set<KeyCacheObject>> mapped : mappings.entrySet()) {
                 ClusterNode n = mapped.getKey();
-                Set<K> mappedKeys = mapped.getValue();
+                Set<KeyCacheObject> mappedKeys = mapped.getValue();
 
                 int cnt = F.size(mappedKeys);
 
@@ -245,7 +245,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
 
                     MiniFuture fut = new MiniFuture(n, mappedKeys, curTopVer, exc);
 
-                    GridDhtForceKeysRequest req = new GridDhtForceKeysRequest<>(
+                    GridDhtForceKeysRequest req = new GridDhtForceKeysRequest(
                         cctx.cacheId(),
                         futId,
                         fut.miniId(),
@@ -282,7 +282,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param exc Exclude nodes.
      * @param mappings Mappings.
      */
-    private void map(KeyCacheObject key, Map<ClusterNode, Set<K>> mappings, Collection<ClusterNode> exc) {
+    private void map(KeyCacheObject key, Map<ClusterNode, Set<KeyCacheObject>> mappings, Collection<ClusterNode> exc) {
         ClusterNode loc = cctx.localNode();
 
         int part = cctx.affinity().partition(key);
@@ -318,7 +318,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
         }
 
         // Create partition.
-        GridDhtLocalPartition<K, V> locPart = top.localPartition(part, topVer, false);
+        GridDhtLocalPartition locPart = top.localPartition(part, topVer, false);
 
         if (log.isDebugEnabled())
             log.debug("Mapping local partition [loc=" + cctx.localNodeId() + ", topVer" + topVer +
@@ -346,7 +346,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                 return;
             }
 
-            Collection<K> mappedKeys = F.addIfAbsent(mappings, pick, F.<K>newSet());
+            Collection<KeyCacheObject> mappedKeys = F.addIfAbsent(mappings, pick, F.<KeyCacheObject>newSet());
 
             assert mappedKeys != null;
 
@@ -380,7 +380,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
         private ClusterNode node;
 
         /** Requested keys. */
-        private Collection<K> keys;
+        private Collection<KeyCacheObject> keys;
 
         /** Topology version for this mini-future. */
         private int curTopVer;
@@ -404,7 +404,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
          * @param curTopVer Topology version for this mini-future.
          * @param exc Exclude node list.
          */
-        MiniFuture(ClusterNode node, Collection<K> keys, int curTopVer, Collection<ClusterNode> exc) {
+        MiniFuture(ClusterNode node, Collection<KeyCacheObject> keys, int curTopVer, Collection<ClusterNode> exc) {
             super(cctx.kernalContext());
 
             assert node != null;
@@ -466,7 +466,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
          * @param res Result callback.
          */
         void onResult(GridDhtForceKeysResponse res) {
-            Collection missedKeys = res.missedKeys();
+            Collection<KeyCacheObject> missedKeys = res.missedKeys();
 
             boolean remapMissed = false;
 
@@ -479,10 +479,10 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
 
             // If preloading is disabled, we need to check other backups.
             if (!cctx.preloadEnabled()) {
-                Collection<K> retryKeys = F.view(
+                Collection<KeyCacheObject> retryKeys = F.view(
                     keys,
                     F0.notIn(missedKeys),
-                    F0.notIn(F.viewReadOnly(res.forcedInfos(), CU.<K, V>info2Key())));
+                    F0.notIn(F.viewReadOnly(res.forcedInfos(), CU.<KeyCacheObject, V>info2Key())));
 
                 if (!retryKeys.isEmpty())
                     map(retryKeys, F.concat(false, node, exc));
@@ -495,7 +495,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
             for (GridCacheEntryInfo info : res.forcedInfos()) {
                 int p = cctx.affinity().partition(info.key());
 
-                GridDhtLocalPartition<K, V> locPart = top.localPartition(p, -1, false);
+                GridDhtLocalPartition locPart = top.localPartition(p, -1, false);
 
                 if (locPart != null && locPart.state() == MOVING && locPart.reserve()) {
                     GridCacheEntryEx entry = cctx.dht().entryEx(info.key());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 9448fa9..e7b8f9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -54,7 +54,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
 @SuppressWarnings("NonConstantFieldWithUpperCaseName")
 public class GridDhtPartitionDemandPool<K, V> {
     /** Dummy message to wake up a blocking queue if a node leaves. */
-    private final SupplyMessage<K, V> DUMMY_TOP = new SupplyMessage<>();
+    private final SupplyMessage DUMMY_TOP = new SupplyMessage();
 
     /** */
     private final GridCacheContext<K, V> cctx;
@@ -70,7 +70,7 @@ public class GridDhtPartitionDemandPool<K, V> {
     private final Collection<DemandWorker> dmdWorkers;
 
     /** Preload predicate. */
-    private IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred;
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
     /** Future for preload mode {@link org.apache.ignite.cache.CachePreloadMode#SYNC}. */
     @GridToStringInclude
@@ -172,7 +172,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      *
      * @param preloadPred Preload predicate.
      */
-    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred) {
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
         this.preloadPred = preloadPred;
     }
 
@@ -261,7 +261,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param msg Message to check.
      * @return {@code True} if dummy message.
      */
-    private boolean dummyTopology(SupplyMessage<K, V> msg) {
+    private boolean dummyTopology(SupplyMessage msg) {
         return msg == DUMMY_TOP;
     }
 
@@ -401,7 +401,7 @@ public class GridDhtPartitionDemandPool<K, V> {
         private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>();
 
         /** Message queue. */
-        private final LinkedBlockingDeque<SupplyMessage<K, V>> msgQ =
+        private final LinkedBlockingDeque<SupplyMessage> msgQ =
             new LinkedBlockingDeque<>();
 
         /** Counter. */
@@ -443,7 +443,7 @@ public class GridDhtPartitionDemandPool<K, V> {
         /**
          * @param msg Message.
          */
-        private void addMessage(SupplyMessage<K, V> msg) {
+        private void addMessage(SupplyMessage msg) {
             if (!enterBusy())
                 return;
 
@@ -481,7 +481,7 @@ public class GridDhtPartitionDemandPool<K, V> {
          * @return {@code False} if partition has become invalid during preloading.
          * @throws IgniteInterruptedCheckedException If interrupted.
          */
-        private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo<K, V> entry, long topVer)
+        private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo entry, long topVer)
             throws IgniteCheckedException {
             try {
                 GridCacheEntryEx cached = null;
@@ -506,7 +506,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                     if (preloadPred == null || preloadPred.apply(entry)) {
                         if (cached.initialValue(
                             entry.value(),
-                            entry.valueBytes(),
+                            null,
                             entry.version(),
                             entry.ttl(),
                             entry.expireTime(),
@@ -569,7 +569,7 @@ public class GridDhtPartitionDemandPool<K, V> {
          * @throws ClusterTopologyCheckedException If node left.
          * @throws IgniteCheckedException If failed to send message.
          */
-        private Set<Integer> demandFromNode(ClusterNode node, final long topVer, GridDhtPartitionDemandMessage<K, V> d,
+        private Set<Integer> demandFromNode(ClusterNode node, final long topVer, GridDhtPartitionDemandMessage d,
             GridDhtPartitionsExchangeFuture<K, V> exchFut) throws InterruptedException, IgniteCheckedException {
             GridDhtPartitionTopology<K, V> top = cctx.dht().topology();
 
@@ -589,9 +589,9 @@ public class GridDhtPartitionDemandPool<K, V> {
             if (isCancelled() || topologyChanged())
                 return missed;
 
-            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage<K, V>>() {
-                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage<K, V> msg) {
-                    addMessage(new SupplyMessage<>(nodeId, msg));
+            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+                    addMessage(new SupplyMessage(nodeId, msg));
                 }
             });
 
@@ -604,7 +604,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                     retry = false;
 
                     // Create copy.
-                    d = new GridDhtPartitionDemandMessage<>(d, remaining);
+                    d = new GridDhtPartitionDemandMessage(d, remaining);
 
                     long timeout = GridDhtPartitionDemandPool.this.timeout.get();
 
@@ -619,7 +619,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                     // While.
                     // =====
                     while (!isCancelled() && !topologyChanged()) {
-                        SupplyMessage<K, V> s = poll(msgQ, timeout, this);
+                        SupplyMessage s = poll(msgQ, timeout, this);
 
                         // If timed out.
                         if (s == null) {
@@ -634,17 +634,17 @@ public class GridDhtPartitionDemandPool<K, V> {
                                 cctx.io().removeOrderedHandler(d.topic());
 
                                 // Must create copy to be able to work with IO manager thread local caches.
-                                d = new GridDhtPartitionDemandMessage<>(d, remaining);
+                                d = new GridDhtPartitionDemandMessage(d, remaining);
 
                                 // Create new topic.
                                 d.topic(topic(++cntr));
 
                                 // Create new ordered listener.
                                 cctx.io().addOrderedHandler(d.topic(),
-                                    new CI2<UUID, GridDhtPartitionSupplyMessage<K, V>>() {
+                                    new CI2<UUID, GridDhtPartitionSupplyMessage>() {
                                         @Override public void apply(UUID nodeId,
-                                            GridDhtPartitionSupplyMessage<K, V> msg) {
-                                            addMessage(new SupplyMessage<>(nodeId, msg));
+                                            GridDhtPartitionSupplyMessage msg) {
+                                            addMessage(new SupplyMessage(nodeId, msg));
                                         }
                                     });
 
@@ -676,7 +676,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                         if (log.isDebugEnabled())
                             log.debug("Received supply message: " + s);
 
-                        GridDhtPartitionSupplyMessage<K, V> supply = s.supply();
+                        GridDhtPartitionSupplyMessage supply = s.supply();
 
                         // Check whether there were class loading errors on unmarshal
                         if (supply.classError() != null) {
@@ -690,11 +690,11 @@ public class GridDhtPartitionDemandPool<K, V> {
                         }
 
                         // Preload.
-                        for (Map.Entry<Integer, Collection<GridCacheEntryInfo<K, V>>> e : supply.infos().entrySet()) {
+                        for (Map.Entry<Integer, Collection<GridCacheEntryInfo>> e : supply.infos().entrySet()) {
                             int p = e.getKey();
 
                             if (cctx.affinity().localNode(p, topVer)) {
-                                GridDhtLocalPartition<K, V> part = top.localPartition(p, topVer, true);
+                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
 
                                 assert part != null;
 
@@ -710,7 +710,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                                         Collection<Integer> invalidParts = new GridLeanSet<>();
 
                                         // Loop through all received entries and try to preload them.
-                                        for (GridCacheEntryInfo<K, V> entry : e.getValue()) {
+                                        for (GridCacheEntryInfo entry : e.getValue()) {
                                             if (!invalidParts.contains(p)) {
                                                 if (!part.preloadingPermitted(entry.key(), entry.version())) {
                                                     if (log.isDebugEnabled())
@@ -799,7 +799,7 @@ public class GridDhtPartitionDemandPool<K, V> {
          */
         private void drainQueue() throws InterruptedException {
             while (!msgQ.isEmpty()) {
-                SupplyMessage<K, V> msg = msgQ.take();
+                SupplyMessage msg = msgQ.take();
 
                 if (log.isDebugEnabled())
                     log.debug("Drained supply message: " + msg);
@@ -885,7 +885,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                                 if (topologyChanged() || isCancelled())
                                     break; // For.
 
-                                GridDhtPartitionDemandMessage<K, V> d = assigns.remove(node);
+                                GridDhtPartitionDemandMessage d = assigns.remove(node);
 
                                 // If another thread is already processing this message,
                                 // move to the next node.
@@ -997,7 +997,7 @@ public class GridDhtPartitionDemandPool<K, V> {
 
             // If partition belongs to local node.
             if (cctx.affinity().localNode(p, topVer)) {
-                GridDhtLocalPartition<K, V> part = top.localPartition(p, topVer, true);
+                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
 
                 assert part != null;
                 assert part.id() == p;
@@ -1020,10 +1020,10 @@ public class GridDhtPartitionDemandPool<K, V> {
                 else {
                     ClusterNode n = F.first(picked);
 
-                    GridDhtPartitionDemandMessage<K, V> msg = assigns.get(n);
+                    GridDhtPartitionDemandMessage msg = assigns.get(n);
 
                     if (msg == null) {
-                        assigns.put(n, msg = new GridDhtPartitionDemandMessage<>(
+                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
                             top.updateSequence(),
                             exchFut.exchangeId().topologyVersion(),
                             cctx.cacheId()));
@@ -1088,12 +1088,12 @@ public class GridDhtPartitionDemandPool<K, V> {
     /**
      * Supply message wrapper.
      */
-    private static class SupplyMessage<K, V> {
+    private static class SupplyMessage {
         /** Sender ID. */
         private UUID sndId;
 
         /** Supply message. */
-        private GridDhtPartitionSupplyMessage<K, V> supply;
+        private GridDhtPartitionSupplyMessage supply;
 
         /**
          * Dummy constructor.
@@ -1106,7 +1106,7 @@ public class GridDhtPartitionDemandPool<K, V> {
          * @param sndId Sender ID.
          * @param supply Supply message.
          */
-        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage<K, V> supply) {
+        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
             this.sndId = sndId;
             this.supply = supply;
         }
@@ -1121,7 +1121,7 @@ public class GridDhtPartitionDemandPool<K, V> {
         /**
          * @return Message.
          */
-        GridDhtPartitionSupplyMessage<K, V> supply() {
+        GridDhtPartitionSupplyMessage supply() {
             return supply;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index c141594..6a3e48d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -60,13 +60,13 @@ class GridDhtPartitionSupplyPool<K, V> {
     private final Collection<SupplyWorker> workers = new LinkedList<>();
 
     /** */
-    private final BlockingQueue<DemandMessage<K, V>> queue = new LinkedBlockingDeque<>();
+    private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>();
 
     /** */
     private final boolean depEnabled;
 
     /** Preload predicate. */
-    private IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred;
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
     /**
      * @param cctx Cache context.
@@ -88,8 +88,8 @@ class GridDhtPartitionSupplyPool<K, V> {
         for (int i = 0; i < poolSize; i++)
             workers.add(new SupplyWorker());
 
-        cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage<K, V>>() {
-            @Override public void apply(UUID id, GridDhtPartitionDemandMessage<K, V> m) {
+        cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+            @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
                 processDemandMessage(id, m);
             }
         });
@@ -120,7 +120,7 @@ class GridDhtPartitionSupplyPool<K, V> {
      *
      * @param preloadPred Preload predicate.
      */
-    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred) {
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
         this.preloadPred = preloadPred;
     }
 
@@ -148,7 +148,7 @@ class GridDhtPartitionSupplyPool<K, V> {
      * @param nodeId Sender node ID.
      * @param d Message.
      */
-    private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage<K, V> d) {
+    private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) {
         if (!enterBusy())
             return;
 
@@ -157,7 +157,7 @@ class GridDhtPartitionSupplyPool<K, V> {
                 if (log.isDebugEnabled())
                     log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']');
 
-                queue.offer(new DemandMessage<>(nodeId, d));
+                queue.offer(new DemandMessage(nodeId, d));
             }
             else
                 U.warn(log, "Received partition demand message when preloading is disabled (will ignore): " + d);
@@ -212,7 +212,7 @@ class GridDhtPartitionSupplyPool<K, V> {
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             while (!isCancelled()) {
-                DemandMessage<K, V> msg = poll(queue, this);
+                DemandMessage msg = poll(queue, this);
 
                 if (msg == null)
                     continue;
@@ -234,13 +234,13 @@ class GridDhtPartitionSupplyPool<K, V> {
          * @param msg Message.
          * @param node Demander.
          */
-        private void processMessage(DemandMessage<K, V> msg, ClusterNode node) {
+        private void processMessage(DemandMessage msg, ClusterNode node) {
             assert msg != null;
             assert node != null;
 
-            GridDhtPartitionDemandMessage<K, V> d = msg.message();
+            GridDhtPartitionDemandMessage d = msg.message();
 
-            GridDhtPartitionSupplyMessage<K, V> s = new GridDhtPartitionSupplyMessage<>(d.workerId(),
+            GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
                 d.updateSequence(), cctx.cacheId());
 
             long preloadThrottle = cctx.config().getPreloadThrottle();
@@ -256,7 +256,7 @@ class GridDhtPartitionSupplyPool<K, V> {
                 cctx.mvcc().finishLocks(d.topologyVersion()).get();
 
                 for (Integer part : d.partitions()) {
-                    GridDhtLocalPartition<K, V> loc = top.localPartition(part, d.topologyVersion(), false);
+                    GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
 
                     if (loc == null || loc.state() != OWNING || !loc.reserve()) {
                         // Reply with partition of "-1" to let sender know that
@@ -270,11 +270,11 @@ class GridDhtPartitionSupplyPool<K, V> {
                         continue;
                     }
 
-                    GridCacheEntryInfoCollectSwapListener<K, V> swapLsnr = null;
+                    GridCacheEntryInfoCollectSwapListener swapLsnr = null;
 
                     try {
                         if (cctx.isSwapOrOffheapEnabled()) {
-                            swapLsnr = new GridCacheEntryInfoCollectSwapListener<>(log, cctx);
+                            swapLsnr = new GridCacheEntryInfoCollectSwapListener(log, cctx);
 
                             cctx.swap().addOffHeapListener(part, swapLsnr);
                             cctx.swap().addSwapListener(part, swapLsnr);
@@ -306,11 +306,11 @@ class GridDhtPartitionSupplyPool<K, V> {
                                 if (preloadThrottle > 0)
                                     U.sleep(preloadThrottle);
 
-                                s = new GridDhtPartitionSupplyMessage<>(d.workerId(), d.updateSequence(),
+                                s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
                                     cctx.cacheId());
                             }
 
-                            GridCacheEntryInfo<K, V> info = e.info();
+                            GridCacheEntryInfo info = e.info();
 
                             if (info != null && !(info.key() instanceof GridPartitionLockKey) && !info.isNew()) {
                                 if (preloadPred == null || preloadPred.apply(info))
@@ -325,7 +325,7 @@ class GridDhtPartitionSupplyPool<K, V> {
                             continue;
 
                         if (cctx.isSwapOrOffheapEnabled()) {
-                            GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> iter =
+                            GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
                                 cctx.swap().iterator(part, false);
 
                             // Iterator may be null if space does not exist.
@@ -333,7 +333,7 @@ class GridDhtPartitionSupplyPool<K, V> {
                                 try {
                                     boolean prepared = false;
 
-                                    for (Map.Entry<byte[], GridCacheSwapEntry<V>> e : iter) {
+                                    for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
                                         if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
                                             // Demander no longer needs this partition,
                                             // so we send '-1' partition and move on.
@@ -358,27 +358,27 @@ class GridDhtPartitionSupplyPool<K, V> {
                                             if (preloadThrottle > 0)
                                                 U.sleep(preloadThrottle);
 
-                                            s = new GridDhtPartitionSupplyMessage<>(d.workerId(),
+                                            s = new GridDhtPartitionSupplyMessage(d.workerId(),
                                                 d.updateSequence(), cctx.cacheId());
                                         }
 
-                                        GridCacheSwapEntry<V> swapEntry = e.getValue();
+                                        GridCacheSwapEntry swapEntry = e.getValue();
 
-                                        GridCacheEntryInfo<K, V> info = new GridCacheEntryInfo<>();
+                                        GridCacheEntryInfo info = new GridCacheEntryInfo();
 
-                                        info.keyBytes(e.getKey());
                                         info.ttl(swapEntry.ttl());
                                         info.expireTime(swapEntry.expireTime());
                                         info.version(swapEntry.version());
-
-                                        if (!swapEntry.valueIsByteArray()) {
-                                            if (convertPortable)
-                                                info.valueBytes(cctx.convertPortableBytes(swapEntry.valueBytes()));
-                                            else
-                                                info.valueBytes(swapEntry.valueBytes());
-                                        }
-                                        else
-                                            info.value(swapEntry.value());
+// TODO IGNITE-51.
+//                                        if (!swapEntry.valueIsByteArray()) {
+//                                            if (convertPortable)
+//                                                info.valueBytes(cctx.convertPortableBytes(swapEntry.valueBytes()));
+//                                            else
+//                                                info.valueBytes(swapEntry.valueBytes());
+//                                        }
+//                                        else
+//                                            info.value(swapEntry.value());
+                                        info.value(swapEntry.value());
 
                                         if (preloadPred == null || preloadPred.apply(info))
                                             s.addEntry0(part, info, cctx.shared());
@@ -425,11 +425,11 @@ class GridDhtPartitionSupplyPool<K, V> {
                         }
 
                         if (swapLsnr != null) {
-                            Collection<GridCacheEntryInfo<K, V>> entries = swapLsnr.entries();
+                            Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
 
                             swapLsnr = null;
 
-                            for (GridCacheEntryInfo<K, V> info : entries) {
+                            for (GridCacheEntryInfo info : entries) {
                                 if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
                                     // Demander no longer needs this partition,
                                     // so we send '-1' partition and move on.
@@ -449,7 +449,8 @@ class GridDhtPartitionSupplyPool<K, V> {
                                     if (!reply(node, d, s))
                                         return;
 
-                                    s = new GridDhtPartitionSupplyMessage<>(d.workerId(), d.updateSequence(),
+                                    s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                        d.updateSequence(),
                                         cctx.cacheId());
                                 }
 
@@ -494,7 +495,7 @@ class GridDhtPartitionSupplyPool<K, V> {
          * @return {@code True} if message was sent, {@code false} if recipient left grid.
          * @throws IgniteCheckedException If failed.
          */
-        private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage<K, V> d, GridDhtPartitionSupplyMessage<K, V> s)
+        private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
             throws IgniteCheckedException {
             try {
                 if (log.isDebugEnabled())
@@ -516,7 +517,7 @@ class GridDhtPartitionSupplyPool<K, V> {
     /**
      * Demand message wrapper.
      */
-    private static class DemandMessage<K, V> extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage<K, V>> {
+    private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -524,7 +525,7 @@ class GridDhtPartitionSupplyPool<K, V> {
          * @param sndId Sender ID.
          * @param msg Message.
          */
-        DemandMessage(UUID sndId, GridDhtPartitionDemandMessage<K, V> msg) {
+        DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
             super(sndId, msg);
         }
 
@@ -545,7 +546,7 @@ class GridDhtPartitionSupplyPool<K, V> {
         /**
          * @return Message.
          */
-        public GridDhtPartitionDemandMessage<K, V> message() {
+        public GridDhtPartitionDemandMessage message() {
             return get2();
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index db9bd08..1918f61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -125,10 +125,10 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
      * Messages received on non-coordinator are stored in case if this node
      * becomes coordinator.
      */
-    private final Map<UUID, GridDhtPartitionsSingleMessage<K, V>> singleMsgs = new ConcurrentHashMap8<>();
+    private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>();
 
     /** Messages received from new coordinator. */
-    private final Map<UUID, GridDhtPartitionsFullMessage<K, V>> fullMsgs = new ConcurrentHashMap8<>();
+    private final Map<UUID, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>();
 
     /** */
     @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
@@ -436,11 +436,11 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
 
                 rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
 
-                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage<K, V>> m : singleMsgs.entrySet())
+                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
                     // If received any messages, process them.
                     onReceive(m.getKey(), m.getValue());
 
-                for (Map.Entry<UUID, GridDhtPartitionsFullMessage<K, V>> m : fullMsgs.entrySet())
+                for (Map.Entry<UUID, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet())
                     // If received any messages, process them.
                     onReceive(m.getKey(), m.getValue());
 
@@ -551,7 +551,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
      * @throws IgniteCheckedException If failed.
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
-        GridDhtPartitionsSingleMessage<K, V> m = new GridDhtPartitionsSingleMessage<>(id, cctx.versions().last());
+        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last());
 
         for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal())
@@ -571,7 +571,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
      */
     private void sendAllPartitions(Collection<? extends ClusterNode> nodes, GridDhtPartitionExchangeId id)
         throws IgniteCheckedException {
-        GridDhtPartitionsFullMessage<K, V> m = new GridDhtPartitionsFullMessage<>(id, lastVer.get(),
+        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(id,
+            lastVer.get(),
             id.topologyVersion());
 
         for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
@@ -695,7 +696,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
      * @param nodeId Sender node id.
      * @param msg Single partition info.
      */
-    public void onReceive(final UUID nodeId, final GridDhtPartitionsSingleMessage<K, V> msg) {
+    public void onReceive(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) {
         assert msg != null;
 
         assert msg.exchangeId().equals(exchId);
@@ -788,7 +789,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
      * @param nodeId Sender node ID.
      * @param msg Full partition info.
      */
-    public void onReceive(final UUID nodeId, final GridDhtPartitionsFullMessage<K, V> msg) {
+    public void onReceive(final UUID nodeId, final GridDhtPartitionsFullMessage msg) {
         assert msg != null;
 
         if (isDone()) {
@@ -847,7 +848,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
      *
      * @param msg Partitions full messages.
      */
-    private void updatePartitionFullMap(GridDhtPartitionsFullMessage<K, V> msg) {
+    private void updatePartitionFullMap(GridDhtPartitionsFullMessage msg) {
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
             Integer cacheId = entry.getKey();
 
@@ -865,7 +866,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
      *
      * @param msg Partitions single message.
      */
-    private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage<K, V> msg) {
+    private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage msg) {
         for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
             Integer cacheId = entry.getKey();
             GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
@@ -952,10 +953,10 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
 
                             if (set) {
                                 // If received any messages, process them.
-                                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage<K, V>> m : singleMsgs.entrySet())
+                                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
                                     onReceive(m.getKey(), m.getValue());
 
-                                for (Map.Entry<UUID, GridDhtPartitionsFullMessage<K, V>> m : fullMsgs.entrySet())
+                                for (Map.Entry<UUID, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet())
                                     onReceive(m.getKey(), m.getValue());
 
                                 // Reassign oldest node and resend.
@@ -1000,7 +1001,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
             if (!remaining.isEmpty()) {
                 try {
                     cctx.io().safeSend(cctx.discovery().nodes(remaining),
-                        new GridDhtPartitionsSingleRequest<K, V>(exchId), SYSTEM_POOL, null);
+                        new GridDhtPartitionsSingleRequest(exchId), SYSTEM_POOL, null);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to request partitions from nodes [exchangeId=" + exchId +


Mime
View raw message