ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [27/51] [abbrv] ignite git commit: ignite-2523 : Created GridDhtAtomicSingleUpdateRequest optimized implementation.
Date Thu, 25 Feb 2016 12:31:26 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateResponse.java
new file mode 100644
index 0000000..6a69ccc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateResponse.java
@@ -0,0 +1,296 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class GridDhtAtomicSingleUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridDhtAtomicUpdateResponse {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** Future version. */
+    private GridCacheVersion futVer;
+
+    /** Failed keys. */
+    @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
+    private List<KeyCacheObject> failedKeys;
+
+    /** Update error. */
+    @GridDirectTransient
+    private IgniteCheckedException err;
+
+    /** Serialized update error. */
+    private byte[] errBytes;
+
+    /** Near evicted keys. */
+    @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
+    private List<KeyCacheObject> nearEvicted;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridDhtAtomicSingleUpdateResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param futVer Future version.
+     * @param addDepInfo Deployment info.
+     */
+    public GridDhtAtomicSingleUpdateResponse(int cacheId, GridCacheVersion futVer, boolean addDepInfo) {
+        this.cacheId = cacheId;
+        this.futVer = futVer;
+        this.addDepInfo = addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /**
+     * @return Future version.
+     */
+    @Override public GridCacheVersion futureVersion() {
+        return futVer;
+    }
+
+    /**
+     * Sets update error.
+     *
+     * @param err Error.
+     */
+    @Override public void onError(IgniteCheckedException err) {
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCheckedException error() {
+        return err;
+    }
+
+    /**
+     * @return Failed keys.
+     */
+    @Override public Collection<KeyCacheObject> failedKeys() {
+        return failedKeys;
+    }
+
+    /**
+     * Adds key to collection of failed keys.
+     *
+     * @param key Key to add.
+     * @param e Error cause.
+     */
+    @Override public void addFailedKey(KeyCacheObject key, Throwable e) {
+        if (failedKeys == null)
+            failedKeys = new ArrayList<>();
+
+        failedKeys.add(key);
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+        err.addSuppressed(e);
+    }
+
+    /**
+     * @return Near evicted keys (may be {@code null}).
+     */
+    @Override public Collection<KeyCacheObject> nearEvicted() {
+        return nearEvicted;
+    }
+
+    /**
+     * Adds near evicted key.
+     *
+     * @param key Evicted key.
+     */
+    @Override public void addNearEvicted(KeyCacheObject key) {
+        if (nearEvicted == null)
+            nearEvicted = new ArrayList<>();
+
+        nearEvicted.add(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalCacheObjects(failedKeys, cctx);
+
+        prepareMarshalCacheObjects(nearEvicted, cctx);
+
+        if (errBytes == null)
+            errBytes = ctx.marshaller().marshal(err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+        finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
+
+        if (errBytes != null && err == null)
+            err = ctx.marshaller().unmarshal(errBytes, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("futVer", futVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                futVer = reader.readMessage("futVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtAtomicSingleUpdateResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 39;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 7;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtAtomicSingleUpdateResponse.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3a31700..e19a11a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -237,19 +238,34 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
                 GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
 
                 if (updateReq == null) {
-                    updateReq = new GridDhtAtomicUpdateRequest(
-                        cctx.cacheId(),
-                        nodeId,
-                        futVer,
-                        writeVer,
-                        syncMode,
-                        topVer,
-                        forceTransformBackups,
-                        this.updateReq.subjectId(),
-                        this.updateReq.taskNameHash(),
-                        forceTransformBackups ? this.updateReq.invokeArguments() : null,
-                        cctx.deploymentEnabled(),
-                        this.updateReq.keepBinary());
+                    if (this.updateReq instanceof GridNearAtomicSingleUpdateRequest)
+                        updateReq = new GridDhtAtomicSingleUpdateRequest(
+                            cctx.cacheId(),
+                            nodeId,
+                            futVer,
+                            writeVer,
+                            syncMode,
+                            topVer,
+                            forceTransformBackups,
+                            this.updateReq.subjectId(),
+                            this.updateReq.taskNameHash(),
+                            forceTransformBackups ? this.updateReq.invokeArguments() : null,
+                            cctx.deploymentEnabled(),
+                            this.updateReq.keepBinary());
+                    else
+                        updateReq = new GridDhtAtomicMultipleUpdateRequest(
+                            cctx.cacheId(),
+                            nodeId,
+                            futVer,
+                            writeVer,
+                            syncMode,
+                            topVer,
+                            forceTransformBackups,
+                            this.updateReq.subjectId(),
+                            this.updateReq.taskNameHash(),
+                            forceTransformBackups ? this.updateReq.invokeArguments() : null,
+                            cctx.deploymentEnabled(),
+                            this.updateReq.keepBinary());
 
                     mappings.put(nodeId, updateReq);
                 }
@@ -309,19 +325,34 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
                 if (node == null)
                     continue;
 
-                updateReq = new GridDhtAtomicUpdateRequest(
-                    cctx.cacheId(),
-                    nodeId,
-                    futVer,
-                    writeVer,
-                    syncMode,
-                    topVer,
-                    forceTransformBackups,
-                    this.updateReq.subjectId(),
-                    this.updateReq.taskNameHash(),
-                    forceTransformBackups ? this.updateReq.invokeArguments() : null,
-                    cctx.deploymentEnabled(),
-                    this.updateReq.keepBinary());
+                if (this.updateReq instanceof GridNearAtomicSingleUpdateRequest)
+                    updateReq = new GridDhtAtomicSingleUpdateRequest(
+                        cctx.cacheId(),
+                        nodeId,
+                        futVer,
+                        writeVer,
+                        syncMode,
+                        topVer,
+                        forceTransformBackups,
+                        this.updateReq.subjectId(),
+                        this.updateReq.taskNameHash(),
+                        forceTransformBackups ? this.updateReq.invokeArguments() : null,
+                        cctx.deploymentEnabled(),
+                        this.updateReq.keepBinary());
+                else
+                    updateReq = new GridDhtAtomicMultipleUpdateRequest(
+                        cctx.cacheId(),
+                        nodeId,
+                        futVer,
+                        writeVer,
+                        syncMode,
+                        topVer,
+                        forceTransformBackups,
+                        this.updateReq.subjectId(),
+                        this.updateReq.taskNameHash(),
+                        forceTransformBackups ? this.updateReq.invokeArguments() : null,
+                        cctx.deploymentEnabled(),
+                        this.updateReq.keepBinary());
 
                 mappings.put(nodeId, updateReq);
             }
@@ -348,7 +379,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
                 if (!mappings.isEmpty()) {
                     Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
 
-                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+                    exit:
+                    for (GridDhtAtomicUpdateRequest req : mappings.values()) {
                         for (int i = 0; i < req.size(); i++) {
                             KeyCacheObject key = req.key(i);
 
@@ -416,7 +448,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
                     if (log.isDebugEnabled())
                         log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                    cctx.io().send(req.nodeId(), (GridCacheMessage)req, cctx.ioPolicy());
                 }
                 catch (ClusterTopologyCheckedException ignored) {
                     U.warn(log, "Failed to send update request to backup node because it left grid: " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7cc276f..0ab67a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -1,248 +1,44 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
  */
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import javax.cache.processor.EntryProcessor;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.UUID;
 
-/**
- * Lite dht cache backup update request.
- */
-public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Message index. */
-    public static final int CACHE_MSG_IDX = nextIndexId();
-
-    /** Node ID. */
-    private UUID nodeId;
-
-    /** Future version. */
-    private GridCacheVersion futVer;
-
-    /** Write version. */
-    private GridCacheVersion writeVer;
-
-    /** Topology version. */
-    private AffinityTopologyVersion topVer;
-
-    /** Keys to update. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private List<KeyCacheObject> keys;
-
-    /** Values to update. */
-    @GridToStringInclude
-    @GridDirectCollection(CacheObject.class)
-    private List<CacheObject> vals;
-
-    /** Previous values. */
-    @GridToStringInclude
-    @GridDirectCollection(CacheObject.class)
-    private List<CacheObject> prevVals;
-
-    /** Conflict versions. */
-    @GridDirectCollection(GridCacheVersion.class)
-    private List<GridCacheVersion> conflictVers;
-
-    /** TTLs. */
-    private GridLongList ttls;
-
-    /** Conflict expire time. */
-    private GridLongList conflictExpireTimes;
-
-    /** Near TTLs. */
-    private GridLongList nearTtls;
-
-    /** Near expire times. */
-    private GridLongList nearExpireTimes;
-
-    /** Write synchronization mode. */
-    private CacheWriteSynchronizationMode syncMode;
-
-    /** Near cache keys to update. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private List<KeyCacheObject> nearKeys;
-
-    /** Values to update. */
-    @GridToStringInclude
-    @GridDirectCollection(CacheObject.class)
-    private List<CacheObject> nearVals;
-
-    /** Force transform backups flag. */
-    private boolean forceTransformBackups;
-
-    /** Entry processors. */
-    @GridDirectTransient
-    private List<EntryProcessor<Object, Object, Object>> entryProcessors;
-
-    /** Entry processors bytes. */
-    @GridDirectCollection(byte[].class)
-    private List<byte[]> entryProcessorsBytes;
-
-    /** Near entry processors. */
-    @GridDirectTransient
-    private List<EntryProcessor<Object, Object, Object>> nearEntryProcessors;
-
-    /** Near entry processors bytes. */
-    @GridDirectCollection(byte[].class)
-    private List<byte[]> nearEntryProcessorsBytes;
-
-    /** Optional arguments for entry processor. */
-    @GridDirectTransient
-    private Object[] invokeArgs;
-
-    /** Entry processor arguments bytes. */
-    private byte[][] invokeArgsBytes;
-
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name hash. */
-    private int taskNameHash;
-
-    /** Partition. */
-    private GridLongList updateCntrs;
-
-    /** On response flag. Access should be synced on future. */
-    @GridDirectTransient
-    private boolean onRes;
-
-    /** */
-    @GridDirectTransient
-    private List<Integer> partIds;
-
-    /** */
-    @GridDirectTransient
-    private List<CacheObject> locPrevVals;
-
-    /** Keep binary flag. */
-    private boolean keepBinary;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridDhtAtomicUpdateRequest() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param cacheId Cache ID.
-     * @param nodeId Node ID.
-     * @param futVer Future version.
-     * @param writeVer Write version for cache values.
-     * @param invokeArgs Optional arguments for entry processor.
-     * @param syncMode Cache write synchronization mode.
-     * @param topVer Topology version.
-     * @param forceTransformBackups Force transform backups flag.
-     * @param subjId Subject ID.
-     * @param taskNameHash Task name hash code.
-     * @param addDepInfo Deployment info.
-     */
-    public GridDhtAtomicUpdateRequest(
-        int cacheId,
-        UUID nodeId,
-        GridCacheVersion futVer,
-        GridCacheVersion writeVer,
-        CacheWriteSynchronizationMode syncMode,
-        @NotNull AffinityTopologyVersion topVer,
-        boolean forceTransformBackups,
-        UUID subjId,
-        int taskNameHash,
-        Object[] invokeArgs,
-        boolean addDepInfo,
-        boolean keepBinary
-    ) {
-        assert invokeArgs == null || forceTransformBackups;
-
-        this.cacheId = cacheId;
-        this.nodeId = nodeId;
-        this.futVer = futVer;
-        this.writeVer = writeVer;
-        this.syncMode = syncMode;
-        this.topVer = topVer;
-        this.forceTransformBackups = forceTransformBackups;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-        this.invokeArgs = invokeArgs;
-        this.addDepInfo = addDepInfo;
-        this.keepBinary = keepBinary;
-
-        keys = new ArrayList<>();
-        partIds = new ArrayList<>();
-        locPrevVals = new ArrayList<>();
-
-        if (forceTransformBackups) {
-            entryProcessors = new ArrayList<>();
-            entryProcessorsBytes = new ArrayList<>();
-        }
-        else
-            vals = new ArrayList<>();
-    }
+public interface GridDhtAtomicUpdateRequest {
 
-    /**
-     * @return Force transform backups flag.
-     */
-    public boolean forceTransformBackups() {
-        return forceTransformBackups;
-    }
+    boolean forceTransformBackups();
 
-    /**
-     * @param key Key to add.
-     * @param val Value, {@code null} if should be removed.
-     * @param entryProcessor Entry processor.
-     * @param ttl TTL (optional).
-     * @param conflictExpireTime Conflict expire time (optional).
-     * @param conflictVer Conflict version (optional).
-     * @param addPrevVal If {@code true} adds previous value.
-     * @param prevVal Previous value.
-     */
-    public void addWriteValue(KeyCacheObject key,
+    void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
@@ -251,815 +47,85 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         boolean addPrevVal,
         int partId,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateIdx) {
-        keys.add(key);
-
-        partIds.add(partId);
-
-        locPrevVals.add(prevVal);
-
-        if (forceTransformBackups) {
-            assert entryProcessor != null;
-
-            entryProcessors.add(entryProcessor);
-        }
-        else
-            vals.add(val);
-
-        if (addPrevVal) {
-            if (prevVals == null)
-                prevVals = new ArrayList<>();
-
-            prevVals.add(prevVal);
-        }
-
-        if (updateIdx != null) {
-            if (updateCntrs == null)
-                updateCntrs = new GridLongList();
-
-            updateCntrs.add(updateIdx);
-        }
-
-        // In case there is no conflict, do not create the list.
-        if (conflictVer != null) {
-            if (conflictVers == null) {
-                conflictVers = new ArrayList<>();
-
-                for (int i = 0; i < keys.size() - 1; i++)
-                    conflictVers.add(null);
-            }
-
-            conflictVers.add(conflictVer);
-        }
-        else if (conflictVers != null)
-            conflictVers.add(null);
-
-        if (ttl >= 0) {
-            if (ttls == null) {
-                ttls = new GridLongList(keys.size());
-
-                for (int i = 0; i < keys.size() - 1; i++)
-                    ttls.add(CU.TTL_NOT_CHANGED);
-            }
-        }
+        @Nullable Long updateIdx);
 
-        if (ttls != null)
-            ttls.add(ttl);
-
-        if (conflictExpireTime >= 0) {
-            if (conflictExpireTimes == null) {
-                conflictExpireTimes = new GridLongList(keys.size());
-
-                for (int i = 0; i < keys.size() - 1; i++)
-                    conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
-            }
-        }
-
-        if (conflictExpireTimes != null)
-            conflictExpireTimes.add(conflictExpireTime);
-    }
-
-    /**
-     * @param key Key to add.
-     * @param val Value, {@code null} if should be removed.
-     * @param entryProcessor Entry processor.
-     * @param ttl TTL.
-     * @param expireTime Expire time.
-     */
-    public void addNearWriteValue(KeyCacheObject key,
+    void addNearWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
-        long expireTime) {
-        if (nearKeys == null) {
-            nearKeys = new ArrayList<>();
-
-            if (forceTransformBackups) {
-                nearEntryProcessors = new ArrayList<>();
-                nearEntryProcessorsBytes = new ArrayList<>();
-            }
-            else
-                nearVals = new ArrayList<>();
-        }
-
-        nearKeys.add(key);
-
-        if (forceTransformBackups) {
-            assert entryProcessor != null;
-
-            nearEntryProcessors.add(entryProcessor);
-        }
-        else
-            nearVals.add(val);
-
-        if (ttl >= 0) {
-            if (nearTtls == null) {
-                nearTtls = new GridLongList(nearKeys.size());
-
-                for (int i = 0; i < nearKeys.size() - 1; i++)
-                    nearTtls.add(CU.TTL_NOT_CHANGED);
-            }
-        }
-
-        if (nearTtls != null)
-            nearTtls.add(ttl);
-
-        if (expireTime >= 0) {
-            if (nearExpireTimes == null) {
-                nearExpireTimes = new GridLongList(nearKeys.size());
-
-                for (int i = 0; i < nearKeys.size() - 1; i++)
-                    nearExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
-            }
-        }
-
-        if (nearExpireTimes != null)
-            nearExpireTimes.add(expireTime);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
-
-    /**
-     * @return Node ID.
-     */
-    public UUID nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * @return Subject ID.
-     */
-    public UUID subjectId() {
-        return subjId;
-    }
-
-    /**
-     * @return Task name.
-     */
-    public int taskNameHash() {
-        return taskNameHash;
-    }
-
-    /**
-     * @return Keys size.
-     */
-    public int size() {
-        return keys.size();
-    }
-
-    /**
-     * @return Keys size.
-     */
-    public int nearSize() {
-        return nearKeys != null ? nearKeys.size() : 0;
-    }
-
-    /**
-     * @return Version assigned on primary node.
-     */
-    public GridCacheVersion futureVersion() {
-        return futVer;
-    }
-
-    /**
-     * @return Write version.
-     */
-    public GridCacheVersion writeVersion() {
-        return writeVer;
-    }
-
-    /**
-     * @return Cache write synchronization mode.
-     */
-    public CacheWriteSynchronizationMode writeSynchronizationMode() {
-        return syncMode;
-    }
-
-    /**
-     * @return Topology version.
-     */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * @return Keys.
-     */
-    public Collection<KeyCacheObject> keys() {
-        return keys;
-    }
-
-    /**
-     * @param idx Key index.
-     * @return Key.
-     */
-    public KeyCacheObject key(int idx) {
-        return keys.get(idx);
-    }
-
-    /**
-     * @param idx Partition index.
-     * @return Partition id.
-     */
-    public int partitionId(int idx) {
-        return partIds.get(idx);
-    }
-
-    /**
-     * @param updCntr Update counter.
-     * @return Update counter.
-     */
-    public Long updateCounter(int updCntr) {
-        if (updateCntrs != null && updCntr < updateCntrs.size())
-            return updateCntrs.get(updCntr);
-
-        return null;
-    }
-
-    /**
-     * @param idx Near key index.
-     * @return Key.
-     */
-    public KeyCacheObject nearKey(int idx) {
-        return nearKeys.get(idx);
-    }
-
-    /**
-     * @return Keep binary flag.
-     */
-    public boolean keepBinary() {
-        return keepBinary;
-    }
-
-    /**
-     * @param idx Key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject value(int idx) {
-        if (vals != null)
-            return vals.get(idx);
-
-        return null;
-    }
-
-    /**
-     * @param idx Key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject previousValue(int idx) {
-        if (prevVals != null)
-            return prevVals.get(idx);
-
-        return null;
-    }
-
-    /**
-     * @param idx Key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject localPreviousValue(int idx) {
-        return locPrevVals.get(idx);
-    }
-
-    /**
-     * @param idx Key index.
-     * @return Entry processor.
-     */
-    @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
-        return entryProcessors == null ? null : entryProcessors.get(idx);
-    }
-
-    /**
-     * @param idx Near key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject nearValue(int idx) {
-        if (nearVals != null)
-            return nearVals.get(idx);
-
-        return null;
-    }
-
-    /**
-     * @param idx Key index.
-     * @return Transform closure.
-     */
-    @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) {
-        return nearEntryProcessors == null ? null : nearEntryProcessors.get(idx);
-    }
-
-    /**
-     * @param idx Index.
-     * @return Conflict version.
-     */
-    @Nullable public GridCacheVersion conflictVersion(int idx) {
-        if (conflictVers != null) {
-            assert idx >= 0 && idx < conflictVers.size();
-
-            return conflictVers.get(idx);
-        }
-
-        return null;
-    }
-
-    /**
-     * @param idx Index.
-     * @return TTL.
-     */
-    public long ttl(int idx) {
-        if (ttls != null) {
-            assert idx >= 0 && idx < ttls.size();
-
-            return ttls.get(idx);
-        }
-
-        return CU.TTL_NOT_CHANGED;
-    }
-
-    /**
-     * @param idx Index.
-     * @return TTL for near cache update.
-     */
-    public long nearTtl(int idx) {
-        if (nearTtls != null) {
-            assert idx >= 0 && idx < nearTtls.size();
-
-            return nearTtls.get(idx);
-        }
-
-        return CU.TTL_NOT_CHANGED;
-    }
-
-    /**
-     * @param idx Index.
-     * @return Conflict expire time.
-     */
-    public long conflictExpireTime(int idx) {
-        if (conflictExpireTimes != null) {
-            assert idx >= 0 && idx < conflictExpireTimes.size();
-
-            return conflictExpireTimes.get(idx);
-        }
-
-        return CU.EXPIRE_TIME_CALCULATE;
-    }
-
-    /**
-     * @param idx Index.
-     * @return Expire time for near cache update.
-     */
-    public long nearExpireTime(int idx) {
-        if (nearExpireTimes != null) {
-            assert idx >= 0 && idx < nearExpireTimes.size();
-
-            return nearExpireTimes.get(idx);
-        }
-
-        return CU.EXPIRE_TIME_CALCULATE;
-    }
-
-    /**
-     * @return {@code True} if on response flag changed.
-     */
-    public boolean onResponse() {
-        return !onRes && (onRes = true);
-    }
-
-    /**
-     * @return Optional arguments for entry processor.
-     */
-    @Nullable public Object[] invokeArguments() {
-        return invokeArgs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        GridCacheContext cctx = ctx.cacheContext(cacheId);
-
-        prepareMarshalCacheObjects(keys, cctx);
-
-        prepareMarshalCacheObjects(vals, cctx);
-
-        prepareMarshalCacheObjects(nearKeys, cctx);
-
-        prepareMarshalCacheObjects(nearVals, cctx);
-
-        prepareMarshalCacheObjects(prevVals, cctx);
-
-        if (forceTransformBackups) {
-            // force addition of deployment info for entry processors if P2P is enabled globally.
-            if (!addDepInfo && ctx.deploymentEnabled())
-                addDepInfo = true;
-
-            if (invokeArgsBytes == null)
-                invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
-
-            if (entryProcessorsBytes == null)
-                entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
-
-            if (nearEntryProcessorsBytes == null)
-                nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        GridCacheContext cctx = ctx.cacheContext(cacheId);
-
-        finishUnmarshalCacheObjects(keys, cctx, ldr);
-
-        finishUnmarshalCacheObjects(vals, cctx, ldr);
-
-        finishUnmarshalCacheObjects(nearKeys, cctx, ldr);
-
-        finishUnmarshalCacheObjects(nearVals, cctx, ldr);
-
-        finishUnmarshalCacheObjects(prevVals, cctx, ldr);
-
-        if (forceTransformBackups) {
-            if (entryProcessors == null)
-                entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
-
-            if (invokeArgs == null)
-                invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
-
-            if (nearEntryProcessors == null)
-                nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return addDepInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 3:
-                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeMessage("futVer", futVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeBoolean("keepBinary", keepBinary))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
-                    return false;
-
-                writer.incrementState();
-
-            case 13:
-                if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 14:
-                if (!writer.writeMessage("nearTtls", nearTtls))
-                    return false;
-
-                writer.incrementState();
-
-            case 15:
-                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 16:
-                if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 17:
-                if (!writer.writeUuid("subjId", subjId))
-                    return false;
-
-                writer.incrementState();
-
-            case 18:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 19:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
-                    return false;
-
-                writer.incrementState();
-
-            case 20:
-                if (!writer.writeMessage("topVer", topVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 21:
-                if (!writer.writeMessage("ttls", ttls))
-                    return false;
-
-                writer.incrementState();
-
-            case 22:
-                if (!writer.writeMessage("updateCntrs", updateCntrs))
-                    return false;
-
-                writer.incrementState();
-
-            case 23:
-                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 24:
-                if (!writer.writeMessage("writeVer", writeVer))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 3:
-                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                forceTransformBackups = reader.readBoolean("forceTransformBackups");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 7:
-                futVer = reader.readMessage("futVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 8:
-                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                keepBinary = reader.readBoolean("keepBinary");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
-                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 11:
-                nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 12:
-                nearExpireTimes = reader.readMessage("nearExpireTimes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 13:
-                nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 14:
-                nearTtls = reader.readMessage("nearTtls");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
+        long expireTime);
 
-            case 15:
-                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+    int lookupIndex();
 
-                if (!reader.isLastRead())
-                    return false;
+    UUID nodeId();
 
-                reader.incrementState();
+    UUID subjectId();
 
-            case 16:
-                prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
+    int taskNameHash();
 
-                if (!reader.isLastRead())
-                    return false;
+    int size();
 
-                reader.incrementState();
+    int nearSize();
 
-            case 17:
-                subjId = reader.readUuid("subjId");
+    GridCacheVersion futureVersion();
 
-                if (!reader.isLastRead())
-                    return false;
+    GridCacheVersion writeVersion();
 
-                reader.incrementState();
+    CacheWriteSynchronizationMode writeSynchronizationMode();
 
-            case 18:
-                byte syncModeOrd;
+    AffinityTopologyVersion topologyVersion();
 
-                syncModeOrd = reader.readByte("syncMode");
+    Collection<KeyCacheObject> keys();
 
-                if (!reader.isLastRead())
-                    return false;
+    KeyCacheObject key(int idx);
 
-                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+    int partitionId(int idx);
 
-                reader.incrementState();
+    Long updateCounter(int updCntr);
 
-            case 19:
-                taskNameHash = reader.readInt("taskNameHash");
+    KeyCacheObject nearKey(int idx);
 
-                if (!reader.isLastRead())
-                    return false;
+    boolean keepBinary();
 
-                reader.incrementState();
+    @Nullable CacheObject value(int idx);
 
-            case 20:
-                topVer = reader.readMessage("topVer");
+    @Nullable CacheObject previousValue(int idx);
 
-                if (!reader.isLastRead())
-                    return false;
+    @Nullable CacheObject localPreviousValue(int idx);
 
-                reader.incrementState();
+    @Nullable EntryProcessor<Object, Object, Object> entryProcessor(int idx);
 
-            case 21:
-                ttls = reader.readMessage("ttls");
+    @Nullable CacheObject nearValue(int idx);
 
-                if (!reader.isLastRead())
-                    return false;
+    @Nullable EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx);
 
-                reader.incrementState();
+    @Nullable GridCacheVersion conflictVersion(int idx);
 
-            case 22:
-                updateCntrs = reader.readMessage("updateCntrs");
+    long ttl(int idx);
 
-                if (!reader.isLastRead())
-                    return false;
+    long nearTtl(int idx);
 
-                reader.incrementState();
+    long conflictExpireTime(int idx);
 
-            case 23:
-                vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+    long nearExpireTime(int idx);
 
-                if (!reader.isLastRead())
-                    return false;
+    boolean onResponse();
 
-                reader.incrementState();
+    @Nullable Object[] invokeArguments();
 
-            case 24:
-                writeVer = reader.readMessage("writeVer");
+    void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException;
 
-                if (!reader.isLastRead())
-                    return false;
+    void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException;
 
-                reader.incrementState();
+    boolean addDeploymentInfo();
 
-        }
+    boolean writeTo(ByteBuffer buf, MessageWriter writer);
 
-        return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class);
-    }
+    boolean readFrom(ByteBuffer buf, MessageReader reader);
 
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 38;
-    }
+    byte directType();
 
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 25;
-    }
+    byte fieldsCount();
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", super.toString());
-    }
+    IgniteCheckedException classError();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 8f1d9a2..a74fed6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -1,297 +1,63 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
  */
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import java.nio.ByteBuffer;
+import java.util.Collection;
 
-/**
- * DHT atomic cache backup update response.
- */
-public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Message index. */
-    public static final int CACHE_MSG_IDX = nextIndexId();
-
-    /** Future version. */
-    private GridCacheVersion futVer;
-
-    /** Failed keys. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private List<KeyCacheObject> failedKeys;
-
-    /** Update error. */
-    @GridDirectTransient
-    private IgniteCheckedException err;
-
-    /** Serialized update error. */
-    private byte[] errBytes;
-
-    /** Evicted readers. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private List<KeyCacheObject> nearEvicted;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridDhtAtomicUpdateResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param cacheId Cache ID.
-     * @param futVer Future version.
-     * @param addDepInfo Deployment info.
-     */
-    public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer, boolean addDepInfo) {
-        this.cacheId = cacheId;
-        this.futVer = futVer;
-        this.addDepInfo = addDepInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
-
-    /**
-     * @return Future version.
-     */
-    public GridCacheVersion futureVersion() {
-        return futVer;
-    }
-
-    /**
-     * Sets update error.
-     *
-     * @param err Error.
-     */
-    public void onError(IgniteCheckedException err){
-        this.err = err;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteCheckedException error() {
-        return err;
-    }
-
-    /**
-     * @return Failed keys.
-     */
-    public Collection<KeyCacheObject> failedKeys() {
-        return failedKeys;
-    }
-
-    /**
-     * Adds key to collection of failed keys.
-     *
-     * @param key Key to add.
-     * @param e Error cause.
-     */
-    public void addFailedKey(KeyCacheObject key, Throwable e) {
-        if (failedKeys == null)
-            failedKeys = new ArrayList<>();
-
-        failedKeys.add(key);
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
-    }
-
-    /**
-     * @return Evicted readers.
-     */
-    public Collection<KeyCacheObject> nearEvicted() {
-        return nearEvicted;
-    }
-
-    /**
-     * Adds near evicted key..
-     *
-     * @param key Evicted key.
-     */
-    public void addNearEvicted(KeyCacheObject key) {
-        if (nearEvicted == null)
-            nearEvicted = new ArrayList<>();
-
-        nearEvicted.add(key);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        GridCacheContext cctx = ctx.cacheContext(cacheId);
-
-        prepareMarshalCacheObjects(failedKeys, cctx);
-
-        prepareMarshalCacheObjects(nearEvicted, cctx);
-
-        if (errBytes == null)
-            errBytes = ctx.marshaller().marshal(err);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        GridCacheContext cctx = ctx.cacheContext(cacheId);
-
-        finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
-
-        finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
-
-        if (errBytes != null && err == null)
-            err = ctx.marshaller().unmarshal(errBytes, ldr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return addDepInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 3:
-                if (!writer.writeByteArray("errBytes", errBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeMessage("futVer", futVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 3:
-                errBytes = reader.readByteArray("errBytes");
-
-                if (!reader.isLastRead())
-                    return false;
+public interface GridDhtAtomicUpdateResponse {
+    int lookupIndex();
 
-                reader.incrementState();
+    GridCacheVersion futureVersion();
 
-            case 4:
-                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+    void onError(IgniteCheckedException err);
 
-                if (!reader.isLastRead())
-                    return false;
+    IgniteCheckedException error();
 
-                reader.incrementState();
+    Collection<KeyCacheObject> failedKeys();
 
-            case 5:
-                futVer = reader.readMessage("futVer");
+    void addFailedKey(KeyCacheObject key, Throwable e);
 
-                if (!reader.isLastRead())
-                    return false;
+    Collection<KeyCacheObject> nearEvicted();
 
-                reader.incrementState();
+    void addNearEvicted(KeyCacheObject key);
 
-            case 6:
-                nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+    void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException;
 
-                if (!reader.isLastRead())
-                    return false;
+    void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException;
 
-                reader.incrementState();
+    boolean addDeploymentInfo();
 
-        }
+    boolean writeTo(ByteBuffer buf, MessageWriter writer);
 
-        return reader.afterMessageRead(GridDhtAtomicUpdateResponse.class);
-    }
+    boolean readFrom(ByteBuffer buf, MessageReader reader);
 
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 39;
-    }
+    byte directType();
 
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 7;
-    }
+    byte fieldsCount();
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtAtomicUpdateResponse.class, this);
-    }
+    long messageId();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
index b3f7e74..d6eabd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
@@ -593,7 +593,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
 
         }
 
-        return reader.afterMessageRead(GridNearAtomicMultipleUpdateResponse.class);
+        return reader.afterMessageRead(GridNearAtomicSingleUpdateResponse.class);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 5aef8e7..168076a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -43,9 +43,9 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicMultipleUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index 50a6114..0633a1e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -27,7 +27,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicMultipleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -140,7 +140,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
 
             commSpi.registerMessage(GridNearAtomicMultipleUpdateRequest.class);
             commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class);
-            commSpi.registerMessage(GridDhtAtomicUpdateRequest.class);
+            commSpi.registerMessage(GridDhtAtomicMultipleUpdateRequest.class);
 
             int putCnt = 15;
 
@@ -210,7 +210,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
      * @return Count.
      */
     private int dhtRequestsCount(TestCommunicationSpi commSpi) {
-        return commSpi.messageCount(GridDhtAtomicUpdateRequest.class);
+        return commSpi.messageCount(GridDhtAtomicMultipleUpdateRequest.class);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 0e7755b..e3adc21 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -480,7 +480,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
             return delay && (
                 (origMsg instanceof GridNearAtomicMultipleUpdateRequest) ||
                 (origMsg instanceof GridNearAtomicSingleUpdateRequest) ||
-                (origMsg instanceof GridDhtAtomicUpdateRequest)
+                (origMsg instanceof GridDhtAtomicMultipleUpdateRequest)
             );
         }
     }


Mime
View raw message