ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [26/27] incubator-ignite git commit: # ignite-51
Date Thu, 26 Feb 2015 04:09:42 GMT
# ignite-51


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a6b41833
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a6b41833
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a6b41833

Branch: refs/heads/ignite-51
Commit: a6b418331800a4d117c97016f263b5270e007fa1
Parents: ffee48d
Author: sboikov <semen.boikov@inria.fr>
Authored: Wed Feb 25 22:13:44 2015 +0300
Committer: sboikov <semen.boikov@inria.fr>
Committed: Thu Feb 26 07:01:41 2015 +0300

----------------------------------------------------------------------
 .../internal/direct/DirectByteBufferStream.java |  28 +--
 .../direct/DirectMessageWriterState.java        |   3 +
 .../communication/GridIoMessageFactory.java     |  20 ++
 .../affinity/GridAffinityAssignmentCache.java   |   1 +
 .../cache/CacheEntryInfoCollection.java         | 108 +++++++++++
 .../internal/processors/cache/CacheObject.java  |   8 +-
 .../processors/cache/CacheObjectImpl.java       | 157 ++++++++++++++-
 .../processors/cache/GridCacheAdapter.java      |  15 +-
 .../processors/cache/GridCacheContext.java      |  11 ++
 .../processors/cache/GridCacheEntryInfo.java    | 192 ++++++++++++++++---
 .../processors/cache/GridCacheMessage.java      |  70 +++++++
 .../processors/cache/KeyCacheObjectImpl.java    | 127 +++++++++++-
 .../dht/atomic/GridDhtAtomicCache.java          |  37 +---
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   8 -
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 176 ++++-------------
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |  50 ++---
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   7 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java | 182 ++++++++++--------
 .../atomic/GridNearAtomicUpdateResponse.java    |  68 ++-----
 .../preloader/GridDhtPartitionDemandPool.java   |   4 +-
 .../GridDhtPartitionSupplyMessage.java          | 103 +++++-----
 .../distributed/near/GridNearAtomicCache.java   |  17 +-
 .../portable/os/GridOsPortableProcessor.java    |   4 +-
 23 files changed, 931 insertions(+), 465 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
