ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] incubator-ignite git commit: # ignite-51
Date Thu, 26 Feb 2015 14:06:34 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-51 a265949e8 -> 832f114e8


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index eef10d9..d080e32 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -167,7 +167,21 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     public void init() {
         long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
 
-        map(keys, Collections.<ClusterNode, LinkedHashMap<K, Boolean>>emptyMap(), topVer);
+        Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() {
+            @Override public KeyCacheObject apply(K key) {
+                if (key == null) {
+                    NullPointerException err = new NullPointerException("Null key.");
+
+                    onDone(err);
+
+                    throw err;
+                }
+
+                return cctx.toCacheKeyObject(key);
+            }
+        });
+
+        map(keys0, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
 
         markInitialized();
     }
@@ -182,13 +196,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         // Should not flip trackable flag from true to false since get future can be remapped.
     }
 
-    /**
-     * @return Keys.
-     */
-    Collection<? extends K> keys() {
-        return keys;
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futId;
@@ -274,14 +281,18 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      * @param mapped Mappings to check for duplicates.
      * @param topVer Topology version on which keys should be mapped.
      */
-    private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, long topVer) {
+    private void map(Collection<KeyCacheObject> keys,
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
+        long topVer)
+    {
         if (CU.affinityNodes(cctx, topVer).isEmpty()) {
-            onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all partition nodes left the grid)."));
+            onDone(new ClusterTopologyCheckedException("Failed to map keys for cache " +
+                "(all partition nodes left the grid)."));
 
             return;
         }
 
-        Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings =
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings =
             U.newHashMap(CU.affinityNodes(cctx, topVer).size());
 
         final int keysSize = keys.size();
@@ -291,17 +302,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         boolean hasRmtNodes = false;
 
         // Assign keys to primary nodes.
-        for (K key : keys) {
-            if (key == null) {
-                NullPointerException err = new NullPointerException("Null key");
-
-                onDone(err);
-
-                throw err;
-            }
-// TODO IGNITE-51.
-//            hasRmtNodes |= map(key, mappings, locVals, topVer, mapped);
-        }
+        for (KeyCacheObject key : keys)
+            hasRmtNodes |= map(key, mappings, locVals, topVer, mapped);
 
         if (isDone())
             return;
@@ -316,10 +318,10 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         }
 
         // Create mini futures.
