ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [31/34] incubator-ignite git commit: # ignite-51
Date Wed, 25 Feb 2015 14:07:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 8b10a92..45b13b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -50,11 +50,11 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     private static final long serialVersionUID = 0L;
 
     /** Near mappings. */
-    protected Map<UUID, GridDistributedTxMapping<K, V>> nearMap =
+    protected Map<UUID, GridDistributedTxMapping> nearMap =
         new ConcurrentHashMap8<>();
 
     /** DHT mappings. */
-    protected Map<UUID, GridDistributedTxMapping<K, V>> dhtMap =
+    protected Map<UUID, GridDistributedTxMapping> dhtMap =
         new ConcurrentHashMap8<>();
 
     /** Mapped flag. */
@@ -90,7 +90,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param partLock If this is a group-lock transaction and the whole partition should be locked.
      */
     protected GridDhtTxLocalAdapter(
-        GridCacheSharedContext<K, V> cctx,
+        GridCacheSharedContext cctx,
         GridCacheVersion xidVer,
         boolean implicit,
         boolean implicitSingle,
@@ -140,7 +140,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @return {@code True} if reader was added as a result of this call.
      */
     @Nullable protected abstract IgniteInternalFuture<Boolean> addReader(long msgId,
-        GridDhtCacheEntry<K, V> cached,
+        GridDhtCacheEntry cached,
         IgniteTxEntry entry,
         long topVer);
 
@@ -195,13 +195,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                 return;
             }
 
-            Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtEntryMap = null;
-            Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearEntryMap = null;
+            Map<ClusterNode, List<GridDhtCacheEntry>> dhtEntryMap = null;
+            Map<ClusterNode, List<GridDhtCacheEntry>> nearEntryMap = null;
 
             for (IgniteTxEntry e : allEntries()) {
                 assert e.cached() != null;
 
-                GridCacheContext<K, V> cacheCtx = e.cached().context();
+                GridCacheContext cacheCtx = e.cached().context();
 
                 if (cacheCtx.isNear())
                     continue;
@@ -226,7 +226,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                                 nearEntryMap = new GridLeanMap<>();
 
                             cacheCtx.dhtMap(nearNodeId(), topologyVersion(),
-                                (GridDhtCacheEntry<K, V>)e.cached(), log, dhtEntryMap, nearEntryMap);
+                                (GridDhtCacheEntry)e.cached(), log, dhtEntryMap, nearEntryMap);
                         }
 
                         break;
@@ -252,7 +252,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     /**
      * @return DHT map.
      */