index 4ad78f3..55b02e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
@@ -240,9 +240,15 @@ public class DirectByteBufferStream {
     private Message msg;
 
     /** */
+    private Iterator<?> mapIt;
+
+    /** */
     private Iterator<?> it;
 
     /** */
+    private Object mapCur = NULL;
+
+    /** */
     private Object cur = NULL;
 
     /** */
@@ -634,22 +640,22 @@ public class DirectByteBufferStream {
     public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCollectionItemType valType,
         MessageWriter writer) {
         if (map != null) {
-            if (it == null) {
+            if (mapIt == null) {
                 writeInt(map.size());
 
                 if (!lastFinished)
                     return;
 
-                it = map.entrySet().iterator();
+                mapIt = map.entrySet().iterator();
             }
 
-            while (it.hasNext() || cur != NULL) {
+            while (mapIt.hasNext() || mapCur != NULL) {
                 Map.Entry<K, V> e;
 
-                if (cur == NULL)
-                    cur = it.next();
+                if (mapCur == NULL)
+                    mapCur = mapIt.next();
 
-                e = (Map.Entry<K, V>)cur;
+                e = (Map.Entry<K, V>)mapCur;
 
                 if (!keyDone) {
                     write(keyType, e.getKey(), writer);
@@ -665,11 +671,11 @@ public class DirectByteBufferStream {
                 if (!lastFinished)
                     return;
 
-                cur = NULL;
+                mapCur = NULL;
                 keyDone = false;
             }
 
-            it = null;
+            mapIt = null;
         }
         else
             writeInt(-1);
@@ -1054,7 +1060,7 @@ public class DirectByteBufferStream {
                     if (!lastFinished)
                         return null;
 
-                    cur = key;
+                    mapCur = key;
                     keyDone = true;
                 }
 
@@ -1063,7 +1069,7 @@ public class DirectByteBufferStream {
                 if (!lastFinished)
                     return null;
 
-                map.put(cur, val);
+                map.put(mapCur, val);
 
                 keyDone = false;
 
@@ -1073,7 +1079,7 @@ public class DirectByteBufferStream {
 
         readSize = -1;
         readItems = 0;
-        cur = null;
+        mapCur = null;
 
         M map0 = (M)map;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
index 3c473d2..f9ab9af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
@@ -35,6 +35,9 @@ public class DirectMessageWriterState {
     /** Current position. */
     private int pos;
 
+    /**
+     *
+     */
     public DirectMessageWriterState() {
         stack = new int[INIT_SIZE];
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6109d74..aad555a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -492,6 +492,26 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 89:
+                msg = new CacheObjectImpl();
+
+                break;
+
+            case 90:
+                msg = new KeyCacheObjectImpl();
+
+                break;
+
+            case 91:
+                msg = new GridCacheEntryInfo();
+
+                break;
+
+            case 92:
+                msg = new CacheEntryInfoCollection();
+
+                break;
+
             default:
                 if (ext != null) {
                     for (MessageFactory factory : ext) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 9c12a17..0247e67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -296,6 +296,7 @@ public class GridAffinityAssignmentCache {
      * @return Partition.
      */
     public int partition(Object key) {
+        // TODO IGNITE-51.
         if (ctx.portableEnabled()) {
             try {
                 key = ctx.marshalToPortable(key);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
new file mode 100644
index 0000000..024d251
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ *
+ */
+public class CacheEntryInfoCollection implements Message {
+    /** */
+    @GridDirectCollection(GridCacheEntryInfo.class)
+    private List<GridCacheEntryInfo> infos;
+
+    /**
+     *
+     */
+    public void init() {
+        infos = new ArrayList<>();
+    }
+
+    /**
+     * @return Entries.
+     */
+    public List<GridCacheEntryInfo> infos() {
+        return infos;
+    }
+
+    /**
+     * @param info Entry.
+     */
+    public void add(GridCacheEntryInfo info) {
+        infos.add(info);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeCollection("infos", infos, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                infos = reader.readCollection("infos", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 92;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
index 6991d74..cda9e86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
 /**
  *
  */
-public interface CacheObject {
+public interface CacheObject extends Message {
     /**
      * @param ctx Context.
      * @return Value.
@@ -34,4 +36,8 @@ public interface CacheObject {
      * @return Field value.
      */
     @Nullable public <T> T getField(String name);
+
+    public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException;
+
+    public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
index b0ffeda..9e24b43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
@@ -17,30 +17,179 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
+import java.io.*;
+import java.nio.*;
+
 /**
  *
  */
-public class CacheObjectImpl implements CacheObject {
+public class CacheObjectImpl implements CacheObject, Externalizable {
     /** */
+    @GridToStringInclude
+    @GridDirectTransient
     private Object val;
 
+    /** */
+    private byte[] valBytes;
+
+    /**
+     *
+     */
+    public CacheObjectImpl() {
+        // No-op.
+    }
+
     /**
      * @param val Value.
      */
-    CacheObjectImpl(Object val) {
-        this.val = val;
+    public CacheObjectImpl(Object val) {
+        assert val != null;
+
+        if (val instanceof byte[])
+            valBytes = (byte[])val;
+        else
+            this.val = val;
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public <T> T getField(String name) {
+        // TODO IGNITE-51.
         return null;
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Nullable @Override public <T> T value(GridCacheContext ctx) {
-        return (T)val;
+        if (val != null)
+            return (T)val;
+        else
+            return (T)valBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        if (valBytes == null)
+            valBytes = CU.marshal(ctx, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        assert valBytes != null;
+
+        boolean byteArr = val != null;
+
+        if (byteArr)
+            val = null;
+        else
+            val = ctx.marshaller().unmarshal(valBytes, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        assert valBytes != null;
+
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("valBytes", valBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                boolean byteArr = val == null;
+
+                if (!writer.writeBoolean("byteArr", byteArr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                valBytes = reader.readByteArray("valBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                boolean byteArr = reader.readBoolean("byteArr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                if (byteArr)
+                    val = valBytes;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 89;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        assert false;
+
+        return super.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        assert false;
+
+        return super.equals(obj);
+    }
+
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        assert false;
+    }
+
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        assert false;
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+        return S.toString(CacheObjectImpl.class, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 2e2904b..bfffaae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -5226,13 +5226,14 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         boolean deserializePortable) {
         String taskName = ctx.kernalContext().job().currentTaskName();
 
-        if (ctx.portableEnabled() && !F.isEmpty(keys)) {
-            keys = F.viewReadOnly(keys, new C1<K, K>() {
-                @Override public K apply(K k) {
-                    return (K)ctx.marshalToPortable(k);
-                }
-            });
-        }
+// TODO IGNITE-51.
+//        if (ctx.portableEnabled() && !F.isEmpty(keys)) {
+//            keys = F.viewReadOnly(keys, new C1<K, K>() {
+//                @Override public K apply(K k) {
+//                    return (K)ctx.marshalToPortable(k);
+//                }
+//            });
+//        }
 
         return getAllAsync(keys,
             !ctx.config().isReadFromBackup(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 63929d8..64fe93f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1767,6 +1767,9 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Cache object.
      */
     @Nullable public CacheObject toCacheObject(@Nullable Object obj) {
+        if (obj instanceof CacheObject)
+            return (CacheObject)obj;
+
         return portable().toCacheObject(obj);
     }
 
@@ -1775,9 +1778,17 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Cache object.
      */
     @Nullable public KeyCacheObject toCacheKeyObject(@Nullable Object obj) {
+        if (obj instanceof KeyCacheObject)
+            return (KeyCacheObject)obj;
+
         return portable().toCacheKeyObject(obj);
     }
 
+    // TODO IGNITE-51.
+    public boolean copyOnGet() {
+        return false;
+    }
+
     /**
      * Nulling references to potentially leak-prone objects.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index d27f32f..12fac3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -20,44 +20,30 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 
 import java.io.*;
+import java.nio.*;
 
 /**
  * Entry information that gets passed over wire.
  */
-public class GridCacheEntryInfo implements Externalizable {
+public class GridCacheEntryInfo implements Externalizable, Message {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** Cache key. */
     @GridToStringInclude
-    @GridDirectTransient
     private KeyCacheObject key;
 
     /** Cache ID. */
     private int cacheId;
 
-    /** Key bytes. */
-    private byte[] keyBytes;
-
-    /** Key bytes sent. */
-    private boolean keyBytesSent;
-
     /** Cache value. */
-    @GridDirectTransient
     private CacheObject val;
 
-    /** Value bytes. */
-    private byte[] valBytes;
-
-    /** Value bytes sent. */
-    private boolean valBytesSent;
-
     /** Time to live. */
     private long ttl;
 
@@ -68,10 +54,12 @@ public class GridCacheEntryInfo implements Externalizable {
     private GridCacheVersion ver;
 
     /** New flag. */
+    @GridDirectTransient
     private boolean isNew;
 
     /** Deleted flag. */
-    private transient boolean deleted;
+    @GridDirectTransient
+    private boolean deleted;
 
     /**
      * @return Cache ID.
@@ -185,14 +173,149 @@ public class GridCacheEntryInfo implements Externalizable {
         this.deleted = deleted;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        // TODO IGNITE-51: field 'remaining'.
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("cacheId", cacheId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("expireTime", expireTime))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeMessage("key", key))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeLong("ttl", ttl))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("val", val))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("ver", ver))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        // TODO IGNITE-51: field 'remaining'.
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cacheId = reader.readInt("cacheId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                expireTime = reader.readLong("expireTime");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                ttl = reader.readLong("ttl");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                val = reader.readMessage("val");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                ver = reader.readMessage("ver");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 91;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 6;
+    }
+
     /**
      * @param ctx Context.
      * @param ldr Loader.
      * @throws IgniteCheckedException If failed.
      */
     public void unmarshalValue(GridCacheContext<?, ?> ctx, ClassLoader ldr) throws IgniteCheckedException {
-        if (val == null && valBytes != null)
-            val = ctx.marshaller().unmarshal(valBytes, ldr);
+// TODO IGNITE-51
+//        if (val == null && valBytes != null)
+//            val = ctx.marshaller().unmarshal(valBytes, ldr);
+    }
+
+    /**
+     * @return Marshalled size.
+     */
+    public int marshalledSize() {
+        // TODO IGNITE-51.
+        return 0;
     }
 
     /**
@@ -200,6 +323,9 @@ public class GridCacheEntryInfo implements Externalizable {
      * @throws IgniteCheckedException In case of error.
      */
     public void marshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
+        key.prepareMarshal(ctx);
+
+        val.prepareMarshal(ctx);
 // TODO IGNITE-51
 //        boolean depEnabled = ctx.gridDeploy().enabled();
 //
@@ -224,13 +350,17 @@ public class GridCacheEntryInfo implements Externalizable {
      * @throws IgniteCheckedException If unmarshalling failed.
      */
     public void unmarshal(GridCacheContext ctx, ClassLoader clsLdr) throws IgniteCheckedException {
-        Marshaller mrsh = ctx.marshaller();
-
-        if (key == null)
-            key = mrsh.unmarshal(keyBytes, clsLdr);
+        key.finishUnmarshal(ctx.shared(), clsLdr);
 
-        if (ctx.isUnmarshalValues() && val == null && valBytes != null)
-            val = mrsh.unmarshal(valBytes, clsLdr);
+        val.finishUnmarshal(ctx.shared(), clsLdr);
+// TODO IGNITE-51
+//        Marshaller mrsh = ctx.marshaller();
+//
+//        if (key == null)
+//            key = mrsh.unmarshal(keyBytes, clsLdr);
+//
+//        if (ctx.isUnmarshalValues() && val == null && valBytes != null)
+//            val = mrsh.unmarshal(valBytes, clsLdr);
     }
 
     /** {@inheritDoc} */
@@ -312,9 +442,11 @@ public class GridCacheEntryInfo implements Externalizable {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridCacheEntryInfo.class, this,
-            "isNull", val == null,
-            "keyBytesSize", (keyBytes == null ? "null" : Integer.toString(keyBytes.length)),
-            "valBytesSize", (valBytes == null ? "null" : Integer.toString(valBytes.length)));
+        return S.toString(GridCacheEntryInfo.class, this);
+
+//        return S.toString(GridCacheEntryInfo.class, this,
+//            "isNull", val == null,
+//            "keyBytesSize", (keyBytes == null ? "null" : Integer.toString(keyBytes.length)),
+//            "valBytesSize", (valBytes == null ? "null" : Integer.toString(valBytes.length)));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index bb0468c..ad773ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -575,6 +575,76 @@ public abstract class GridCacheMessage implements Message {
     }
 
     /**
+     * @param col Collection.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    protected final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+        GridCacheSharedContext ctx) throws IgniteCheckedException {
+        if (col == null)
+            return;
+
+        int size = col.size();
+
+        for (int i = 0 ; i < size; i++)
+            col.get(i).prepareMarshal(ctx);
+    }
+
+    /**
+     * @param col Collection.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected final void prepareMarshalCacheObjects(@Nullable Collection<? extends CacheObject> col,
+        GridCacheSharedContext ctx) throws IgniteCheckedException {
+        if (col == null)
+            return;
+
+        for (CacheObject obj : col)
+            obj.prepareMarshal(ctx);
+    }
+
+    /**
+     * @param col Collection.
+     * @param ctx Context.
+     * @param ldr Class loader.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    protected final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+        GridCacheSharedContext ctx,
+        ClassLoader ldr)
+        throws IgniteCheckedException
+    {
+        if (col == null)
+            return;
+
+        int size = col.size();
+
+        for (int i = 0 ; i < size; i++)
+            col.get(i).finishUnmarshal(ctx, ldr);
+    }
+
+    /**
+     * @param col Collection.
+     * @param ctx Context.
+     * @param ldr Class loader.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected final void finishUnmarshalCacheObjects(@Nullable Collection<? extends CacheObject> col,
+        GridCacheSharedContext ctx,
+        ClassLoader ldr)
+        throws IgniteCheckedException
+    {
+        if (col == null)
+            return;
+
+        for (CacheObject obj : col)
+            obj.finishUnmarshal(ctx, ldr);
+    }
+
+    /**
      * @param byteCol Collection to unmarshal.
      * @param ctx Context.
      * @param ldr Loader.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index 47622ff..113c6d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -17,34 +17,155 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
+import java.io.*;
+import java.nio.*;
+
 /**
  *
  */
-public class KeyCacheObjectImpl implements KeyCacheObject {
+public class KeyCacheObjectImpl implements KeyCacheObject, Externalizable {
     /** */
+    @GridToStringInclude
+    @GridDirectTransient
     private Object val;
 
+    /** */
+    private byte[] valBytes;
+
+    /**
+     *
+     */
+    public KeyCacheObjectImpl() {
+        // No-op.
+    }
+
     /**
      * @param val Value.
      */
     public KeyCacheObjectImpl(Object val) {
+        assert val != null;
+
         this.val = val;
     }
 
     /** {@inheritDoc} */
     @Override public boolean internal() {
-        return false;
+        return val instanceof GridCacheInternal;
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Nullable @Override public <T> T value(GridCacheContext ctx) {
-        return null;
+        return (T)val;
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public <T> T getField(String name) {
+        // TODO IGNITE-51.
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        assert val != null;
+
+        return val.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("valBytes", valBytes))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                valBytes = reader.readByteArray("valBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 90;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        if (valBytes == null)
+            valBytes = CU.marshal(ctx, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        assert valBytes != null;
+
+        val = ctx.marshaller().unmarshal(valBytes, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (!(obj instanceof KeyCacheObjectImpl))
+            return false;
+
+        KeyCacheObjectImpl other = (KeyCacheObjectImpl)obj;
+
+        return val.equals(other.val);
+    }
+
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        assert false;
+    }
+
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        assert false;
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+        return S.toString(KeyCacheObjectImpl.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 0003f6d..6a54177 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1693,9 +1693,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(),
                     req.topologyVersion());
 
-                byte[] newValBytes = req.valueBytes(i);
-
-                Object writeVal = req.writeValue(i);
+                Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
 
                 Collection<UUID> readers = null;
                 Collection<UUID> filteredReaders = null;
@@ -1711,7 +1709,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     locNodeId,
                     op,
                     writeVal,
-                    newValBytes,
+                    null,
                     req.invokeArguments(),
                     primary && writeThrough(),
                     req.returnValue(),
@@ -1742,10 +1740,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         if (conflictCtx == null)
                             newConflictVer = null;
-                        else if (conflictCtx.isMerge()) {
+                        else if (conflictCtx.isMerge())
                             newConflictVer = null; // Conflict version is discarded in case of merge.
-                            newValBytes = null; // Value has been changed.
-                        }
 
                         EntryProcessor<Object, Object, Object> entryProcessor = null;
 
@@ -1755,7 +1751,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (!readersOnly) {
                             dhtFut.addWriteEntry(entry,
                                 updRes.newValue(),
-                                newValBytes,
                                 entryProcessor,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime(),
@@ -1766,7 +1761,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 updRes.newValue(),
-                                newValBytes,
                                 entryProcessor,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime());
@@ -1783,21 +1777,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
                             GridCacheVersionConflictContext ctx = updRes.conflictResolveResult();
 
-                            if (ctx != null && ctx.isMerge())
-                                newValBytes = null;
-
                             // If put the same value as in request then do not need to send it back.
                             if (op == TRANSFORM || writeVal != updRes.newValue()) {
                                 res.addNearValue(i,
                                     updRes.newValue(),
-                                    newValBytes,
                                     updRes.newTtl(),
                                     updRes.conflictExpireTime());
                             }
                             else
                                 res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
 
-                            if (updRes.newValue() != null || newValBytes != null) {
+                            if (updRes.newValue() != null) {
                                 IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
 
                                 assert f == null : f;
@@ -2024,18 +2014,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (dhtFut != null) {
-                        GridCacheValueBytes valBytesTuple = op == DELETE ? GridCacheValueBytes.nil():
-                            entry.valueBytes();
-
-                        byte[] valBytes = valBytesTuple.getIfMarshaled();
-
                         EntryProcessor<Object, Object, Object> entryProcessor =
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
                         if (!batchRes.readersOnly())
                             dhtFut.addWriteEntry(entry,
                                 writeVal,
-                                valBytes,
                                 entryProcessor,
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE,
@@ -2045,7 +2029,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 writeVal,
-                                valBytes,
                                 entryProcessor,
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
@@ -2057,13 +2040,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 int idx = firstEntryIdx + i;
 
                                 if (req.operation() == TRANSFORM) {
-                                    GridCacheValueBytes valBytesTuple = entry.valueBytes();
-
-                                    byte[] valBytes = valBytesTuple.getIfMarshaled();
-
                                     res.addNearValue(idx,
                                         writeVal,
-                                        valBytes,
                                         updRes.newTtl(),
                                         CU.EXPIRE_TIME_CALCULATE);
                                 }
@@ -2477,13 +2455,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         entry = entryExx(key);
 
                         CacheObject val = req.value(i);
-                        byte[] valBytes = req.valueBytes(i);
                         EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
 
                         GridCacheOperation op = entryProcessor != null ? TRANSFORM :
-                            (val != null || valBytes != null) ?
-                                UPDATE :
-                                DELETE;
+                            (val != null) ? UPDATE : DELETE;
 
                         long ttl = req.ttl(i);
                         long expireTime = req.conflictExpireTime(i);
@@ -2494,7 +2469,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             nodeId,
                             op,
                             op == TRANSFORM ? entryProcessor : val,
-                            valBytes,
+                            null,
                             op == TRANSFORM ? req.invokeArguments() : null,
                             /*write-through*/false,
                             /*retval*/false,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index c5cb216..47262ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -207,7 +207,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
     /**
      * @param entry Entry to map.
      * @param val Value to write.
-     * @param valBytes Value bytes.
      * @param entryProcessor Entry processor.
      * @param ttl TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
@@ -215,7 +214,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
      */
     public void addWriteEntry(GridDhtCacheEntry entry,
         @Nullable CacheObject val,
-        @Nullable byte[] valBytes,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long conflictExpireTime,
@@ -254,9 +252,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
                 }
 
                 updateReq.addWriteValue(entry.key(),
-                    entry.keyBytes(),
                     val,
-                    valBytes,
                     entryProcessor,
                     ttl,
                     conflictExpireTime,
@@ -269,7 +265,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
      * @param readers Entry readers.
      * @param entry Entry.
      * @param val Value.
-     * @param valBytes Value bytes.
      * @param entryProcessor Entry processor..
      * @param ttl TTL for near cache update (optional).
      * @param expireTime Expire time for near cache update (optional).
@@ -277,7 +272,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
     public void addNearWriteEntries(Iterable<UUID> readers,
         GridDhtCacheEntry entry,
         @Nullable CacheObject val,
-        @Nullable byte[] valBytes,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime) {
@@ -318,9 +312,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
             nearReadersEntries.put(entry.key(), entry);
 
             updateReq.addNearWriteValue(entry.key(),
-                entry.keyBytes(),
                 val,
-                valBytes,
                 entryProcessor,
                 ttl,
                 expireTime);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index a6d19ff..b1056c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -57,24 +57,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /** Keys to update. */
     @GridToStringInclude
-    @GridDirectTransient
+    @GridDirectCollection(KeyCacheObject.class)
     private List<KeyCacheObject> keys;
 
-    /** Key bytes. */
-    @GridToStringInclude
-    @GridDirectCollection(byte[].class)
-    private List<byte[]> keyBytes;
-
     /** Values to update. */
     @GridToStringInclude
-    @GridDirectTransient
+    @GridDirectCollection(CacheObject.class)
     private List<CacheObject> vals;
 
-    /** Value bytes. */
-    @GridToStringInclude
-    @GridDirectCollection(GridCacheValueBytes.class)
-    private List<GridCacheValueBytes> valBytes;
-
     /** Conflict versions. */
     @GridDirectCollection(GridCacheVersion.class)
     private List<GridCacheVersion> conflictVers;
@@ -94,26 +84,16 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /** Write synchronization mode. */
     private CacheWriteSynchronizationMode syncMode;
 
-    /** Keys to update. */
+    /** Near cache keys to update. */
     @GridToStringInclude
-    @GridDirectTransient
+    @GridDirectCollection(KeyCacheObject.class)
     private List<KeyCacheObject> nearKeys;
 
-    /** Key bytes. */
-    @GridToStringInclude
-    @GridDirectCollection(byte[].class)
-    private List<byte[]> nearKeyBytes;
-
     /** Values to update. */
     @GridToStringInclude
-    @GridDirectTransient
+    @GridDirectCollection(CacheObject.class)
     private List<CacheObject> nearVals;
 
-    /** Value bytes. */
-    @GridToStringInclude
-    @GridDirectCollection(GridCacheValueBytes.class)
-    private List<GridCacheValueBytes> nearValBytes;
-
     /** Force transform backups flag. */
     private boolean forceTransformBackups;
 
@@ -193,16 +173,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         this.invokeArgs = invokeArgs;
 
         keys = new ArrayList<>();
-        keyBytes = new ArrayList<>();
 
         if (forceTransformBackups) {
             entryProcessors = new ArrayList<>();
             entryProcessorsBytes = new ArrayList<>();
         }
-        else {
+        else
             vals = new ArrayList<>();
-            valBytes = new ArrayList<>();
-        }
     }
 
     /**
@@ -214,34 +191,27 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /**
      * @param key Key to add.
-     * @param keyBytes Key bytes, if key was already serialized.
      * @param val Value, {@code null} if should be removed.
-     * @param valBytes Value bytes, {@code null} if should be removed.
      * @param entryProcessor Entry processor.
      * @param ttl TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
      */
     public void addWriteValue(KeyCacheObject key,
-        @Nullable byte[] keyBytes,
         @Nullable CacheObject val,
-        @Nullable byte[] valBytes,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long conflictExpireTime,
         @Nullable GridCacheVersion conflictVer) {
         keys.add(key);
-        this.keyBytes.add(keyBytes);
 
         if (forceTransformBackups) {
             assert entryProcessor != null;
 
             entryProcessors.add(entryProcessor);
         }
-        else {
+        else
             vals.add(val);
-            this.valBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null);
-        }
 
         // In case there is no conflict, do not create the list.
         if (conflictVer != null) {
@@ -284,47 +254,37 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /**
      * @param key Key to add.
-     * @param keyBytes Key bytes, if key was already serialized.
      * @param val Value, {@code null} if should be removed.
-     * @param valBytes Value bytes, {@code null} if should be removed.
      * @param entryProcessor Entry processor.
      * @param ttl TTL.
      * @param expireTime Expire time.
      */
     public void addNearWriteValue(KeyCacheObject key,
-        @Nullable byte[] keyBytes,
         @Nullable CacheObject val,
-        @Nullable byte[] valBytes,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime)
     {
         if (nearKeys == null) {
             nearKeys = new ArrayList<>();
-            nearKeyBytes = new ArrayList<>();
 
             if (forceTransformBackups) {
                 nearEntryProcessors = new ArrayList<>();
                 nearEntryProcessorsBytes = new ArrayList<>();
             }
-            else {
+            else
                 nearVals = new ArrayList<>();
-                nearValBytes = new ArrayList<>();
-            }
         }
 
         nearKeys.add(key);
-        nearKeyBytes.add(keyBytes);
 
         if (forceTransformBackups) {
             assert entryProcessor != null;
 
             nearEntryProcessors.add(entryProcessor);
         }
-        else {
+        else
             nearVals.add(val);
-            nearValBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null);
-        }
 
         if (ttl >= 0) {
             if (nearTtls == null) {
@@ -444,39 +404,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /**
      * @param idx Key index.
-     * @return Key bytes.
-     */
-    @Nullable public byte[] keyBytes(int idx) {
-        return keyBytes == null ? null : keyBytes.get(idx);
-    }
-
-    /**
-     * @param idx Near key index.
-     * @return Key bytes.
-     */
-    @Nullable public byte[] nearKeyBytes(int idx) {
-        return nearKeyBytes == null ? null : nearKeyBytes.get(idx);
-    }
-
-    /**
-     * @param idx Key index.
      * @return Value.
      */
     @Nullable public CacheObject value(int idx) {
-        if (vals != null) {
-            CacheObject val = vals.get(idx);
-
-            if (val != null)
-                return val;
-        }
-
-// TODO IGNITE-51.
-//        if (valBytes != null) {
-//            GridCacheValueBytes valBytes0 = valBytes.get(idx);
-//
-//            if (valBytes0 != null && valBytes0.isPlain())
-//                return valBytes0.get();
-//        }
+        if (vals != null)
+            return vals.get(idx);
 
         return null;
     }
@@ -494,20 +426,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @return Value.
      */
     @Nullable public CacheObject nearValue(int idx) {
-        if (nearVals != null) {
-            CacheObject val = nearVals.get(idx);
-
-            if (val != null)
-                return val;
-        }
-
-// TODO IGNITE-51.
-//        if (nearValBytes != null) {
-//            GridCacheValueBytes valBytes0 = nearValBytes.get(idx);
-//
-//            if (valBytes0 != null && valBytes0.isPlain())
-//                return (V)valBytes0.get();
-//        }
+        if (nearVals != null)
+            return nearVals.get(idx);
 
         return null;
     }
@@ -521,36 +441,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     }
 
     /**
-     * @param idx Key index.
-     * @return Value bytes.
-     */
-    @Nullable public byte[] valueBytes(int idx) {
-        if (valBytes != null) {
-            GridCacheValueBytes valBytes0 = valBytes.get(idx);
-
-            if (valBytes0 != null && !valBytes0.isPlain())
-                return valBytes0.get();
-        }
-
-        return null;
-    }
-
-    /**
-     * @param idx Near key index.
-     * @return Value bytes.
-     */
-    @Nullable public byte[] nearValueBytes(int idx) {
-        if (nearValBytes != null) {
-            GridCacheValueBytes valBytes0 = nearValBytes.get(idx);
-
-            if (valBytes0 != null && !valBytes0.isPlain())
-                return valBytes0.get();
-        }
-
-        return null;
-    }
-
-    /**
      * @param idx Index.
      * @return Conflict version.
      */
@@ -632,8 +522,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        keyBytes = marshalCollection(keys, ctx);
-        valBytes = marshalValuesCollection(vals, ctx);
+        prepareMarshalCacheObjects(keys, ctx);
+
+        prepareMarshalCacheObjects(vals, ctx);
+
+        prepareMarshalCacheObjects(nearKeys, ctx);
+
+        prepareMarshalCacheObjects(nearVals, ctx);
 
         if (forceTransformBackups) {
             invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx);
@@ -641,9 +536,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
             entryProcessorsBytes = marshalCollection(entryProcessors, ctx);
         }
 
-        nearKeyBytes = marshalCollection(nearKeys, ctx);
-        nearValBytes = marshalValuesCollection(nearVals, ctx);
-
         if (forceTransformBackups)
             nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, ctx);
     }
@@ -652,8 +544,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        keys = unmarshalCollection(keyBytes, ctx, ldr);
-        vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
+        finishUnmarshalCacheObjects(keys, ctx, ldr);
+
+        finishUnmarshalCacheObjects(vals, ctx, ldr);
+
+        finishUnmarshalCacheObjects(nearKeys, ctx, ldr);
+
+        finishUnmarshalCacheObjects(nearVals, ctx, ldr);
 
         if (forceTransformBackups) {
             entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
@@ -661,9 +558,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
             invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
         }
 
-        nearKeys = unmarshalCollection(nearKeyBytes, ctx, ldr);
-        nearVals = unmarshalValueBytesCollection(nearValBytes, ctx, ldr);
-
         if (forceTransformBackups)
             nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
     }
@@ -720,7 +614,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeCollection("keyBytes", keyBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -738,7 +632,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeCollection("nearKeyBytes", nearKeyBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -750,7 +644,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeCollection("nearValBytes", nearValBytes, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -792,7 +686,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeCollection("valBytes", valBytes, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -868,7 +762,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 9:
-                keyBytes = reader.readCollection("keyBytes", MessageCollectionItemType.BYTE_ARR);
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -892,7 +786,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 12:
-                nearKeyBytes = reader.readCollection("nearKeyBytes", MessageCollectionItemType.BYTE_ARR);
+                nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -908,7 +802,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 14:
-                nearValBytes = reader.readCollection("nearValBytes", MessageCollectionItemType.MSG);
+                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -968,7 +862,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 21:
-                valBytes = reader.readCollection("valBytes", MessageCollectionItemType.MSG);
+                vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index e94ff11..3167820 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
@@ -45,11 +44,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
     /** Failed keys. */
     @GridToStringInclude
-    @GridDirectTransient
-    private Collection<KeyCacheObject> failedKeys;
-
-    /** Serialized failed keys. */
-    private byte[] failedKeysBytes;
+    @GridDirectCollection(KeyCacheObject.class)
+    private List<KeyCacheObject> failedKeys;
 
     /** Update error. */
     @GridDirectTransient
@@ -60,12 +56,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
     /** Evicted readers. */
     @GridToStringInclude
-    @GridDirectTransient
-    private Collection<KeyCacheObject> nearEvicted;
-
-    /** Evicted reader key bytes. */
-    @GridDirectCollection(byte[].class)
-    private Collection<byte[]> nearEvictedBytes;
+    @GridDirectCollection(KeyCacheObject.class)
+    private List<KeyCacheObject> nearEvicted;
 
     /**
      * Empty constructor required by {@link Externalizable}.
@@ -138,20 +130,12 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
      * Adds near evicted key..
      *
      * @param key Evicted key.
-     * @param bytes Bytes of evicted key.
      */
-    public void addNearEvicted(KeyCacheObject key, @Nullable byte[] bytes) {
+    public void addNearEvicted(KeyCacheObject key) {
         if (nearEvicted == null)
             nearEvicted = new ArrayList<>();
 
         nearEvicted.add(key);
-
-        if (bytes != null) {
-            if (nearEvictedBytes == null)
-                nearEvictedBytes = new ArrayList<>();
-
-            nearEvictedBytes.add(bytes);
-        }
     }
 
     /** {@inheritDoc}
@@ -159,22 +143,22 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        failedKeysBytes = ctx.marshaller().marshal(failedKeys);
-        errBytes = ctx.marshaller().marshal(err);
+        prepareMarshalCacheObjects(failedKeys, ctx);
+
+        prepareMarshalCacheObjects(nearEvicted, ctx);
 
-        if (nearEvictedBytes == null)
-            nearEvictedBytes = marshalCollection(nearEvicted, ctx);
+        errBytes = ctx.marshaller().marshal(err);
     }
 
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        failedKeys = ctx.marshaller().unmarshal(failedKeysBytes, ldr);
-        err = ctx.marshaller().unmarshal(errBytes, ldr);
+        finishUnmarshalCacheObjects(failedKeys, ctx, ldr);
+
+        finishUnmarshalCacheObjects(nearEvicted, ctx, ldr);
 
-        if (nearEvicted == null && nearEvictedBytes != null)
-            nearEvicted = unmarshalCollection(nearEvictedBytes, ctx, ldr);
+        err = ctx.marshaller().unmarshal(errBytes, ldr);
     }
 
     /** {@inheritDoc} */
@@ -199,7 +183,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeByteArray("failedKeysBytes", failedKeysBytes))
+                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -211,7 +195,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection("nearEvictedBytes", nearEvictedBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -241,7 +225,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 4:
-                failedKeysBytes = reader.readByteArray("failedKeysBytes");
+                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -257,7 +241,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 6:
-                nearEvictedBytes = reader.readCollection("nearEvictedBytes", MessageCollectionItemType.BYTE_ARR);
+                nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 08804ca..d36a850 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -594,7 +594,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
                 subjId,
                 taskNameHash);
 
-            req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, true);
+            req.addUpdateEntry(cacheKey,
+                val,
+                conflictTtl,
+                conflictExpireTime,
+                conflictVer,
+                true);
 
             single = true;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6b41833/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 029669a..d3e9fb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -72,21 +72,21 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     private GridCacheOperation op;
 
     /** Keys to update. */
-    @GridDirectTransient
     @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
     private List<KeyCacheObject> keys;
 
-    /** Key bytes. */
-    @GridDirectCollection(byte[].class)
-    private List<byte[]> keyBytes;
-
     /** Values to update. */
+    @GridDirectCollection(CacheObject.class)
+    private List<CacheObject> vals;
+
+    /** Entry processors. */
     @GridDirectTransient
-    private List<Object> vals;
+    private List<EntryProcessor<Object, Object, Object>> entryProcessors;
 
-    /** Value bytes. */
-    @GridDirectCollection(GridCacheValueBytes.class)
-    private List<GridCacheValueBytes> valBytes;
+    /** Entry processors bytes. */
+    @GridDirectCollection(byte[].class)
+    private List<byte[]> entryProcessorsBytes;
 
     /** Optional arguments for entry processor. */
     @GridDirectTransient
@@ -195,7 +195,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         this.taskNameHash = taskNameHash;
 
         keys = new ArrayList<>();
-        vals = new ArrayList<>();
     }
 
     /** {@inheritDoc} */
@@ -301,11 +300,32 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         long conflictExpireTime,
         @Nullable GridCacheVersion conflictVer,
         boolean primary) {
+        EntryProcessor<Object, Object, Object> entryProcessor = null;
+
+        if (op == TRANSFORM) {
+            assert val instanceof EntryProcessor : val;
+
+            entryProcessor = (EntryProcessor<Object, Object, Object>) val;
+        }
+
         assert val != null || op == DELETE;
-        assert op != TRANSFORM || val instanceof EntryProcessor;
 
         keys.add(key);
-        vals.add(val);
+
+        if (entryProcessor != null) {
+            if (entryProcessors == null)
+                entryProcessors = new ArrayList<>();
+
+            entryProcessors.add(entryProcessor);
+        }
+        else if (val != null) {
+            assert val instanceof CacheObject : val;
+
+            if (vals == null)
+                vals = new ArrayList<>();
+
+            vals.add((CacheObject)val);
+        }
 
         hasPrimary |= primary;
 
@@ -356,8 +376,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     /**
      * @return Values for this update request.
      */
-    public List<Object> values() {
-        return vals;
+    public List<?> values() {
+        return op == TRANSFORM ? entryProcessors : vals;
     }
 
     /**
@@ -382,7 +402,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     public CacheObject value(int idx) {
         assert op == UPDATE : op;
 
-        return (CacheObject)vals.get(idx);
+        return vals.get(idx);
     }
 
     /**
@@ -393,42 +413,16 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
         assert op == TRANSFORM : op;
 
-        return (EntryProcessor<Object, Object, Object>)vals.get(idx);
+        return entryProcessors.get(idx);
     }
 
     /**
      * @param idx Index to get.
      * @return Write value - either value, or transform closure.
      */
-    public Object writeValue(int idx) {
-        if (vals != null) {
-            Object val = vals.get(idx);
-
-            if (val != null)
-                return val;
-        }
-
-        if (valBytes != null) {
-            GridCacheValueBytes valBytesTuple = valBytes.get(idx);
-
-            if (valBytesTuple != null && valBytesTuple.isPlain())
-                return valBytesTuple.get();
-        }
-
-        return null;
-    }
-
-    /**
-     * @param idx Key index.
-     * @return Value bytes.
-     */
-    public byte[] valueBytes(int idx) {
-        if (op != TRANSFORM && valBytes != null) {
-            GridCacheValueBytes valBytesTuple = valBytes.get(idx);
-
-            if (valBytesTuple != null && !valBytesTuple.isPlain())
-                return valBytesTuple.get();
-        }
+    public CacheObject writeValue(int idx) {
+        if (vals != null)
+            return vals.get(idx);
 
         return null;
     }
@@ -508,8 +502,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        keyBytes = marshalCollection(keys, ctx);
-        valBytes = marshalValuesCollection(vals, ctx);
+        prepareMarshalCacheObjects(keys, ctx);
+
+        if (op == TRANSFORM)
+            entryProcessorsBytes = marshalCollection(entryProcessors, ctx);
+        else
+            prepareMarshalCacheObjects(vals, ctx);
+
         filterBytes = marshalFilter(filter, ctx);
         invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx);
 
@@ -521,8 +520,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        keys = unmarshalCollection(keyBytes, ctx, ldr);
-        vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
+        finishUnmarshalCacheObjects(keys, ctx, ldr);
+
+        if (op == TRANSFORM)
+            entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+        else
+            finishUnmarshalCacheObjects(vals, ctx, ldr);
+
         filter = unmarshalFilter(filterBytes, ctx, ldr);
         invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
 
@@ -564,97 +568,103 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeBoolean("fastMap", fastMap))
+                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeObjectArray("filterBytes", filterBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeBoolean("fastMap", fastMap))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
+                if (!writer.writeObjectArray("filterBytes", filterBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeBoolean("hasPrimary", hasPrimary))
+                if (!writer.writeMessage("futVer", futVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeBoolean("hasPrimary", hasPrimary))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeCollection("keyBytes", keyBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeBoolean("retval", retval))
+                if (!writer.writeByte("op", op != null ? (byte) op.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("retval", retval))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte) syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeLong("topVer", topVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("updateVer", updateVer))
+                if (!writer.writeLong("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeCollection("valBytes", valBytes, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 22:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -700,7 +710,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 6:
-                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
                     return false;
@@ -708,7 +718,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 7:
-                fastMap = reader.readBoolean("fastMap");
+                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -716,7 +726,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 8:
-                filterBytes = reader.readObjectArray("filterBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+                fastMap = reader.readBoolean("fastMap");
 
                 if (!reader.isLastRead())
                     return false;
@@ -724,7 +734,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 9:
-                forceTransformBackups = reader.readBoolean("forceTransformBackups");
+                filterBytes = reader.readObjectArray("filterBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -732,7 +742,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 10:
-                futVer = reader.readMessage("futVer");
+                forceTransformBackups = reader.readBoolean("forceTransformBackups");
 
                 if (!reader.isLastRead())
                     return false;
@@ -740,7 +750,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 11:
-                hasPrimary = reader.readBoolean("hasPrimary");
+                futVer = reader.readMessage("futVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -748,7 +758,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 12:
-                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+                hasPrimary = reader.readBoolean("hasPrimary");
 
                 if (!reader.isLastRead())
                     return false;
@@ -756,7 +766,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 13:
-                keyBytes = reader.readCollection("keyBytes", MessageCollectionItemType.BYTE_ARR);
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -764,6 +774,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 14:
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
                 byte opOrd;
 
                 opOrd = reader.readByte("op");
@@ -775,7 +793,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 15:
+            case 16:
                 retval = reader.readBoolean("retval");
 
                 if (!reader.isLastRead())
@@ -783,7 +801,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 16:
+            case 17:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -791,7 +809,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 17:
+            case 18:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -803,7 +821,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 18:
+            case 19:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -811,7 +829,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 19:
+            case 20:
                 topVer = reader.readLong("topVer");
 
                 if (!reader.isLastRead())
@@ -819,7 +837,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 updateVer = reader.readMessage("updateVer");
 
                 if (!reader.isLastRead())
@@ -827,8 +845,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 21:
-                valBytes = reader.readCollection("valBytes", MessageCollectionItemType.MSG);
+            case 22:
+                vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -847,7 +865,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 22;
+        return 23;
     }
 
     /** {@inheritDoc} */


Mime
View raw message