-        for (Map.Entry<ClusterNode, LinkedHashMap<K, Boolean>> entry : mappings.entrySet()) {
+        for (Map.Entry<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> entry : mappings.entrySet()) {
             final ClusterNode n = entry.getKey();
 
-            final LinkedHashMap<K, Boolean> mappedKeys = entry.getValue();
+            final LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = entry.getValue();
 
             assert !mappedKeys.isEmpty();
 
@@ -328,24 +330,21 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                 final GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
                     cache().getDhtAsync(n.id(),
                         -1,
-                        // TODO IGNITE-51
-                        // mappedKeys,
-                        null,
+                        mappedKeys,
                         readThrough,
                         reload,
                         topVer,
                         subjId,
                         taskName == null ? 0 : taskName.hashCode(),
-                        deserializePortable,
                         expiryPlc,
                         skipVals);
 
                 final Collection<Integer> invalidParts = fut.invalidPartitions();
 
                 if (!F.isEmpty(invalidParts)) {
-                    Collection<K> remapKeys = new ArrayList<>(keysSize);
+                    Collection<KeyCacheObject> remapKeys = new ArrayList<>(keysSize);
 
-                    for (K key : keys) {
+                    for (KeyCacheObject key : keys) {
                         if (key != null && invalidParts.contains(cctx.affinity().partition(key)))
                             remapKeys.add(key);
                     }
@@ -357,8 +356,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                         ", invalidParts=" + invalidParts + ']';
 
                     // Remap recursively.
-                    // TODO IGNITE-51
-                    // map(remapKeys, mappings, updTopVer);
+                    map(remapKeys, mappings, updTopVer);
                 }
 
                 // Add new future.
@@ -385,9 +383,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                     futId,
                     fut.futureId(),
                     ver,
-                    // TODO IGNITE-51
-                    // mappedKeys,
-                    null,
+                    mappedKeys,
                     readThrough,
                     reload,
                     topVer,
@@ -421,8 +417,10 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      * @return {@code True} if has remote nodes.
      */
     @SuppressWarnings("ConstantConditions")
-    private boolean map(K key, Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings, Map<K, V> locVals,
-        long topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped) {
+    private boolean map(KeyCacheObject key,
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, Map<K, V> locVals,
+        long topVer,
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped) {
         GridDhtCacheAdapter<K, V> colocated = cache();
 
         boolean remote = false;
@@ -433,14 +431,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         while (true) {
             GridCacheEntryEx entry = null;
 
-            // TODO IGNITE-51.
-            KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
-
             try {
                 if (!reload && allowLocRead) {
                     try {
-                        entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(cacheKey) :
-                            colocated.peekEx(cacheKey);
+                        entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+                            colocated.peekEx(key);
 
                         // If our DHT cache do has value, then we peek it.
                         if (entry != null) {
@@ -464,18 +459,10 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                             // Entry was not in memory or in swap, so we remove it from cache.
                             if (v == null) {
                                 if (isNew && entry.markObsoleteIfEmpty(ver))
-                                    colocated.removeIfObsolete(cacheKey);
+                                    colocated.removeIfObsolete(key);
                             }
                             else {
-                                K key0 = key;
-
-// TODO IGNITE-51.
-//                                if (cctx.portableEnabled()) {
-//                                    v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable);
-//                                    key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable);
-//                                }
-//
-//                                locVals.put(key0, v);
+                                cctx.addResult(locVals, key, v, skipVals, false, deserializePortable);
 
                                 return false;
                             }
@@ -490,19 +477,19 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
 
                 remote = !node.isLocal();
 
-                LinkedHashMap<K, Boolean> keys = mapped.get(node);
+                LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
 
                 if (keys != null && keys.containsKey(key)) {
                     if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) {
-                        onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + MAX_REMAP_CNT
-                            + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+                        onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
+                            MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
                             U.toShortString(node) + ", mappings=" + mapped + ']'));
 
                         return false;
                     }
                 }
 
-                LinkedHashMap<K, Boolean> old = mappings.get(node);
+                LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
 
                 if (old == null)
                     mappings.put(node, old = new LinkedHashMap<>(3, 1f));
@@ -546,30 +533,13 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     private Map<K, V> createResultMap(Collection<GridCacheEntryInfo> infos) {
         int keysSize = infos.size();
 
-        try {
-            if (keysSize != 0) {
-                Map<K, V> map = new GridLeanMap<>(keysSize);
+        if (keysSize != 0) {
+            Map<K, V> map = new GridLeanMap<>(keysSize);
 
-                for (GridCacheEntryInfo info : infos) {
-                    info.unmarshalValue(cctx, cctx.deploy().globalLoader());
-
-                    K key = info.key().value(cctx);
-                    V val = info.value().value(cctx);
-
-                    if (cctx.portableEnabled()) {
-                        key = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable);
-                        val = (V)cctx.unwrapPortableIfNeeded(val, !deserializePortable);
-                    }
+            for (GridCacheEntryInfo info : infos)
+                cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable);
 
-                    map.put(key, val);
-                }
-
-                return map;
-            }
-        }
-        catch (IgniteCheckedException e) {
-            // Fail.
-            onDone(e);
+            return map;
         }
 
         return Collections.emptyMap();
@@ -596,7 +566,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
 
         /** Keys. */
         @GridToStringInclude
-        private LinkedHashMap<K, Boolean> keys;
+        private LinkedHashMap<KeyCacheObject, Boolean> keys;
 
         /** Topology version on which this future was mapped. */
         private long topVer;
@@ -613,7 +583,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
          * @param keys Keys.
          * @param topVer Topology version.
          */
-        MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, long topVer) {
+        MiniFuture(ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, long topVer) {
             super(cctx.kernalContext());
 
             this.node = node;
@@ -638,7 +608,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         /**
          * @return Keys.
          */
-        public Collection<K> keys() {
+        public Collection<KeyCacheObject> keys() {
             return keys.keySet();
         }
 
@@ -715,8 +685,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                         long topVer = fut.get();
 
                         // This will append new futures to compound list.
-                        map(F.view(keys.keySet(),  new P1<K>() {
-                            @Override public boolean apply(K key) {
+                        map(F.view(keys.keySet(),  new P1<KeyCacheObject>() {
+                            @Override public boolean apply(KeyCacheObject key) {
                                 return invalidParts.contains(cctx.affinity().partition(key));
                             }
                         }), F.t(node, keys), topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6a54177..ec06a70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -414,18 +414,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheReturn<CacheObject> removex(K key, V val) throws IgniteCheckedException {
+    @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException {
         return removexAsync(key, val).get();
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheReturn<CacheObject> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException {
+    @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException {
         return replacexAsync(key, oldVal, newVal).get();
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> removexAsync(K key, V val) {
+    @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) {
         A.notNull(key, "key", val, "val");
 
         if (ctx.portableEnabled())
@@ -436,7 +436,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(K key, V oldVal, V newVal) {
+    @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) {
         if (ctx.portableEnabled())
             oldVal = (V)ctx.marshalToPortable(oldVal);
 
@@ -917,7 +917,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
         if (!reload && !forcePrimary) {
-            Map<K, V> locVals = new HashMap<>(keys.size(), 1.0f);
+            Map<K, V> locVals = U.newHashMap(keys.size());
 
             boolean success = true;
 
@@ -960,16 +960,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                                 success = false;
                             }
-                            else {
-                                Object val = v.value(ctx);
-
-                                if (ctx.portableEnabled() && deserializePortable) {
-                                    key = (K)ctx.unwrapPortableIfNeeded(key, false);
-                                    val = ctx.unwrapPortableIfNeeded(val, false);
-                                }
-
-                                locVals.put(key, (V)val);
-                            }
+                            else
+                                ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable);
                         }
                         else
                             success = false;
@@ -1177,7 +1169,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (retVal == null)
                             retVal = new GridCacheReturn<>(null, true);
 
-                        res.returnValue(retVal);
+                        res.returnValue(req.operation() == TRANSFORM, retVal);
                     }
                     else
                         // Should remap all keys.
@@ -1292,8 +1284,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         GridCacheOperation op = req.operation();
 
-        Map<KeyCacheObject, EntryProcessorResult> invokeResMap =
-            op == TRANSFORM ? U.<KeyCacheObject, EntryProcessorResult>newHashMap(size) : null;
+        Collection<CacheInvokeDirectResult> invokeResults = op == TRANSFORM ? new ArrayList(size) : null;
 
         int firstEntryIdx = 0;
 
@@ -1358,7 +1349,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         oldVal);
 
                     CacheObject updated;
-                    CacheInvokeResult invokeRes = null;
+                    CacheInvokeDirectResult invokeRes = null;
 
                     try {
                         Object computed = entryProcessor.process(invokeEntry, req.invokeArguments());
@@ -1368,16 +1359,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         updated = ctx.toCacheObject(updatedVal);
 
                         if (computed != null)
-                            invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed));
+                            invokeRes = new CacheInvokeDirectResult(entry.key(),
+                                ctx.toCacheObject(ctx.unwrapTemporary(computed)));
                     }
                     catch (Exception e) {
-                        invokeRes = new CacheInvokeResult<>(e);
+                        invokeRes = new CacheInvokeDirectResult(entry.key(), e);
 
                         updated = old;
                     }
 
                     if (invokeRes != null)
-                        invokeResMap.put(entry.key(), invokeRes);
+                        invokeResults.add(invokeRes);
 
                     if (updated == null) {
                         if (intercept) {
@@ -1570,7 +1562,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         updRes.dhtFuture(dhtFut);
 
-        updRes.invokeResult(invokeResMap);
+        updRes.invokeResult(invokeResults);
 
         return updRes;
     }
@@ -1668,7 +1660,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
-        Map<KeyCacheObject, EntryProcessorResult<?>> computedMap = null;
+        Collection<CacheInvokeDirectResult> computed = null;
 
         // Avoid iterator creation.
         for (int i = 0; i < keys.size(); i++) {
@@ -1814,12 +1806,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     if (updRes.computedResult() != null) {
                         if (retVal == null) {
-                            computedMap = U.newHashMap(keys.size());
+                            computed = new ArrayList(keys.size());
 
-                            retVal = new GridCacheReturn<>((Object)computedMap, updRes.success(), true);
+                            retVal = new GridCacheReturn<>((Object)computed, updRes.success(), false);
                         }
 
-                        computedMap.put(k, updRes.computedResult());
+                        computed.add(updRes.computedResult());
                     }
                 }
                 else {
@@ -2710,7 +2702,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private boolean readersOnly;
 
         /** */
-        private Map<KeyCacheObject, EntryProcessorResult> invokeRes;
+        private Collection<CacheInvokeDirectResult> invokeRes;
 
         /**
          * @param entry Entry.
@@ -2745,14 +2737,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /**
          * @param invokeRes Result for invoke operation.
          */
-        private void invokeResult(Map<KeyCacheObject, EntryProcessorResult> invokeRes) {
+        private void invokeResult(Collection<CacheInvokeDirectResult> invokeRes) {
             this.invokeRes = invokeRes;
         }
 
         /**
          * @return Result for invoke operation.
          */
-        Map<KeyCacheObject, EntryProcessorResult> invokeResults() {
+        Collection<CacheInvokeDirectResult> invokeResults() {
             return invokeRes;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index d36a850..d34901f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -100,7 +100,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
     private volatile CachePartialUpdateCheckedException err;
 
     /** Operation result. */
-    private volatile GridCacheReturn<Object> opRes;
+    private volatile GridCacheReturn<?> opRes;
 
     /** Return value require flag. */
     private final boolean retval;
@@ -321,11 +321,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("ConstantConditions")
     @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
         assert res == null || res instanceof GridCacheReturn;
 
         GridCacheReturn ret = (GridCacheReturn)res;
 
+        if (op != TRANSFORM) {
+            CacheObject val = (CacheObject)ret.value();
+
+            ret.value(CU.value(val, cctx));
+        }
+
         Object retval = res == null ? null : rawRetval ? ret : this.retval ? ret.value() : ret.success();
 
         if (op == TRANSFORM && retval == null)
@@ -367,9 +374,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
             if (res.error() != null)
                 onDone(addFailedKeys(res.failedKeys(), res.error()));
             else {
-                GridCacheReturn<Object> opRes0 = opRes = res.returnValue();
+                if (op == TRANSFORM) {
+                    if (res.returnValue() != null)
+                        addInvokeResults(res.returnValue());
 
-                onDone(opRes0);
+                    onDone(opRes);
+                }
+                else {
+                    GridCacheReturn<?> opRes0 = opRes = res.returnValue();
+
+                    onDone(opRes0);
+                }
             }
         }
         else {
@@ -869,18 +884,36 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
     /**
      * @param ret Result from single node.
      */
-    private synchronized void addInvokeResults(GridCacheReturn<Object> ret) {
+    @SuppressWarnings("unchecked")
+    private synchronized void addInvokeResults(GridCacheReturn ret) {
         assert op == TRANSFORM : op;
-        assert ret.value() == null || ret.value() instanceof Map : ret.value();
+        assert ret.value() == null || ret.value() instanceof Collection : ret.value();
 
         if (ret.value() != null) {
+            Collection<CacheInvokeDirectResult> results =
+                (Collection<CacheInvokeDirectResult>)ret.value();
+
+            Map<Object, CacheInvokeResult> map0 = U.newHashMap(results.size());
+
+            for (CacheInvokeDirectResult res : results) {
+                CacheInvokeResult<?> res0 = res.error() == null ?
+                    new CacheInvokeResult<>(CU.value(res.result(), cctx)) : new CacheInvokeResult<>(res.error());
+
+                map0.put(res.key().value(cctx), res0);
+            }
+
             if (opRes != null) {
-                Map<Object, Object> map = (Map<Object, Object>)opRes.value();
+                Map<Object, CacheInvokeResult> oldMap = (Map<Object, CacheInvokeResult>)opRes.value();
+
+                assert oldMap != null;
 
-                map.putAll((Map<Object, Object>)ret.value());
+                oldMap.putAll(map0);
             }
-            else
+            else {
+                ret.value(map0);
+
                 opRes = ret;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 1d6e896..83ceecb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -56,12 +56,16 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Serialized error. */
     private byte[] errBytes;
 
-    /** Return value. */
-    @GridDirectTransient
-    private GridCacheReturn<Object> retVal;
+    /** */
+    private boolean success;
 
-    /** Serialized return value. */
-    private byte[] retValBytes;
+    /** */
+    @GridToStringInclude
+    private CacheObject retVal;
+
+    /** */
+    @GridDirectCollection(CacheInvokeDirectResult.class)
+    private Collection<CacheInvokeDirectResult> invokeRes;
 
     /** Failed keys. */
     @GridToStringInclude
@@ -156,15 +160,22 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /**
      * @return Return value.
      */
-    public GridCacheReturn<Object> returnValue() {
-        return retVal;
+    public GridCacheReturn<?> returnValue() {
+        return invokeRes != null ? new GridCacheReturn<>(invokeRes, success) : new GridCacheReturn<>(retVal, success);
     }
 
     /**
+     * @param invoke {@code True} if result for {@code invoke} operation.
      * @param retVal Return value.
      */
-    public void returnValue(GridCacheReturn<Object> retVal) {
-        this.retVal = retVal;
+    @SuppressWarnings("unchecked")
+    public void returnValue(boolean invoke, GridCacheReturn<Object> retVal) {
+        success = retVal.success();
+
+        if (invoke)
+            invokeRes = (Collection<CacheInvokeDirectResult>)retVal.value();
+        else
+            this.retVal = (CacheObject)retVal.value();
     }
 
     /**
@@ -376,14 +387,19 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
         if (err != null)
             errBytes = ctx.marshaller().marshal(err);
 
-        if (retVal != null)
-            retValBytes = ctx.marshaller().marshal(retVal);
-
         prepareMarshalCacheObjects(failedKeys, ctx);
 
         prepareMarshalCacheObjects(remapKeys, ctx);
 
         prepareMarshalCacheObjects(nearVals, ctx);
+
+        if (retVal != null)
+            retVal.prepareMarshal(ctx);
+
+        if (invokeRes != null) {
+            for (CacheInvokeDirectResult res : invokeRes)
+                res.prepareMarshal(ctx);
+        }
     }
 
     /** {@inheritDoc} */
@@ -393,14 +409,19 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
         if (errBytes != null)
             err = ctx.marshaller().unmarshal(errBytes, ldr);
 
-        if (retValBytes != null)
-            retVal = ctx.marshaller().unmarshal(retValBytes, ldr);
-
         finishUnmarshalCacheObjects(failedKeys, ctx, ldr);
 
         finishUnmarshalCacheObjects(remapKeys, ctx, ldr);
 
         finishUnmarshalCacheObjects(nearVals, ctx, ldr);
+
+        if (retVal != null)
+            retVal.finishUnmarshal(ctx, ldr);
+
+        if (invokeRes != null) {
+            for (CacheInvokeDirectResult res : invokeRes)
+                res.finishUnmarshal(ctx, ldr);
+        }
     }
 
     /** {@inheritDoc} */
@@ -437,49 +458,61 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+                if (!writer.writeCollection("invokeRes", invokeRes, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
+                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeMessage("nearTtls", nearTtls))
+                if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearTtls", nearTtls))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
+                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeMessage("nearVer", nearVer))
+                if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearVer", nearVer))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeByteArray("retValBytes", retValBytes))
+                if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
+                if (!writer.writeMessage("retVal", retVal))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
+                if (!writer.writeBoolean("success", success))
                     return false;
 
                 writer.incrementState();
@@ -525,7 +558,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 6:
-                nearExpireTimes = reader.readMessage("nearExpireTimes");
+                invokeRes = reader.readCollection("invokeRes", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -533,7 +566,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 7:
-                nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+                nearExpireTimes = reader.readMessage("nearExpireTimes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -541,7 +574,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 8:
-                nearTtls = reader.readMessage("nearTtls");
+                nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -549,7 +582,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 9:
-                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+                nearTtls = reader.readMessage("nearTtls");
 
                 if (!reader.isLastRead())
                     return false;
@@ -557,7 +590,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 10:
-                nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -565,7 +598,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 11:
-                nearVer = reader.readMessage("nearVer");
+                nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -573,7 +606,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 12:
-                remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+                nearVer = reader.readMessage("nearVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -581,7 +614,23 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 13:
-                retValBytes = reader.readByteArray("retValBytes");
+                remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
+                retVal = reader.readMessage("retVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
+                success = reader.readBoolean("success");
 
                 if (!reader.isLastRead())
                     return false;
@@ -600,7 +649,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 14;
+        return 16;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 3c09c56..15befd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -299,14 +299,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                                 success = false;
                             }
-                            else {
-                                Object val = v.value(ctx);
-
-                                if (ctx.portableEnabled() && !skipVals)
-                                    val = ctx.unwrapPortableIfNeeded(val, !deserializePortable);
-
-                                locVals.put(key, (V)CU.skipValue(val, skipVals));
-                            }
+                            else
+                                ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable);
                         }
                         else
                             success = false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 9af2767..a30f372 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -482,24 +482,24 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheReturn<CacheObject> removex(K key, V val) throws IgniteCheckedException {
+    @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException {
         return dht.removex(key, val);
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheReturn<CacheObject> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException {
+    @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException {
         return dht.replacex(key, oldVal, newVal);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> removexAsync(K key, V val) {
+    @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) {
         return dht.removexAsync(key, val);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(K key, V oldVal, V newVal) {
+    @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) {
         return dht.replacexAsync(key, oldVal, newVal);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 20e5b87..93c6167 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -335,7 +335,6 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                         topVer,
                         subjId,
                         taskName == null ? 0 : taskName.hashCode(),
-                        deserializePortable,
                         expiryPlc,
                         skipVals);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index cb939f4..4561db3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -51,7 +51,15 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
     /** */
     @GridToStringInclude
     @GridDirectTransient
-    private LinkedHashMap<KeyCacheObject, Boolean> keys;
+    private LinkedHashMap<KeyCacheObject, Boolean> keyMap;
+
+    /** */
+    @GridDirectCollection(KeyCacheObject.class)
+    private Collection<KeyCacheObject> keys;
+
+    /** */
+    @GridDirectCollection(boolean.class)
+    private Collection<Boolean> flags;
 
     /** Reload flag. */
     private boolean reload;
@@ -62,11 +70,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
     /** Skip values flag. Used for {@code containsKey} method. */
     private boolean skipVals;
 
-    /** */
-    @GridToStringExclude
-    @GridDirectMap(keyType = byte[].class, valueType = boolean.class)
-    private LinkedHashMap<byte[], Boolean> keyBytes;
-
     /** Topology version. */
     private long topVer;
 
@@ -124,7 +127,8 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
         this.futId = futId;
         this.miniId = miniId;
         this.ver = ver;
-        this.keys = keys;
+        this.keys = keys.keySet();
+        this.flags = keys.values();
         this.readThrough = readThrough;
         this.reload = reload;
         this.topVer = topVer;
@@ -173,7 +177,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
      * @return Keys
      */
     public LinkedHashMap<KeyCacheObject, Boolean> keys() {
-        return keys;
+        return keyMap;
     }
 
     /**
@@ -221,9 +225,9 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
 
         assert ctx != null;
         assert !F.isEmpty(keys);
+        assert keys.size() == flags.size();
 
-        if (keyBytes == null)
-            keyBytes = marshalBooleanLinkedMap(keys, ctx);
+        prepareMarshalCacheObjects(keys, ctx);
     }
 
     /**
@@ -234,8 +238,20 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (keys == null)
-            keys = unmarshalBooleanLinkedMap(keyBytes, ctx, ldr);
+        finishUnmarshalCacheObjects(keys, ctx, ldr);
+
+        assert !F.isEmpty(keys);
+        assert keys.size() == flags.size();
+
+        if (keyMap == null) {
+            keyMap = U.newLinkedHashMap(keys.size());
+
+            Iterator<KeyCacheObject> keysIt = keys.iterator();
+            Iterator<Boolean> flagsIt = flags.iterator();
+
+            while (keysIt.hasNext())
+                keyMap.put(keysIt.next(), flagsIt.next());
+        }
     }
 
     /** {@inheritDoc} */
@@ -260,60 +276,66 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeCollection("flags", flags, MessageCollectionItemType.BOOLEAN))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeMap("keyBytes", keyBytes, MessageCollectionItemType.BYTE_ARR, MessageCollectionItemType.BOOLEAN))
+                if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeBoolean("readThrough", readThrough))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeBoolean("reload", reload))
+                if (!writer.writeBoolean("readThrough", readThrough))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeBoolean("skipVals", skipVals))
+                if (!writer.writeBoolean("reload", reload))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("skipVals", skipVals))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeLong("topVer", topVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 13:
+                if (!writer.writeLong("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
                 if (!writer.writeMessage("ver", ver))
                     return false;
 
@@ -344,7 +366,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 reader.incrementState();
 
             case 4:
-                futId = reader.readIgniteUuid("futId");
+                flags = reader.readCollection("flags", MessageCollectionItemType.BOOLEAN);
 
                 if (!reader.isLastRead())
                     return false;
@@ -352,7 +374,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 reader.incrementState();
 
             case 5:
-                keyBytes = reader.readMap("keyBytes", MessageCollectionItemType.BYTE_ARR, MessageCollectionItemType.BOOLEAN, true);
+                futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -360,7 +382,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 reader.incrementState();
 
             case 6:
-                miniId = reader.readIgniteUuid("miniId");
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -368,7 +390,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 reader.incrementState();
 
             case 7:
-                readThrough = reader.readBoolean("readThrough");
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -376,7 +398,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 reader.incrementState();
 
             case 8:
-                reload = reader.readBoolean("reload");
+                readThrough = reader.readBoolean("readThrough");
 
                 if (!reader.isLastRead())
                     return false;
@@ -384,7 +406,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 reader.incrementState();
 
             case 9:
-                skipVals = reader.readBoolean("skipVals");
+                reload = reader.readBoolean("reload");
 
                 if (!reader.isLastRead())
                     return false;
@@ -392,7 +414,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 reader.incrementState();
 
             case 10:
-                subjId = reader.readUuid("subjId");
+                skipVals = reader.readBoolean("skipVals");
 
                 if (!reader.isLastRead())
                     return false;
@@ -400,7 +422,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 reader.incrementState();
 
             case 11:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -408,7 +430,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 reader.incrementState();
 
             case 12:
-                topVer = reader.readLong("topVer");
+                taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
                     return false;
@@ -416,6 +438,14 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
                 reader.incrementState();
 
             case 13:
+                topVer = reader.readLong("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
                 ver = reader.readMessage("ver");
 
                 if (!reader.isLastRead())
@@ -436,7 +466,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 14;
+        return 15;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index afb0ac8..560e1c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -50,12 +50,9 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
 
     /** Result. */
     @GridToStringInclude
-    @GridDirectTransient
+    @GridDirectCollection(GridCacheEntryInfo.class)
     private Collection<GridCacheEntryInfo> entries;
 
-    /** */
-    private byte[] entriesBytes;
-
     /** Keys to retry due to ownership shift. */
     @GridToStringInclude
     @GridDirectCollection(int.class)
@@ -176,9 +173,8 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
         super.prepareMarshal(ctx);
 
         if (entries != null) {
-            marshalInfos(entries, ctx);
-
-            entriesBytes = ctx.marshaller().marshal(entries);
+            for (GridCacheEntryInfo info : entries)
+                info.marshal(ctx);
         }
 
         if (err != null)
@@ -189,10 +185,11 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (entriesBytes != null) {
-            entries = ctx.marshaller().unmarshal(entriesBytes, ldr);
+        GridCacheContext cctx = ctx.cacheContext(cacheId());
 
-            unmarshalInfos(entries, ctx.cacheContext(cacheId()), ldr);
+        if (entries != null) {
+            for (GridCacheEntryInfo info : entries)
+                info.unmarshal(cctx, ldr);
         }
 
         if (errBytes != null)
@@ -215,7 +212,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeByteArray("entriesBytes", entriesBytes))
+                if (!writer.writeCollection("entries", entries, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -273,7 +270,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
 
         switch (reader.state()) {
             case 3:
-                entriesBytes = reader.readByteArray("entriesBytes");
+                entries = reader.readCollection("entries", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 85bf527..8433d11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -267,7 +267,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public GridCacheReturn<CacheObject> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException {
+    @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException {
         A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal");
 
         ctx.denyOnLocalRead();
@@ -275,7 +275,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         if (ctx.portableEnabled())
             oldVal = (V)ctx.marshalToPortable(oldVal);
 
-        return (GridCacheReturn<CacheObject>)updateAllInternal(UPDATE,
+        return (GridCacheReturn<V>)updateAllInternal(UPDATE,
             Collections.singleton(key),
             Collections.singleton(newVal),
             null,
@@ -288,7 +288,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public GridCacheReturn<CacheObject> removex(K key, V val) throws IgniteCheckedException {
+    @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException {
         A.notNull(key, "key", val, "val");
 
         ctx.denyOnLocalRead();
@@ -296,7 +296,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         if (ctx.portableEnabled())
             val = (V)ctx.marshalToPortable(val);
 
-        return (GridCacheReturn<CacheObject>)updateAllInternal(DELETE,
+        return (GridCacheReturn<V>)updateAllInternal(DELETE,
             Collections.singleton(key),
             null,
             null,
@@ -309,7 +309,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> removexAsync(K key, V val) {
+    @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) {
         A.notNull(key, "key", val, "val");
 
         ctx.denyOnLocalRead();
@@ -322,7 +322,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(K key, V oldVal, V newVal) {
+    @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) {
         A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal");
 
         ctx.denyOnLocalRead();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
index a460d4d..ac688a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
@@ -28,6 +28,8 @@ import java.nio.*;
 
 /**
  * Portable processor.
+ *
+ * TODO IGNITE-51: rename.
  */
 public interface GridPortableProcessor extends GridProcessor {
     /** {@inheritDoc} */
@@ -86,10 +88,13 @@ public interface GridPortableProcessor extends GridProcessor {
     public Object marshalToPortable(@Nullable Object obj) throws IgniteException;
 
     /**
+     * TODO IGNITE-51: rename.
+     *
      * @param obj Object (portable or not).
+     * @param cctx Cache context.
      * @return Detached portable object or original object.
      */
-    public Object detachPortable(@Nullable Object obj);
+    public Object detachPortable(@Nullable Object obj, GridCacheContext cctx);
 
     /**
      * @return Portable marshaller for client connectivity or {@code null} if it's not

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
index 5f0b3bf..ce81f31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
@@ -75,8 +75,13 @@ public class GridOsPortableProcessor extends GridProcessorAdapter implements Gri
     }
 
     /** {@inheritDoc} */
-    @Override public Object detachPortable(@Nullable Object obj) {
-        return obj;
+    @Override public Object detachPortable(@Nullable Object obj, GridCacheContext cctx) {
+        if (obj == null)
+            return obj;
+
+        assert obj instanceof CacheObject : obj;
+
+        return ((CacheObject)obj).prepareForCache(cctx);
     }
 
     /** {@inheritDoc} */
@@ -121,11 +126,11 @@ public class GridOsPortableProcessor extends GridProcessorAdapter implements Gri
 
     /** {@inheritDoc} */
     @Nullable @Override public KeyCacheObject toCacheKeyObject(@Nullable Object obj) {
-        return new KeyCacheObjectImpl(obj);
+        return new UserKeyCacheObjectImpl(obj);
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public CacheObject toCacheObject(@Nullable Object obj) {
-        return new CacheObjectImpl(obj);
+        return new UserCacheObjectImpl(obj);
     }
 }


Mime
View raw message