-    Map<UUID, GridDistributedTxMapping<K, V>> dhtMap() {
+    Map<UUID, GridDistributedTxMapping> dhtMap() {
         mapExplicitLocks();
 
         return dhtMap;
@@ -261,7 +261,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     /**
      * @return Near map.
      */
-    Map<UUID, GridDistributedTxMapping<K, V>> nearMap() {
+    Map<UUID, GridDistributedTxMapping> nearMap() {
         mapExplicitLocks();
 
         return nearMap;
@@ -271,7 +271,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param nodeId Node ID.
      * @return Mapping.
      */
-    GridDistributedTxMapping<K, V> dhtMapping(UUID nodeId) {
+    GridDistributedTxMapping dhtMapping(UUID nodeId) {
         return dhtMap.get(nodeId);
     }
 
@@ -279,35 +279,35 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param nodeId Node ID.
      * @return Mapping.
      */
-    GridDistributedTxMapping<K, V> nearMapping(UUID nodeId) {
+    GridDistributedTxMapping nearMapping(UUID nodeId) {
         return nearMap.get(nodeId);
     }
 
     /**
      * @param mappings Mappings to add.
      */
-    void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings) {
+    void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
         addMapping(mappings, dhtMap);
     }
 
     /**
      * @param mappings Mappings to add.
      */
-    void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings) {
+    void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
         addMapping(mappings, nearMap);
     }
 
     /**
      * @param mappings Mappings to add.
      */
-    public void addDhtMapping(Map<UUID, GridDistributedTxMapping<K, V>> mappings) {
+    public void addDhtMapping(Map<UUID, GridDistributedTxMapping> mappings) {
         addMapping0(mappings, dhtMap);
     }
 
     /**
      * @param mappings Mappings to add.
      */
-    public void addNearMapping(Map<UUID, GridDistributedTxMapping<K, V>> mappings) {
+    public void addNearMapping(Map<UUID, GridDistributedTxMapping> mappings) {
         addMapping0(mappings, nearMap);
     }
     /**
@@ -343,7 +343,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @return {@code True} if was removed.
      */
     private boolean removeMapping(UUID nodeId, @Nullable GridCacheEntryEx entry,
-        Map<UUID, GridDistributedTxMapping<K, V>> map) {
+        Map<UUID, GridDistributedTxMapping> map) {
         if (entry != null) {
             if (log.isDebugEnabled())
                 log.debug("Removing mapping for entry [nodeId=" + nodeId + ", entry=" + entry + ']');
@@ -353,7 +353,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
             if (txEntry == null)
                 return false;
 
-            GridDistributedTxMapping<K, V> m = map.get(nodeId);
+            GridDistributedTxMapping m = map.get(nodeId);
 
             boolean ret = m != null && m.removeEntry(txEntry);
 
@@ -371,22 +371,22 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param dst Transaction mappings.
      */
     private void addMapping(
-        Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings,
-        Map<UUID, GridDistributedTxMapping<K, V>> dst
+        Map<ClusterNode, List<GridDhtCacheEntry>> mappings,
+        Map<UUID, GridDistributedTxMapping> dst
     ) {
-        for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapping : mappings.entrySet()) {
+        for (Map.Entry<ClusterNode, List<GridDhtCacheEntry>> mapping : mappings.entrySet()) {
             ClusterNode n = mapping.getKey();
 
-            GridDistributedTxMapping<K, V> m = dst.get(n.id());
+            GridDistributedTxMapping m = dst.get(n.id());
 
-            List<GridDhtCacheEntry<K, V>> entries = mapping.getValue();
+            List<GridDhtCacheEntry> entries = mapping.getValue();
 
-            for (GridDhtCacheEntry<K, V> entry : entries) {
+            for (GridDhtCacheEntry entry : entries) {
                 IgniteTxEntry txEntry = txMap.get(entry.txKey());
 
                 if (txEntry != null) {
                     if (m == null)
-                        dst.put(n.id(), m = new GridDistributedTxMapping<>(n));
+                        dst.put(n.id(), m = new GridDistributedTxMapping(n));
 
                     m.add(txEntry);
                 }
@@ -399,11 +399,11 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param dst Map to add to.
      */
     private void addMapping0(
-        Map<UUID, GridDistributedTxMapping<K, V>> mappings,
-        Map<UUID, GridDistributedTxMapping<K, V>> dst
+        Map<UUID, GridDistributedTxMapping> mappings,
+        Map<UUID, GridDistributedTxMapping> dst
     ) {
-        for (Map.Entry<UUID, GridDistributedTxMapping<K, V>> entry : mappings.entrySet()) {
-            GridDistributedTxMapping<K, V> targetMapping = dst.get(entry.getKey());
+        for (Map.Entry<UUID, GridDistributedTxMapping> entry : mappings.entrySet()) {
+            GridDistributedTxMapping targetMapping = dst.get(entry.getKey());
 
             if (targetMapping == null)
                 dst.put(entry.getKey(), entry.getValue());
@@ -415,7 +415,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void addInvalidPartition(GridCacheContext<K, V> ctx, int part) {
+    @Override public void addInvalidPartition(GridCacheContext ctx, int part) {
         assert false : "DHT transaction encountered invalid partition [part=" + part + ", tx=" + this + ']';
     }
 
@@ -437,9 +437,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
         checkInternal(e.txKey());
 
-        GridCacheContext<K, V> cacheCtx = e.context();
+        GridCacheContext cacheCtx = e.context();
 
-        GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
+        GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
 
         try {
             IgniteTxEntry entry = txMap.get(e.txKey());
@@ -448,7 +448,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                 entry.op(e.op()); // Absolutely must set operation, as default is DELETE.
                 entry.value(e.value(), e.hasWriteValue(), e.hasReadValue());
                 entry.entryProcessors(e.entryProcessors());
-                entry.valueBytes(e.valueBytes());
                 entry.ttl(e.ttl());
                 entry.filters(e.filters());
                 entry.expiry(e.expiry());
@@ -461,22 +460,27 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
                 addActiveCache(dhtCache.context());
 
-                while (true) {
-                    GridDhtCacheEntry<K, V> cached = dhtCache.entryExx(entry.key(), topologyVersion());
-
-                    try {
-                        // Set key bytes to avoid serializing in future.
-                        cached.keyBytes(entry.keyBytes());
-
-                        entry.cached(cached, entry.keyBytes());
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry when adding to dht tx (will retry): " + cached);
-                    }
-                }
+                GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion());
+
+                entry.cached(cached, null);
+
+// TODO IGNITE-51.
+//                while (true) {
+//                    GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion());
+//
+//                    try {
+//                        // Set key bytes to avoid serializing in future.
+//                        cached.keyBytes(entry.keyBytes());
+//
+//                        entry.cached(cached, null);
+//
+//                        break;
+//                    }
+//                    catch (GridCacheEntryRemovedException ignore) {
+//                        if (log.isDebugEnabled())
+//                            log.debug("Got removed entry when adding to dht tx (will retry): " + cached);
+//                    }
+//                }
 
                 GridCacheVersion explicit = entry.explicitVersion();
 
@@ -514,8 +518,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @return Lock future.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    IgniteInternalFuture<GridCacheReturn<V>> lockAllAsync(
-        GridCacheContext<K, V> cacheCtx,
+    IgniteInternalFuture<GridCacheReturn<Object>> lockAllAsync(
+        GridCacheContext cacheCtx,
         List<GridCacheEntryEx> entries,
         boolean onePhaseCommit,
         long msgId,
@@ -529,7 +533,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
             return new GridFinishedFuture<>(cctx.kernalContext(), e);
         }
 
-        final GridCacheReturn<V> ret = new GridCacheReturn<>(false);
+        final GridCacheReturn<Object> ret = new GridCacheReturn<>(false);
 
         if (F.isEmpty(entries))
             return new GridFinishedFuture<>(cctx.kernalContext(), ret);
@@ -539,23 +543,23 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         onePhaseCommit(onePhaseCommit);
 
         try {
-            Set<K> skipped = null;
+            Set<KeyCacheObject> skipped = null;
 
             long topVer = topologyVersion();
 
-            GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
+            GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
 
             // Enlist locks into transaction.
             for (int i = 0; i < entries.size(); i++) {
                 GridCacheEntryEx entry = entries.get(i);
 
-                K key = entry.key();
+                KeyCacheObject key = entry.key();
 
                 IgniteTxEntry txEntry = entry(entry.txKey());
 
                 // First time access.
                 if (txEntry == null) {
-                    GridDhtCacheEntry<K, V> cached = dhtCache.entryExx(key, topVer);
+                    GridDhtCacheEntry cached = dhtCache.entryExx(key, topVer);
 
                     addActiveCache(dhtCache.context());
 
@@ -567,7 +571,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                         null,
                         cached,
                         null,
-                        CU.<K, V>empty(),
+                        CU.empty(),
                         false,
                         -1L,
                         -1L,
@@ -576,7 +580,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                     if (read)
                         txEntry.ttl(accessTtl);
 
-                    txEntry.cached(cached, txEntry.keyBytes());
+                    txEntry.cached(cached, null);
 
                     addReader(msgId, cached, txEntry, topVer);
                 }
@@ -590,13 +594,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
             assert pessimistic();
 
-            Collection<K> keys = F.viewReadOnly(entries, CU.<K, V>entry2Key());
+            Collection<KeyCacheObject> keys = F.viewReadOnly(entries, CU.entry2Key());
 
             // Acquire locks only after having added operation to the write set.
             // Otherwise, during rollback we will not know whether locks need
             // to be rolled back.
             // Loose all skipped and previously locked (we cannot reenter locks here).
-            final Collection<? extends K> passedKeys = skipped != null ? F.view(keys, F0.notIn(skipped)) : keys;
+            final Collection<KeyCacheObject> passedKeys = skipped != null ? F.view(keys, F0.notIn(skipped)) : keys;
 
             if (log.isDebugEnabled())
                 log.debug("Lock keys: " + passedKeys);
@@ -620,14 +624,14 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param filter Entry write filter.
      * @return Future for lock acquisition.
      */
-    private IgniteInternalFuture<GridCacheReturn<V>> obtainLockAsync(
-        final GridCacheContext<K, V> cacheCtx,
-        GridCacheReturn<V> ret,
-        final Collection<? extends K> passedKeys,
+    private IgniteInternalFuture<GridCacheReturn<Object>> obtainLockAsync(
+        final GridCacheContext cacheCtx,
+        GridCacheReturn<Object> ret,
+        final Collection<KeyCacheObject> passedKeys,
         final boolean read,
-        final Set<K> skipped,
+        final Set<KeyCacheObject> skipped,
         final long accessTtl,
-        @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+        @Nullable final IgnitePredicate<Cache.Entry<Object, Object>>[] filter) {
         if (log.isDebugEnabled())
             log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" +
                 skipped + ']');
@@ -635,22 +639,24 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         if (passedKeys.isEmpty())
             return new GridFinishedFuture<>(cctx.kernalContext(), ret);
 
-        GridDhtTransactionalCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx();
+        GridDhtTransactionalCacheAdapter<?, ?> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx();
 
-        IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
-            lockTimeout(),
-            this,
-            isInvalidate(),
-            read,
-            /*retval*/false,
-            isolation,
-            accessTtl,
-            CU.<K, V>empty());
+        IgniteInternalFuture<Boolean> fut = null;
+// TODO IGNITE-51
+//        IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
+//            lockTimeout(),
+//            this,
+//            isInvalidate(),
+//            read,
+//            /*retval*/false,
+//            isolation,
+//            accessTtl,
+//            CU.empty());
 
         return new GridEmbeddedFuture<>(
             fut,
-            new PLC1<GridCacheReturn<V>>(ret) {
-                @Override protected GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException {
+            new PLC1<GridCacheReturn<Object>>(ret) {
+                @Override protected GridCacheReturn<Object> postLock(GridCacheReturn<Object> ret) throws IgniteCheckedException {
                     if (log.isDebugEnabled())
                         log.debug("Acquired transaction lock on keys: " + passedKeys);
 
@@ -662,7 +668,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                         /*retval*/false,
                         /*read*/read,
                         accessTtl,
-                        filter == null ? CU.<K, V>empty() : filter,
+                        filter == null ? CU.empty() : filter,
                         /**computeInvoke*/false);
 
                     return ret;
@@ -672,10 +678,10 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override protected void addGroupTxMapping(Collection<IgniteTxKey<K>> keys) {
+    @Override protected void addGroupTxMapping(Collection<IgniteTxKey> keys) {
         assert groupLock();
 
-        for (GridDistributedTxMapping<K, V> mapping : dhtMap.values())
+        for (GridDistributedTxMapping mapping : dhtMap.values())
             mapping.entries(Collections.unmodifiableCollection(txMap.values()), true);
 
         // Here we know that affinity key for all given keys is our group lock key.
@@ -683,9 +689,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         // Add near readers. If near cache is disabled on all nodes, do nothing.
         Collection<UUID> backupIds = dhtMap.keySet();
 
-        Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> locNearMap = null;
+        Map<ClusterNode, List<GridDhtCacheEntry>> locNearMap = null;
 
-        for (IgniteTxKey<K> key : keys) {
+        for (IgniteTxKey key : keys) {
             IgniteTxEntry txEntry = entry(key);
 
             if (!txEntry.groupLockEntry() || txEntry.context().isNear())
@@ -695,7 +701,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
             while (true) {
                 try {
-                    GridDhtCacheEntry<K, V> entry = (GridDhtCacheEntry<K, V>)txEntry.cached();
+                    GridDhtCacheEntry entry = (GridDhtCacheEntry)txEntry.cached();
 
                     Collection<UUID> readers = entry.readers();
 
@@ -711,7 +717,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                             if (locNearMap == null)
                                 locNearMap = new HashMap<>();
 
-                            List<GridDhtCacheEntry<K, V>> entries = locNearMap.get(n);
+                            List<GridDhtCacheEntry> entries = locNearMap.get(n);
 
                             if (entries == null)
                                 locNearMap.put(n, entries = new LinkedList<>());
@@ -724,7 +730,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                 }
                 catch (GridCacheEntryRemovedException ignored) {
                     // Retry.
-                    txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()), txEntry.keyBytes());
+                    txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()), null);
                 }
             }
         }
@@ -823,7 +829,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      *
      * @param fut Expected future.
      */
-    protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture<K, V> fut);
+    protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut);
 
     /** {@inheritDoc} */
     @Override public void rollback() throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
index 0bbc33f..d207d76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
@@ -77,12 +77,12 @@ public class GridDhtTxMapping<K, V> {
      *
      * @param mappings Mappings.
      */
-    public void initLast(Collection<GridDistributedTxMapping<K, V>> mappings) {
+    public void initLast(Collection<GridDistributedTxMapping> mappings) {
         assert this.mappings.size() == mappings.size();
 
         int idx = 0;
 
-        for (GridDistributedTxMapping<?, ?> map : mappings) {
+        for (GridDistributedTxMapping map : mappings) {
             TxMapping mapping = this.mappings.get(idx);
 
             map.lastBackups(lastBackups(mapping, idx));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index f9e4cae..9344b2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -65,13 +65,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
     /** Transaction. */
     @GridToStringExclude
-    private GridDhtTxLocalAdapter<K, V> tx;
+    private GridDhtTxLocalAdapter tx;
 
     /** Near mappings. */
-    private Map<UUID, GridDistributedTxMapping<K, V>> nearMap;
+    private Map<UUID, GridDistributedTxMapping> nearMap;
 
     /** DHT mappings. */
-    private Map<UUID, GridDistributedTxMapping<K, V>> dhtMap;
+    private Map<UUID, GridDistributedTxMapping> dhtMap;
 
     /** Logger. */
     private IgniteLogger log;
@@ -101,7 +101,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     private IgniteUuid nearMiniId;
 
     /** DHT versions map. */
-    private Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap;
+    private Map<IgniteTxKey, GridCacheVersion> dhtVerMap;
 
     /** {@code True} if this is last prepare operation for node. */
     private boolean last;
@@ -113,19 +113,19 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     private boolean retVal;
 
     /** Return value. */
-    private GridCacheReturn<V> ret;
+    private GridCacheReturn<CacheObject> ret;
 
     /** Keys that did not pass the filter. */
-    private Collection<IgniteTxKey<K>> filterFailedKeys;
+    private Collection<IgniteTxKey> filterFailedKeys;
 
     /** Keys that should be locked. */
-    private GridConcurrentHashSet<IgniteTxKey<K>> lockKeys = new GridConcurrentHashSet<>();
+    private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
 
     /** Locks ready flag. */
     private volatile boolean locksReady;
 
     /** */
-    private IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb;
+    private IgniteInClosure<GridNearTxPrepareResponse> completeCb;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -143,14 +143,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
      * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
      */
     public GridDhtTxPrepareFuture(
-        GridCacheSharedContext<K, V> cctx,
-        final GridDhtTxLocalAdapter<K, V> tx,
+        GridCacheSharedContext cctx,
+        final GridDhtTxLocalAdapter tx,
         IgniteUuid nearMiniId,
-        Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap,
+        Map<IgniteTxKey, GridCacheVersion> dhtVerMap,
         boolean last,
         boolean retVal,
         Collection<UUID> lastBackups,
-        IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb
+        IgniteInClosure<GridNearTxPrepareResponse> completeCb
     ) {
         super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
             @Override public boolean collect(IgniteInternalTx e) {
@@ -241,7 +241,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     /**
      * @return Transaction.
      */
-    GridDhtTxLocalAdapter<K, V> tx() {
+    GridDhtTxLocalAdapter tx() {
         return tx;
     }
 
@@ -275,7 +275,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
         ret = new GridCacheReturn<>(null, true);
 
         for (IgniteTxEntry txEntry : tx.optimisticLockEntries()) {
-            GridCacheContext<K, V> cacheCtx = txEntry.context();
+            GridCacheContext cacheCtx = txEntry.context();
 
             GridCacheEntryEx cached = txEntry.cached();
 
@@ -297,7 +297,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                 if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
                     cached.unswap(true, retVal);
 
-                    V val = cached.innerGet(
+                    CacheObject val = cached.innerGet(
                         tx,
                         /*swap*/true,
                         /*read through*/(retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue(),
@@ -313,21 +313,24 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                     if (retVal) {
                         if (!F.isEmpty(txEntry.entryProcessors())) {
-                            K key = txEntry.key();
+                            KeyCacheObject key = txEntry.key();
 
                             Object procRes = null;
                             Exception err = null;
 
+                            Object keyVal = key.value(cacheCtx);
+                            Object val0 = CU.value(val, cacheCtx);
 
-                            for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) {
+                            for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
                                 try {
-                                    CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.context(), key, val);
+                                    CacheInvokeEntry<Object, Object> invokeEntry =
+                                        new CacheInvokeEntry<>(txEntry.context(), keyVal, val0);
 
-                                    EntryProcessor<K, V, ?> processor = t.get1();
+                                    EntryProcessor<Object, Object, Object> processor = t.get1();
 
                                     procRes = processor.process(invokeEntry, t.get2());
 
-                                    val = invokeEntry.getValue();
+                                    val = cacheCtx.toCacheObject(invokeEntry.getValue());
                                 }
                                 catch (Exception e) {
                                     err = e;
@@ -346,7 +349,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                             ret.value(val);
                     }
 
-                    if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) {
+                    if (hasFilters && !cacheCtx.isAll(cached.wrapLazyValue(), txEntry.filters())) {
                         if (expiry != null)
                             txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
 
@@ -386,7 +389,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
      * @param nodeId Sender.
      * @param res Result.
      */
-    public void onResult(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) {
+    public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) {
         if (!isDone()) {
             for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
                 if (isMini(fut)) {
@@ -416,17 +419,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
             Collections.singletonList(tx.groupLockEntry()) : writes;
 
         for (IgniteTxEntry txEntry : checkEntries) {
-            GridCacheContext<K, V> cacheCtx = txEntry.context();
+            GridCacheContext cacheCtx = txEntry.context();
 
             if (cacheCtx.isLocal())
                 continue;
 
-            GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached();
+            GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached();
 
             if (entry == null) {
-                entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key());
+                entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key());
 
-                txEntry.cached(entry, txEntry.keyBytes());
+                txEntry.cached(entry, null);
             }
 
             if (tx.optimistic() && txEntry.explicitVersion() == null) {
@@ -450,9 +453,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                     if (log.isDebugEnabled())
                         log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry);
 
-                    entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key());
+                    entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key());
 
-                    txEntry.cached(entry, txEntry.keyBytes());
+                    txEntry.cached(entry, null);
                 }
             }
         }
@@ -490,13 +493,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
             assert last;
 
             // Must create prepare response before transaction is committed to grab correct return value.
-            final GridNearTxPrepareResponse<K, V> res = createPrepareResponse();
+            final GridNearTxPrepareResponse res = createPrepareResponse();
 
             onComplete();
 
             if (!tx.near()) {
                 if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
-                    IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync();
+                    IgniteInternalFuture<IgniteInternalTx> fut =
+                        this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync();
 
                     fut.listenAsync(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
                         @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> gridCacheTxGridFuture) {
@@ -559,7 +563,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     /**
      * @throws IgniteCheckedException If failed to send response.
      */
-    private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) throws IgniteCheckedException {
+    private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
         if (!tx.nearNodeId().equals(cctx.localNodeId()))
             cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
         else {
@@ -572,11 +576,11 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     /**
      * @return Prepare response.
      */
-    private GridNearTxPrepareResponse<K, V> createPrepareResponse() {
+    private GridNearTxPrepareResponse createPrepareResponse() {
         // Send reply back to originating near node.
         Throwable prepErr = err.get();
 
-        GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(
+        GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
             tx.nearXidVersion(),
             tx.colocated() ? tx.xid() : tx.nearFutureId(),
             nearMiniId == null ? tx.xid() : nearMiniId,
@@ -605,13 +609,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     /**
      * @param res Response being sent.
      */
-    private void addDhtValues(GridNearTxPrepareResponse<K, V> res) {
+    private void addDhtValues(GridNearTxPrepareResponse res) {
         // Interceptor on near node needs old values to execute callbacks.
         if (!F.isEmpty(writes)) {
             for (IgniteTxEntry e : writes) {
                 IgniteTxEntry txEntry = tx.entry(e.txKey());
 
-                GridCacheContext<K, V> cacheCtx = txEntry.context();
+                GridCacheContext cacheCtx = txEntry.context();
 
                 assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
 
@@ -621,16 +625,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                         GridCacheVersion dhtVer = entry.version();
 
-                        V val0 = null;
+                        CacheObject val0 = null;
                         byte[] valBytes0 = null;
 
                         GridCacheValueBytes valBytesTuple = entry.valueBytes();
 
                         if (!valBytesTuple.isNull()) {
-                            if (valBytesTuple.isPlain())
-                                val0 = (V) valBytesTuple.get();
-                            else
-                                valBytes0 = valBytesTuple.get();
+// TODO IGNITE-51
+//                            if (valBytesTuple.isPlain())
+//                                val0 = (V) valBytesTuple.get();
+//                            else
+//                                valBytes0 = valBytesTuple.get();
                         }
                         else
                             val0 = entry.rawGet();
@@ -642,19 +647,19 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                     }
                     catch (GridCacheEntryRemovedException ignored) {
                         // Retry.
-                        txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), txEntry.keyBytes());
+                        txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null);
                     }
                 }
             }
         }
 
-        for (Map.Entry<IgniteTxKey<K>, GridCacheVersion> ver : dhtVerMap.entrySet()) {
+        for (Map.Entry<IgniteTxKey, GridCacheVersion> ver : dhtVerMap.entrySet()) {
             IgniteTxEntry txEntry = tx.entry(ver.getKey());
 
             if (res.hasOwnedValue(ver.getKey()))
                 continue;
 
-            GridCacheContext<K, V> cacheCtx = txEntry.context();
+            GridCacheContext cacheCtx = txEntry.context();
 
             while (true) {
                 try {
@@ -663,16 +668,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                     GridCacheVersion dhtVer = entry.version();
 
                     if (ver.getValue() == null || !ver.getValue().equals(dhtVer)) {
-                        V val0 = null;
+                        CacheObject val0 = null;
                         byte[] valBytes0 = null;
 
                         GridCacheValueBytes valBytesTuple = entry.valueBytes();
 
                         if (!valBytesTuple.isNull()) {
-                            if (valBytesTuple.isPlain())
-                                val0 = (V)valBytesTuple.get();
-                            else
-                                valBytes0 = valBytesTuple.get();
+// TODO IGNITE-51.
+//                            if (valBytesTuple.isPlain())
+//                                val0 = (V)valBytesTuple.get();
+//                            else
+//                                valBytes0 = valBytesTuple.get();
                         }
                         else
                             val0 = entry.rawGet();
@@ -684,7 +690,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                 }
                 catch (GridCacheEntryRemovedException ignored) {
                     // Retry.
-                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), txEntry.keyBytes());
+                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null);
                 }
             }
         }
@@ -770,8 +776,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
             onEntriesLocked();
 
             {
-                Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new HashMap<>();
-                Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new HashMap<>();
+                Map<UUID, GridDistributedTxMapping> futDhtMap = new HashMap<>();
+                Map<UUID, GridDistributedTxMapping> futNearMap = new HashMap<>();
 
                 boolean hasRemoteNodes = false;
 
@@ -796,14 +802,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                 assert tx.transactionNodes() != null;
 
                 // Create mini futures.
-                for (GridDistributedTxMapping<K, V> dhtMapping : tx.dhtMap().values()) {
+                for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
                     assert !dhtMapping.empty();
 
                     ClusterNode n = dhtMapping.node();
 
                     assert !n.isLocal();
 
-                    GridDistributedTxMapping<K, V> nearMapping = tx.nearMap().get(n.id());
+                    GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
 
                     Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
 
@@ -818,7 +824,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                     assert txNodes != null;
 
-                    GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>(
+                    GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
                         futId,
                         fut.futureId(),
                         tx.topologyVersion(),
@@ -838,7 +844,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                     for (IgniteTxEntry entry : dhtWrites) {
                         try {
-                            GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached();
+                            GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
 
                             GridCacheContext<K, V> cacheCtx = cached.context();
 
@@ -909,7 +915,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                     }
                 }
 
-                for (GridDistributedTxMapping<K, V> nearMapping : tx.nearMap().values()) {
+                for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
                     if (!tx.dhtMap().containsKey(nearMapping.node().id())) {
                         assert nearMapping.writes() != null;
 
@@ -917,7 +923,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                         add(fut); // Append new future.
 
-                        GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>(
+                        GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
                             futId,
                             fut.futureId(),
                             tx.topologyVersion(),
@@ -981,14 +987,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
      */
     private boolean map(
         IgniteTxEntry entry,
-        Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap,
-        Map<UUID, GridDistributedTxMapping<K, V>> futNearMap) {
+        Map<UUID, GridDistributedTxMapping> futDhtMap,
+        Map<UUID, GridDistributedTxMapping> futNearMap) {
         if (entry.cached().isLocal())
             return false;
 
-        GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached();
+        GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
 
-        GridCacheContext<K, V> cacheCtx = entry.context();
+        GridCacheContext cacheCtx = entry.context();
 
         GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
 
@@ -1050,22 +1056,22 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
      * @return {@code True} if mapped.
      */
     private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes,
-        Map<UUID, GridDistributedTxMapping<K, V>> globalMap, Map<UUID, GridDistributedTxMapping<K, V>> locMap) {
+        Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) {
         boolean ret = false;
 
         if (nodes != null) {
             for (ClusterNode n : nodes) {
-                GridDistributedTxMapping<K, V> global = globalMap.get(n.id());
+                GridDistributedTxMapping global = globalMap.get(n.id());
 
                 if (global == null)
-                    globalMap.put(n.id(), global = new GridDistributedTxMapping<>(n));
+                    globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
 
                 global.add(entry);
 
-                GridDistributedTxMapping<K, V> loc = locMap.get(n.id());
+                GridDistributedTxMapping loc = locMap.get(n.id());
 
                 if (loc == null)
-                    locMap.put(n.id(), loc = new GridDistributedTxMapping<>(n));
+                    locMap.put(n.id(), loc = new GridDistributedTxMapping(n));
 
                 loc.add(entry);
 
@@ -1123,11 +1129,11 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
         /** DHT mapping. */
         @GridToStringInclude
-        private GridDistributedTxMapping<K, V> dhtMapping;
+        private GridDistributedTxMapping dhtMapping;
 
         /** Near mapping. */
         @GridToStringInclude
-        private GridDistributedTxMapping<K, V> nearMapping;
+        private GridDistributedTxMapping nearMapping;
 
         /**
          * Empty constructor required for {@link Externalizable}.
@@ -1141,7 +1147,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
          * @param dhtMapping Mapping.
          * @param nearMapping nearMapping.
          */
-        MiniFuture(UUID nodeId, GridDistributedTxMapping<K, V> dhtMapping, GridDistributedTxMapping<K, V> nearMapping) {
+        MiniFuture(UUID nodeId, GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping) {
             super(cctx.kernalContext());
 
             assert dhtMapping == null || nearMapping == null || dhtMapping.node() == nearMapping.node();
@@ -1192,7 +1198,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
         /**
          * @param res Result callback.
          */
-        void onResult(GridDhtTxPrepareResponse<K, V> res) {
+        void onResult(GridDhtTxPrepareResponse res) {
             if (res.error() != null)
                 // Fail the whole compound future.
                 onError(res.error());
@@ -1203,7 +1209,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                         if (res.nearEvicted().contains(entry.txKey())) {
                             while (true) {
                                 try {
-                                    GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached();
+                                    GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
 
                                     cached.removeReader(nearMapping.node().id(), res.messageId());
 
@@ -1215,7 +1221,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                                     if (e == null)
                                         break;
 
-                                    entry.cached(e, entry.keyBytes());
+                                    entry.cached(e, null);
                                 }
                             }
                         }
@@ -1251,7 +1257,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                 boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED);
 
-                for (GridCacheEntryInfo<K, V> info : res.preloadEntries()) {
+                for (GridCacheEntryInfo info : res.preloadEntries()) {
                     GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId());
 
                     while (true) {
@@ -1260,7 +1266,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                         GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
 
                         try {
-                            if (entry.initialValue(info.value(), info.valueBytes(), info.version(),
+                            if (entry.initialValue(info.value(), null, info.version(),
                                 info.ttl(), info.expireTime(), true, topVer, drType)) {
                                 if (rec && !entry.isInternal())
                                     cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index a3936d4..8579747 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -139,7 +139,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
      * @param grpLockKey Group lock key if transaction is group-lock.
      */
     public GridDhtTxRemote(
-        GridCacheSharedContext<K, V> ctx,
+        GridCacheSharedContext ctx,
         UUID nearNodeId,
         IgniteUuid rmtFutId,
         UUID nodeId,
@@ -219,7 +219,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) {
+    @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, long topVer) {
         if (!cacheCtx.isDht() || !isNearEnabled(cacheCtx) || cctx.localNodeId().equals(nearNodeId))
             return false;
 
@@ -231,7 +231,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part) {
+    @Override public void addInvalidPartition(GridCacheContext cacheCtx, int part) {
         super.addInvalidPartition(cacheCtx, part);
 
         for (Iterator<IgniteTxEntry> it = writeMap.values().iterator(); it.hasNext();) {
@@ -256,15 +256,15 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
     public void addWrite(IgniteTxEntry entry, ClassLoader ldr) throws IgniteCheckedException {
         entry.unmarshal(cctx, false, ldr);
 
-        GridCacheContext<K, V> cacheCtx = entry.context();
+        GridCacheContext cacheCtx = entry.context();
 
         try {
-            GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(entry.key(), topologyVersion());
+            GridDhtCacheEntry cached = cacheCtx.dht().entryExx(entry.key(), topologyVersion());
 
             checkInternal(entry.txKey());
 
             // Initialize cache entry.
-            entry.cached(cached, entry.keyBytes());
+            entry.cached(cached, null);
 
             writeMap.put(entry.txKey(), entry);
 
@@ -285,22 +285,22 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
      * @param entryProcessors Entry processors.
      * @param ttl TTL.
      */
-    public void addWrite(GridCacheContext<K, V> cacheCtx,
+    public void addWrite(GridCacheContext cacheCtx,
         GridCacheOperation op,
-        IgniteTxKey<K> key,
+        IgniteTxKey key,
         byte[] keyBytes,
-        @Nullable V val,
+        @Nullable CacheObject val,
         @Nullable byte[] valBytes,
-        @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors,
+        @Nullable Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessors,
         long ttl) {
         checkInternal(key);
 
         if (isSystemInvalidate())
             return;
 
-        GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(key.key(), topologyVersion());
+        GridDhtCacheEntry cached = cacheCtx.dht().entryExx(key.key(), topologyVersion());
 
-        IgniteTxEntry txEntry = new IgniteTxEntry<>(cacheCtx,
+        IgniteTxEntry txEntry = new IgniteTxEntry(cacheCtx,
             this,
             op,
             val,
@@ -309,8 +309,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
             cached,
             null);
 
-        txEntry.keyBytes(keyBytes);
-        txEntry.valueBytes(valBytes);
+// TODO IGNITE-51.
+//        txEntry.keyBytes(keyBytes);
+//        txEntry.valueBytes(valBytes);
         txEntry.entryProcessors(entryProcessors);
 
         writeMap.put(key, txEntry);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
index 95c7fc9..a818500 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
@@ -27,15 +27,15 @@ import java.util.*;
 /**
  * Empty cache map that will never store any entries.
  */
-public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> {
+public class GridNoStorageCacheMap extends GridCacheConcurrentMap {
     /** Empty triple. */
-    private final GridTriple<GridCacheMapEntry<K,V>> emptyTriple =
+    private final GridTriple<GridCacheMapEntry> emptyTriple =
         new GridTriple<>(null, null, null);
 
     /**
      * @param ctx Cache context.
      */
-    public GridNoStorageCacheMap(GridCacheContext<K, V> ctx) {
+    public GridNoStorageCacheMap(GridCacheContext ctx) {
         super(ctx, 0, 0.75f, 1);
     }
 
@@ -60,25 +60,29 @@ public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheMapEntry<K, V> randomEntry() {
+    @Override public GridCacheMapEntry randomEntry() {
         return null;
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheMapEntry<K, V> getEntry(Object key) {
+    @Override public GridCacheMapEntry getEntry(Object key) {
         return null;
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheMapEntry<K, V> putEntry(long topVer, K key, @Nullable V val, long ttl) {
+    @Override public GridCacheMapEntry putEntry(long topVer, KeyCacheObject key, @Nullable CacheObject val, long ttl) {
         throw new AssertionError();
     }
 
     /** {@inheritDoc} */
-    @Override public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(long topVer, K key, @Nullable V val,
-        long ttl, boolean create) {
+    @Override public GridTriple<GridCacheMapEntry> putEntryIfObsoleteOrAbsent(long topVer,
+        KeyCacheObject key,
+        @Nullable CacheObject val,
+        long ttl,
+        boolean create)
+    {
         if (create) {
-            GridCacheMapEntry<K, V> entry = new GridDhtCacheEntry<>(ctx, topVer, key, hash(key.hashCode()), val,
+            GridCacheMapEntry entry = new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val,
                 null, 0, 0);
 
             return new GridTriple<>(entry, null, null);
@@ -88,7 +92,7 @@ public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void putAll(Map<? extends K, ? extends V> m, long ttl) {
+    @Override public void putAll(Map<KeyCacheObject, CacheObject> m, long ttl) {
         throw new AssertionError();
     }
 
@@ -98,7 +102,7 @@ public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheMapEntry<K, V> removeEntryIfObsolete(K key) {
+    @Override public GridCacheMapEntry removeEntryIfObsolete(KeyCacheObject key) {
         throw new AssertionError();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index ece2fa3..eef10d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -281,7 +281,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
             return;
         }
 
-        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings =
+        Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings =
             U.newHashMap(CU.affinityNodes(cctx, topVer).size());
 
         final int keysSize = keys.size();
@@ -299,8 +299,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
 
                 throw err;
             }
-
-            hasRmtNodes |= map(key, mappings, locVals, topVer, mapped);
+// TODO IGNITE-51.
+//            hasRmtNodes |= map(key, mappings, locVals, topVer, mapped);
         }
 
         if (isDone())
@@ -328,7 +328,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                 final GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
                     cache().getDhtAsync(n.id(),
                         -1,
-                        mappedKeys,
+                        // TODO IGNITE-51
+                        // mappedKeys,
+                        null,
                         readThrough,
                         reload,
                         topVer,
@@ -355,7 +357,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                         ", invalidParts=" + invalidParts + ']';
 
                     // Remap recursively.
-                    map(remapKeys, mappings, updTopVer);
+                    // TODO IGNITE-51
+                    // map(remapKeys, mappings, updTopVer);
                 }
 
                 // Add new future.
@@ -382,7 +385,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                     futId,
                     fut.futureId(),
                     ver,
-                    mappedKeys,
+                    // TODO IGNITE-51
+                    // mappedKeys,
+                    null,
                     readThrough,
                     reload,
                     topVer,
@@ -428,11 +433,14 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         while (true) {
             GridCacheEntryEx entry = null;
 
+            // TODO IGNITE-51.
+            KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
             try {
                 if (!reload && allowLocRead) {
                     try {
-                        entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
-                            colocated.peekEx(key);
+                        entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(cacheKey) :
+                            colocated.peekEx(cacheKey);
 
                         // If our DHT cache do has value, then we peek it.
                         if (entry != null) {
@@ -456,17 +464,18 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                             // Entry was not in memory or in swap, so we remove it from cache.
                             if (v == null) {
                                 if (isNew && entry.markObsoleteIfEmpty(ver))
-                                    colocated.removeIfObsolete(key);
+                                    colocated.removeIfObsolete(cacheKey);
                             }
                             else {
                                 K key0 = key;
 
-                                if (cctx.portableEnabled()) {
-                                    v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable);
-                                    key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable);
-                                }
-
-                                locVals.put(key0, v);
+// TODO IGNITE-51.
+//                                if (cctx.portableEnabled()) {
+//                                    v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable);
+//                                    key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable);
+//                                }
+//
+//                                locVals.put(key0, v);
 
                                 return false;
                             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8c847dc..0003f6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -128,7 +128,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 long ttl,
                 int hdrId)
             {
-                return new GridDhtAtomicCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId);
+                return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val, next, ttl, hdrId);
             }
         });
 
@@ -790,7 +790,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
-        final GridNearAtomicUpdateFuture<K, V> updateFut = new GridNearAtomicUpdateFuture<>(
+        final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
             ctx,
             this,
             ctx.config().getWriteSynchronizationMode(),
@@ -799,13 +799,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 conflictPutMap.keySet() : conflictRmvMap.keySet(),
             map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
             invokeArgs,
-            conflictPutMap != null ? conflictPutMap.values() : null,
+            (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
             conflictRmvMap != null ? conflictRmvMap.values() : null,
             retval,
             rawRetval,
-            cached,
             prj != null ? prj.expiry() : null,
-            filter,
+            (IgnitePredicate[])filter,
             subjId,
             taskNameHash);
 
@@ -854,7 +853,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
-        final GridNearAtomicUpdateFuture<K, V> updateFut = new GridNearAtomicUpdateFuture<>(
+        final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
             ctx,
             this,
             ctx.config().getWriteSynchronizationMode(),
@@ -866,9 +865,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             keys != null ? null : conflictMap.values(),
             retval,
             rawRetval,
-            cached,
             (filter != null && prj != null) ? prj.expiry() : null,
-            filter,
+            (IgnitePredicate[])filter,
             subjId,
             taskNameHash);
 
@@ -1140,7 +1138,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             !ctx.dr().receiveEnabled()                     // and no DR.
                         ) {
                             // This method can only be used when there are no replicated entries in the batch.
-                            UpdateBatchResult<K, V> updRes = updateWithBatch(node,
+                            UpdateBatchResult updRes = updateWithBatch(node,
                                 hasNear,
                                 req,
                                 res,
@@ -1253,7 +1251,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @throws GridCacheEntryRemovedException Should not be thrown.
      */
     @SuppressWarnings("unchecked")
-    private UpdateBatchResult<K, V> updateWithBatch(
+    private UpdateBatchResult updateWithBatch(
         ClusterNode node,
         boolean hasNear,
         GridNearAtomicUpdateRequest req,
@@ -1276,7 +1274,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             catch (IgniteCheckedException e) {
                 res.addFailedKeys(req.keys(), e);
 
-                return new UpdateBatchResult<>();
+                return new UpdateBatchResult();
             }
         }
 
@@ -1288,7 +1286,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         Collection<KeyCacheObject> rmvKeys = null;
 
-        UpdateBatchResult<K, V> updRes = new UpdateBatchResult<>();
+        UpdateBatchResult updRes = new UpdateBatchResult();
 
         List<GridDhtCacheEntry> filtered = new ArrayList<>(size);
 
@@ -1351,20 +1349,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         taskName,
                         null);
 
+                    Object keyVal = entry.key().value(ctx);
+                    Object oldVal = CU.value(old, ctx);
+                    Object updatedVal = null;
+
                     CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(ctx,
-                        entry.key().value(ctx),
-                        old.value(ctx));
+                        keyVal,
+                        oldVal);
 
-                    Object updated;
+                    CacheObject updated;
                     CacheInvokeResult invokeRes = null;
 
                     try {
                         Object computed = entryProcessor.process(invokeEntry, req.invokeArguments());
 
-                        updated = ctx.unwrapTemporary(invokeEntry.getValue());
+                        updatedVal = ctx.unwrapTemporary(invokeEntry.getValue());
 
-                        if (ctx.portableEnabled())
-                            updated = (V)ctx.marshalToPortable(updated);
+                        updated = ctx.toCacheObject(updatedVal);
 
                         if (computed != null)
                             invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed));
@@ -1381,7 +1382,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     if (updated == null) {
                         if (intercept) {
                             IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(
-                                entry.key(), old);
+                                keyVal, oldVal);
 
                             if (ctx.cancelRemove(interceptorRes))
                                 continue;
@@ -1423,10 +1424,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
                     else {
                         if (intercept) {
-                            updated = (V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated);
+                            Object val = ctx.config().getInterceptor().onBeforePut(keyVal, oldVal, updatedVal);
 
-                            if (updated == null)
+                            if (val == null)
                                 continue;
+
+                            updated = ctx.toCacheKeyObject(ctx.unwrapTemporary(val));
                         }
 
                         // Update previous batch.
@@ -1460,7 +1463,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (putMap == null)
                             putMap = new LinkedHashMap<>(size, 1.0f);
 
-                        putMap.put(entry.key(), ctx.<V>unwrapTemporary(updated));
+                        putMap.put(entry.key(), updated);
                     }
 
                     if (entryProcessorMap == null)
@@ -1469,10 +1472,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     entryProcessorMap.put(entry.key(), entryProcessor);
                 }
                 else if (op == UPDATE) {
-                    V updated = req.value(i);
+                    CacheObject updated = req.value(i);
 
                     if (intercept) {
-                        V old = entry.innerGet(
+                        CacheObject old = entry.innerGet(
                              null,
                             /*read swap*/true,
                             /*read through*/ctx.loadPreviousValue(),
@@ -1486,12 +1489,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             taskName,
                             null);
 
-                        updated = (V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated);
+                        Object val = ctx.config().getInterceptor().onBeforePut(entry.key().value(ctx),
+                            CU.value(old, ctx),
+                            updated.value(ctx));
 
-                        if (updated == null)
+                        if (val == null)
                             continue;
 
-                        updated = ctx.unwrapTemporary(updated);
+                        updated = ctx.toCacheObject(ctx.unwrapTemporary(val));
                     }
 
                     assert updated != null;
@@ -1505,7 +1510,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     assert op == DELETE;
 
                     if (intercept) {
-                        V old = entry.innerGet(
+                        CacheObject old = entry.innerGet(
                             null,
                             /*read swap*/true,
                             /*read through*/ctx.loadPreviousValue(),
@@ -1520,7 +1525,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             null);
 
                         IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(
-                            entry.key(), old);
+                            entry.key().value(ctx),
+                            CU.value(old, ctx));
 
                         if (ctx.cancelRemove(interceptorRes))
                             continue;
@@ -1574,7 +1580,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @throws IgniteCheckedException If failed.
      */
     private void reloadIfNeeded(final List<GridDhtCacheEntry> entries) throws IgniteCheckedException {
-        Map<K, Integer> needReload = null;
+        Map<KeyCacheObject, Integer> needReload = null;
 
         for (int i = 0; i < entries.size(); i++) {
             GridDhtCacheEntry entry = entries.get(i);
@@ -1582,7 +1588,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             if (entry == null)
                 continue;
 
-            V val = entry.rawGetOrUnmarshal(false);
+            CacheObject val = entry.rawGetOrUnmarshal(false);
 
             if (val == null) {
                 if (needReload == null)
@@ -1593,10 +1599,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
 
         if (needReload != null) {
-            final Map<K, Integer> idxMap = needReload;
+            final Map<KeyCacheObject, Integer> idxMap = needReload;
 
-            ctx.store().loadAllFromStore(null, needReload.keySet(), new CI2<K, V>() {
-                @Override public void apply(K k, V v) {
+            ctx.store().loadAllFromStore(null, needReload.keySet(), new CI2<KeyCacheObject, Object>() {
+                @Override public void apply(KeyCacheObject k, Object v) {
                     Integer idx = idxMap.get(k);
 
                     if (idx != null) {
@@ -1604,7 +1610,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         try {
                             GridCacheVersion ver = entry.version();
 
-                            entry.versionedValue(v, null, ver);
+                            entry.versionedValue(ctx.toCacheObject(v), null, ver);
                         }
                         catch (GridCacheEntryRemovedException e) {
                             assert false : "Entry should not get obsolete while holding lock [entry=" + entry +
@@ -1652,7 +1658,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridCacheReturn<Object> retVal = null;
         Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
-        List<K> keys = req.keys();
+        List<KeyCacheObject> keys = req.keys();
 
         long topVer = req.topologyVersion();
 
@@ -1662,11 +1668,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
-        Map<K, EntryProcessorResult<?>> computedMap = null;
+        Map<KeyCacheObject, EntryProcessorResult<?>> computedMap = null;
 
         // Avoid iterator creation.
         for (int i = 0; i < keys.size(); i++) {
-            K k = keys.get(i);
+            KeyCacheObject k = keys.get(i);
 
             GridCacheOperation op = req.operation();
 
@@ -1699,7 +1705,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
                 }
 
-                GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
+                GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                     ver,
                     node.id(),
                     locNodeId,
@@ -1741,10 +1747,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             newValBytes = null; // Value has been changed.
                         }
 
-                        EntryProcessor<K, V, ?> entryProcessor = null;
+                        EntryProcessor<Object, Object, Object> entryProcessor = null;
 
                         if (req.forceTransformBackups() && op == TRANSFORM)
-                            entryProcessor = (EntryProcessor<K, V, ?>)writeVal;
+                            entryProcessor = (EntryProcessor<Object, Object, Object>)writeVal;
 
                         if (!readersOnly) {
                             dhtFut.addWriteEntry(entry,
@@ -1775,7 +1781,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 if (hasNear) {
                     if (primary && updRes.sendToDht()) {
                         if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
-                            GridCacheVersionConflictContext<K, V> ctx = updRes.conflictResolveResult();
+                            GridCacheVersionConflictContext ctx = updRes.conflictResolveResult();
 
                             if (ctx != null && ctx.isMerge())
                                 newValBytes = null;
@@ -1869,15 +1875,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         List<GridDhtCacheEntry> entries,
         final GridCacheVersion ver,
         ClusterNode node,
-        @Nullable Map<K, V> putMap,
-        @Nullable Collection<K> rmvKeys,
-        @Nullable Map<K, EntryProcessor<K, V, ?>> entryProcessorMap,
+        @Nullable Map<KeyCacheObject, CacheObject> putMap,
+        @Nullable Collection<KeyCacheObject> rmvKeys,
+        @Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         final GridNearAtomicUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
         boolean replicate,
-        UpdateBatchResult<K, V> batchRes,
+        UpdateBatchResult batchRes,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry
     ) {
@@ -1896,17 +1902,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             if (putMap != null) {
                 // If fast mapping, filter primary keys for write to store.
-                Map<K, V> storeMap = req.fastMap() ?
-                    F.view(putMap, new P1<K>() {
-                        @Override public boolean apply(K key) {
+                Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
+                    F.view(putMap, new P1<KeyCacheObject>() {
+                        @Override public boolean apply(KeyCacheObject key) {
                             return ctx.affinity().primary(ctx.localNode(), key, req.topologyVersion());
                         }
                     }) :
                     putMap;
 
                 try {
-                    ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, new C1<V, IgniteBiTuple<V, GridCacheVersion>>() {
-                        @Override public IgniteBiTuple<V, GridCacheVersion> apply(V v) {
+                    ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
+                        @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
                             return F.t(v, ver);
                         }
                     }));
@@ -1919,9 +1925,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
             else {
                 // If fast mapping, filter primary keys for write to store.
-                Collection<K> storeKeys = req.fastMap() ?
-                    F.view(rmvKeys, new P1<K>() {
-                        @Override public boolean apply(K key) {
+                Collection<KeyCacheObject> storeKeys = req.fastMap() ?
+                    F.view(rmvKeys, new P1<KeyCacheObject>() {
+                        @Override public boolean apply(KeyCacheObject key) {
                             return ctx.affinity().primary(ctx.localNode(), key, req.topologyVersion());
                         }
                     }) :
@@ -1956,7 +1962,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 try {
                     // We are holding java-level locks on entries at this point.
-                    V writeVal = op == UPDATE ? putMap.get(entry.key()) : null;
+                    CacheObject writeVal = op == UPDATE ? putMap.get(entry.key()) : null;
 
                     assert writeVal != null || op == DELETE : "null write value found.";
 
@@ -1971,7 +1977,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
                     }
 
-                    GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
+                    GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
                         node.id(),
                         locNodeId,
@@ -2023,7 +2029,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         byte[] valBytes = valBytesTuple.getIfMarshaled();
 
-                        EntryProcessor<K, V, ?> entryProcessor =
+                        EntryProcessor<Object, Object, Object> entryProcessor =
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
                         if (!batchRes.readersOnly())
@@ -2090,7 +2096,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
 
         if (storeErr != null)
-            res.addFailedKeys((Collection<K>)storeErr.failedKeys(), storeErr.getCause());
+            res.addFailedKeys(storeErr.failedKeys(), storeErr.getCause(), ctx);
 
         return dhtFut;
     }
@@ -2108,7 +2114,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, long topVer)
         throws GridDhtInvalidPartitionException {
         if (keys.size() == 1) {
-            K key = keys.get(0);
+            KeyCacheObject key = keys.get(0);
 
             while (true) {
                 try {
@@ -2134,7 +2140,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             List<GridDhtCacheEntry> locked = new ArrayList<>(keys.size());
 
             while (true) {
-                for (K key : keys) {
+                for (KeyCacheObject key : keys) {
                     try {
                         GridDhtCacheEntry entry = entryExx(key, topVer);
 
@@ -2152,7 +2158,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 boolean retry = false;
 
                 for (int i = 0; i < locked.size(); i++) {
-                    GridCacheMapEntry<K, V> entry = locked.get(i);
+                    GridCacheMapEntry entry = locked.get(i);
 
                     if (entry == null)
                         continue;
@@ -2194,9 +2200,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         // Entries to skip eviction manager notification for.
         // Enqueue entries while holding locks.
-        Collection<K> skip = null;
+        Collection<KeyCacheObject> skip = null;
 
-        for (GridCacheMapEntry<K, V> entry : locked) {
+        for (GridCacheMapEntry entry : locked) {
             if (entry != null && entry.deleted()) {
                 if (skip == null)
                     skip = new HashSet<>(locked.size(), 1.0f);
@@ -2206,7 +2212,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
 
         // Release locks.
-        for (GridCacheMapEntry<K, V> entry : locked) {
+        for (GridCacheMapEntry entry : locked) {
             if (entry != null)
                 UNSAFE.monitorExit(entry);
         }
@@ -2223,7 +2229,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         // Must touch all entries since update may have deleted entries.
         // Eviction manager will remove empty entries.
-        for (GridCacheMapEntry<K, V> entry : locked) {
+        for (GridCacheMapEntry entry : locked) {
             if (entry != null && (skip == null || !skip.contains(entry.key())))
                 ctx.evicts().touch(entry, topVer);
         }
@@ -2286,7 +2292,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             log.debug("Remapping near update request locally: " + req);
 
         Collection<?> vals;
-        Collection<GridCacheDrInfo<V>> drPutVals;
+        Collection<GridCacheDrInfo<?>> drPutVals;
         Collection<GridCacheVersion> drRmvVals;
 
         if (req.conflictVersions() == null) {
@@ -2322,7 +2328,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             drPutVals = null;
         }
 
-        final GridNearAtomicUpdateFuture<K, V> updateFut = new GridNearAtomicUpdateFuture<>(
+        final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
             ctx,
             this,
             ctx.config().getWriteSynchronizationMode(),
@@ -2334,7 +2340,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             drRmvVals,
             req.returnValue(),
             false,
-            null,
             req.expiry(),
             req.filter(),
             req.subjectId(),
@@ -2380,7 +2385,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
 
-        GridDhtAtomicUpdateFuture fut = new GridDhtAtomicUpdateFuture<>(ctx, completionCb, writeVer, updateReq,
+        GridDhtAtomicUpdateFuture fut = new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq,
             updateRes);
 
         ctx.mvcc().addAtomicFuture(fut.version(), fut);
@@ -2392,7 +2397,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param nodeId Sender node ID.
      * @param res Near get response.
      */
-    private void processNearGetResponse(UUID nodeId, GridNearGetResponse<K, V> res) {
+    private void processNearGetResponse(UUID nodeId, GridNearGetResponse res) {
         if (log.isDebugEnabled())
             log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
 
@@ -2433,7 +2438,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         res.nodeId(ctx.localNodeId());
 
-        GridNearAtomicUpdateFuture<K, V> fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+        GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
         if (fut != null)
             fut.onResult(nodeId, res);
@@ -2719,7 +2724,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      * Result of {@link GridDhtAtomicCache#updateWithBatch} execution.
      */
-    private static class UpdateBatchResult<K, V> {
+    private static class UpdateBatchResult {
         /** */
         private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted;
 
@@ -2730,7 +2735,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private boolean readersOnly;
 
         /** */
-        private Map<K, EntryProcessorResult> invokeRes;
+        private Map<KeyCacheObject, EntryProcessorResult> invokeRes;
 
         /**
          * @param entry Entry.
@@ -2765,14 +2770,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /**
          * @param invokeRes Result for invoke operation.
          */
-        private void invokeResult(Map<K, EntryProcessorResult> invokeRes) {
+        private void invokeResult(Map<KeyCacheObject, EntryProcessorResult> invokeRes) {
             this.invokeRes = invokeRes;
         }
 
         /**
          * @return Result for invoke operation.
          */
-        Map<K, EntryProcessorResult> invokeResults() {
+        Map<KeyCacheObject, EntryProcessorResult> invokeResults() {
             return invokeRes;
         }
 


Mime
View raw message