ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [12/15] ignite git commit: IGNITE-2523 "single put" NEAR update request
Date Mon, 21 Nov 2016 13:43:00 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
new file mode 100644
index 0000000..e2314f8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -0,0 +1,1031 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+
+/**
+ * Lite DHT cache update request sent from near node to primary node.
+ */
+public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdateRequest {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Target node ID. */
+    @GridDirectTransient
+    private UUID nodeId;
+
+    /** Future version. */
+    private GridCacheVersion futVer;
+
+    /** Fast map flag. */
+    private boolean fastMap;
+
+    /** Update version. Set to non-null if fastMap is {@code true}. */
+    private GridCacheVersion updateVer;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+    private boolean topLocked;
+
+    /** Write synchronization mode. */
+    private CacheWriteSynchronizationMode syncMode;
+
+    /** Update operation. */
+    private GridCacheOperation op;
+
+    /** Keys to update. */
+    @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
+    private List<KeyCacheObject> keys;
+
+    /** Values to update. */
+    @GridDirectCollection(CacheObject.class)
+    private List<CacheObject> vals;
+
+    /** Partitions of keys. */
+    @GridDirectCollection(int.class)
+    private List<Integer> partIds;
+
+    /** Entry processors. */
+    @GridDirectTransient
+    private List<EntryProcessor<Object, Object, Object>> entryProcessors;
+
+    /** Entry processors bytes. */
+    @GridDirectCollection(byte[].class)
+    private List<byte[]> entryProcessorsBytes;
+
+    /** Optional arguments for entry processor. */
+    @GridDirectTransient
+    private Object[] invokeArgs;
+
+    /** Entry processor arguments bytes. */
+    private byte[][] invokeArgsBytes;
+
+    /** Conflict versions. */
+    @GridDirectCollection(GridCacheVersion.class)
+    private List<GridCacheVersion> conflictVers;
+
+    /** Conflict TTLs. */
+    private GridLongList conflictTtls;
+
+    /** Conflict expire times. */
+    private GridLongList conflictExpireTimes;
+
+    /** Return value flag. */
+    private boolean retval;
+
+    /** Expiry policy. */
+    @GridDirectTransient
+    private ExpiryPolicy expiryPlc;
+
+    /** Expiry policy bytes. */
+    private byte[] expiryPlcBytes;
+
+    /** Filter. */
+    private CacheEntryPredicate[] filter;
+
+    /** Flag indicating whether request contains primary keys. */
+    private boolean hasPrimary;
+
+    /** Subject ID. */
+    private UUID subjId;
+
+    /** Task name hash. */
+    private int taskNameHash;
+
+    /** Skip write-through to a persistent storage. */
+    private boolean skipStore;
+
+    /** Client node request flag. */
+    private boolean clientReq;
+
+    /** Keep binary flag. */
+    private boolean keepBinary;
+
+    /** Response passed to the first {@code onResponse()} call; later responses are ignored. */
+    @GridDirectTransient
+    private GridNearAtomicUpdateResponse res;
+
+    /** Initial capacity of inner collections: maximum entries count capped at 10. */
+    @GridDirectTransient
+    private int initSize;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridNearAtomicFullUpdateRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     * @param futVer Future version.
+     * @param fastMap Fast map scheme flag.
+     * @param updateVer Update version set if fast map is performed.
+     * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
+     * @param syncMode Synchronization mode.
+     * @param op Cache update operation.
+     * @param retval Return value required flag.
+     * @param expiryPlc Expiry policy.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @param filter Optional filter for atomic check.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip write-through to a persistent storage.
+     * @param keepBinary Keep binary flag.
+     * @param clientReq Client node request flag.
+     * @param addDepInfo Deployment info flag.
+     * @param maxEntryCnt Maximum entries count.
+     */
+    GridNearAtomicFullUpdateRequest(
+        int cacheId,
+        UUID nodeId,
+        GridCacheVersion futVer,
+        boolean fastMap,
+        @Nullable GridCacheVersion updateVer,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean topLocked,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        @Nullable ExpiryPolicy expiryPlc,
+        @Nullable Object[] invokeArgs,
+        @Nullable CacheEntryPredicate[] filter,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean clientReq,
+        boolean addDepInfo,
+        int maxEntryCnt
+    ) {
+        assert futVer != null;
+
+        this.cacheId = cacheId;
+        this.nodeId = nodeId;
+        this.futVer = futVer;
+        this.fastMap = fastMap;
+        this.updateVer = updateVer;
+
+        this.topVer = topVer;
+        this.topLocked = topLocked;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.retval = retval;
+        this.expiryPlc = expiryPlc;
+        this.invokeArgs = invokeArgs;
+        this.filter = filter;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.skipStore = skipStore;
+        this.keepBinary = keepBinary;
+        this.clientReq = clientReq;
+        this.addDepInfo = addDepInfo;
+
+        // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
+        // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
+        // participate in request. As such, we know upper bound of all collections in request. If this bound is lower
+        // than 10, we use it.
+        initSize = Math.min(maxEntryCnt, 10);
+
+        keys = new ArrayList<>(initSize);
+
+        partIds = new ArrayList<>(initSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID nodeId() {
+        return nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID subjectId() {
+        return subjId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion futureVersion() {
+        return futVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion updateVersion() {
+        return updateVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
+        return syncMode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheOperation operation() {
+        return op;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+        if (this.res == null) {
+            this.res = res;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public GridNearAtomicUpdateResponse response() {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+        return ctx.atomicMessageLogger();
+    }
+
+    /** {@inheritDoc} Once created, each conflict list holds exactly one slot per added key. */
+    @Override public void addUpdateEntry(KeyCacheObject key,
+        @Nullable Object val,
+        long conflictTtl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean primary) {
+        EntryProcessor<Object, Object, Object> entryProcessor = null;
+
+        if (op == TRANSFORM) {
+            assert val instanceof EntryProcessor : val;
+
+            entryProcessor = (EntryProcessor<Object, Object, Object>)val;
+        }
+
+        assert val != null || op == DELETE;
+
+        keys.add(key);
+        partIds.add(key.partition());
+
+        if (entryProcessor != null) {
+            if (entryProcessors == null)
+                entryProcessors = new ArrayList<>(initSize);
+
+            entryProcessors.add(entryProcessor);
+        }
+        else if (val != null) {
+            assert val instanceof CacheObject : val;
+
+            if (vals == null)
+                vals = new ArrayList<>(initSize);
+
+            vals.add((CacheObject)val);
+        }
+
+        hasPrimary |= primary;
+
+        // Conflict lists are lazy: created on first use, back-filled for earlier keys, then padded for later keys.
+        if (conflictVer != null) {
+            if (conflictVers == null) {
+                conflictVers = new ArrayList<>(initSize);
+
+                for (int i = 0; i < keys.size() - 1; i++)
+                    conflictVers.add(null);
+            }
+
+            conflictVers.add(conflictVer);
+        }
+        else if (conflictVers != null)
+            conflictVers.add(null);
+
+        if (conflictTtl >= 0 || conflictTtls != null) {
+            if (conflictTtls == null) {
+                conflictTtls = new GridLongList(keys.size());
+
+                for (int i = 0; i < keys.size() - 1; i++)
+                    conflictTtls.add(CU.TTL_NOT_CHANGED);
+            }
+
+            conflictTtls.add(conflictTtl >= 0 ? conflictTtl : CU.TTL_NOT_CHANGED);
+        }
+
+        if (conflictExpireTime >= 0 || conflictExpireTimes != null) {
+            if (conflictExpireTimes == null) {
+                conflictExpireTimes = new GridLongList(keys.size());
+
+                for (int i = 0; i < keys.size() - 1; i++)
+                    conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
+            }
+
+            conflictExpireTimes.add(conflictExpireTime >= 0 ? conflictExpireTime : CU.EXPIRE_TIME_CALCULATE);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<KeyCacheObject> keys() {
+        return keys;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return keys != null ? keys.size() : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject key(int idx) {
+        return keys.get(idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<?> values() {
+        return op == TRANSFORM ? entryProcessors : vals;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public CacheObject value(int idx) {
+        assert op == UPDATE : op;
+
+        return vals.get(idx);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+        assert op == TRANSFORM : op;
+
+        return entryProcessors.get(idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject writeValue(int idx) {
+        if (vals != null)
+            return vals.get(idx);
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public List<GridCacheVersion> conflictVersions() {
+        return conflictVers;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
+        if (conflictVers != null) {
+            assert idx >= 0 && idx < conflictVers.size();
+
+            return conflictVers.get(idx);
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long conflictTtl(int idx) {
+        if (conflictTtls != null) {
+            assert idx >= 0 && idx < conflictTtls.size();
+
+            return conflictTtls.get(idx);
+        }
+
+        return CU.TTL_NOT_CHANGED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long conflictExpireTime(int idx) {
+        if (conflictExpireTimes != null) {
+            assert idx >= 0 && idx < conflictExpireTimes.size();
+
+            return conflictExpireTimes.get(idx);
+        }
+
+        return CU.EXPIRE_TIME_CALCULATE;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Object[] invokeArguments() {
+        return invokeArgs;
+    }
+
+    /**
+     * @return Flag indicating whether this is fast-map update.
+     */
+    @Override public boolean fastMap() {
+        return fastMap;
+    }
+
+    /**
+     * @return Topology locked flag.
+     */
+    @Override public boolean topologyLocked() {
+        return topLocked;
+    }
+
+    /**
+     * @return {@code True} if request sent from client node.
+     */
+    @Override public boolean clientRequest() {
+        return clientReq;
+    }
+
+    /**
+     * @return Return value flag.
+     */
+    @Override public boolean returnValue() {
+        return retval;
+    }
+
+    /**
+     * @return Skip write-through to a persistent storage.
+     */
+    @Override public boolean skipStore() {
+        return skipStore;
+    }
+
+    /**
+     * @return Keep binary flag.
+     */
+    @Override public boolean keepBinary() {
+        return keepBinary;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasPrimary() {
+        return hasPrimary;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public CacheEntryPredicate[] filter() {
+        return filter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExpiryPolicy expiry() {
+        return expiryPlc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalCacheObjects(keys, cctx);
+
+        if (filter != null) {
+            boolean hasFilter = false;
+
+            for (CacheEntryPredicate p : filter) {
+                if (p != null) {
+                    hasFilter = true;
+
+                    p.prepareMarshal(cctx);
+                }
+            }
+
+            if (!hasFilter)
+                filter = null;
+        }
+
+        if (expiryPlc != null && expiryPlcBytes == null)
+            expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+
+        if (op == TRANSFORM) {
+            // force addition of deployment info for entry processors if P2P is enabled globally.
+            if (!addDepInfo && ctx.deploymentEnabled())
+                addDepInfo = true;
+
+            if (entryProcessorsBytes == null)
+                entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+
+            if (invokeArgsBytes == null)
+                invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+        }
+        else
+            prepareMarshalCacheObjects(vals, cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        finishUnmarshalCacheObjects(keys, cctx, ldr);
+
+        if (op == TRANSFORM) {
+            if (entryProcessors == null)
+                entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+            if (invokeArgs == null)
+                invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+        }
+        else
+            finishUnmarshalCacheObjects(vals, cctx, ldr);
+
+        if (filter != null) {
+            for (CacheEntryPredicate p : filter) {
+                if (p != null)
+                    p.finishUnmarshal(cctx, ldr);
+            }
+        }
+
+        if (expiryPlcBytes != null && expiryPlc == null)
+            expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+        if (partIds != null && !partIds.isEmpty()) {
+            assert partIds.size() == keys.size();
+
+            for (int i = 0; i < keys.size(); i++)
+                keys.get(i).partition(partIds.get(i));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeBoolean("clientReq", clientReq))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("conflictTtls", conflictTtls))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeBoolean("fastMap", fastMap))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("futVer", futVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeBoolean("hasPrimary", hasPrimary))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
+                if (!writer.writeBoolean("keepBinary", keepBinary))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 16:
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 17:
+                if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 18:
+                if (!writer.writeBoolean("retval", retval))
+                    return false;
+
+                writer.incrementState();
+
+            case 19:
+                if (!writer.writeBoolean("skipStore", skipStore))
+                    return false;
+
+                writer.incrementState();
+
+            case 20:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 21:
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 22:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 23:
+                if (!writer.writeBoolean("topLocked", topLocked))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 25:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 26:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                clientReq = reader.readBoolean("clientReq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                conflictTtls = reader.readMessage("conflictTtls");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                fastMap = reader.readBoolean("fastMap");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                futVer = reader.readMessage("futVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                hasPrimary = reader.readBoolean("hasPrimary");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
+                keepBinary = reader.readBoolean("keepBinary");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 16:
+                byte opOrd;
+
+                opOrd = reader.readByte("op");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                op = GridCacheOperation.fromOrdinal(opOrd);
+
+                reader.incrementState();
+
+            case 17:
+                partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 18:
+                retval = reader.readBoolean("retval");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 19:
+                skipStore = reader.readBoolean("skipStore");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 21:
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+                reader.incrementState();
+
+            case 22:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 23:
+                topLocked = reader.readBoolean("topLocked");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 24:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 25:
+                updateVer = reader.readMessage("updateVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 26:
+                vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicFullUpdateRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanup(boolean clearKeys) {
+        vals = null;
+        entryProcessors = null;
+        entryProcessorsBytes = null;
+        invokeArgs = null;
+        invokeArgsBytes = null;
+
+        if (clearKeys)
+            keys = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 40;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 27;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicFullUpdateRequest.class, this, "filter", Arrays.toString(filter),
+            "parent", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
new file mode 100644
index 0000000..e0c24b2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link GridNearAtomicSingleUpdateRequest} extension that also carries an array of filter predicates.
+ */
+public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingleUpdateRequest {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Filter predicates; reset to {@code null} by {@code prepareMarshal} when every element is {@code null}. */
+    private CacheEntryPredicate[] filter;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridNearAtomicSingleUpdateFilterRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     * @param futVer Future version.
+     * @param fastMap Fast map scheme flag.
+     * @param updateVer Update version set if fast map is performed.
+     * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
+     * @param syncMode Synchronization mode.
+     * @param op Cache update operation.
+     * @param retval Return value required flag.
+     * @param filter Filter for atomic check; asserted to be non-null and non-empty despite the nullable annotation.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip write-through to a persistent storage.
+     * @param keepBinary Keep binary flag.
+     * @param clientReq Client node request flag.
+     * @param addDepInfo Deployment info flag.
+     */
+    GridNearAtomicSingleUpdateFilterRequest(
+        int cacheId,
+        UUID nodeId,
+        GridCacheVersion futVer,
+        boolean fastMap,
+        @Nullable GridCacheVersion updateVer,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean topLocked,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        @Nullable CacheEntryPredicate[] filter,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean clientReq,
+        boolean addDepInfo
+    ) {
+        super(
+            cacheId,
+            nodeId,
+            futVer,
+            fastMap,
+            updateVer,
+            topVer,
+            topLocked,
+            syncMode,
+            op,
+            retval,
+            subjId,
+            taskNameHash,
+            skipStore,
+            keepBinary,
+            clientReq,
+            addDepInfo
+        );
+
+        assert filter != null && filter.length > 0;
+
+        this.filter = filter;
+    }
+
+    /** {@inheritDoc} Returns the filter array held by this request. */
+    @Override @Nullable public CacheEntryPredicate[] filter() {
+        return filter;
+    }
+
+    /** {@inheritDoc} Also prepares every non-null filter predicate; drops the array if it has no non-null element. */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        if (filter != null) {
+            boolean hasFilter = false;
+
+            for (CacheEntryPredicate p : filter) {
+                if (p != null) {
+                    hasFilter = true;
+
+                    p.prepareMarshal(cctx);
+                }
+            }
+
+            if (!hasFilter)
+                filter = null;
+        }
+    }
+
+    /** {@inheritDoc} Also finishes unmarshalling of every non-null filter predicate. */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (filter != null) {
+            GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+            for (CacheEntryPredicate p : filter) {
+                if (p != null)
+                    p.finishUnmarshal(cctx, ldr);
+            }
+        }
+    }
+
+    /** {@inheritDoc} Writes the parent's fields first, then {@code filter} at writer state 14. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 14:
+                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads the parent's fields first, then {@code filter} at reader state 14. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 14:
+                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicSingleUpdateFilterRequest.class);
+    }
+
+    /** {@inheritDoc} This message type returns {@code 127}. */
+    @Override public byte directType() {
+        return 127;
+    }
+
+    /** {@inheritDoc} This message type returns {@code 15}. */
+    @Override public byte fieldsCount() {
+        return 15;
+    }
+
+    /** {@inheritDoc} Includes the filter array and the parent's string form. */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicSingleUpdateFilterRequest.class, this, "filter", Arrays.toString(filter),
+            "parent", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 256c7ac..eaf2f2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
@@ -58,6 +59,9 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
  * DHT atomic cache near update future.
  */
 public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture {
+    /** Node version ({@code 1.7.4}) starting from which 'single' update requests may be sent to a node. */
+    private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
+
     /** Keys */
     private Object key;
 
@@ -66,7 +70,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     private Object val;
 
     /** Not null is operation is mapped to single node. */
-    private GridNearAtomicUpdateRequest req;
+    private GridNearAtomicAbstractUpdateRequest req;
 
     /**
      * @param cctx Cache context.
@@ -126,7 +130,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     @Override public boolean onNodeLeft(UUID nodeId) {
         GridNearAtomicUpdateResponse res = null;
 
-        GridNearAtomicUpdateRequest req;
+        GridNearAtomicAbstractUpdateRequest req;
 
         synchronized (mux) {
             req = this.req != null && this.req.nodeId().equals(nodeId) ? this.req : null;
@@ -192,8 +196,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-    @Override  public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
-        GridNearAtomicUpdateRequest req;
+    @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+        GridNearAtomicAbstractUpdateRequest req;
 
         AffinityTopologyVersion remapTopVer = null;
 
@@ -369,7 +373,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @param req Update request.
      * @param res Update response.
      */
-    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+    private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
         assert nearEnabled;
 
         if (res.remapKeys() != null || !req.hasPrimary())
@@ -446,7 +450,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         }
 
         Exception err = null;
-        GridNearAtomicUpdateRequest singleReq0 = null;
+        GridNearAtomicAbstractUpdateRequest singleReq0 = null;
 
         GridCacheVersion futVer = cctx.versions().next(topVer);
 
@@ -535,7 +539,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @return Request.
      * @throws Exception If failed.
      */
-    private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+    private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
         GridCacheVersion futVer,
         @Nullable GridCacheVersion updVer) throws Exception {
         if (key == null)
@@ -559,27 +563,94 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                 "left the grid).");
 
-        GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
-            cctx.cacheId(),
-            primary.id(),
-            futVer,
-            false,
-            updVer,
-            topVer,
-            topLocked,
-            syncMode,
-            op,
-            retval,
-            expiryPlc,
-            invokeArgs,
-            filter,
-            subjId,
-            taskNameHash,
-            skipStore,
-            keepBinary,
-            cctx.kernalContext().clientNode(),
-            cctx.deploymentEnabled(),
-            1);
+        GridNearAtomicAbstractUpdateRequest req;
+
+        if (canUseSingleRequest(primary)) {
+            if (op == TRANSFORM) {
+                req = new GridNearAtomicSingleUpdateInvokeRequest(
+                    cctx.cacheId(),
+                    primary.id(),
+                    futVer,
+                    false,
+                    updVer,
+                    topVer,
+                    topLocked,
+                    syncMode,
+                    op,
+                    retval,
+                    invokeArgs,
+                    subjId,
+                    taskNameHash,
+                    skipStore,
+                    keepBinary,
+                    cctx.kernalContext().clientNode(),
+                    cctx.deploymentEnabled());
+            }
+            else {
+                if (filter == null || filter.length == 0) {
+                    req = new GridNearAtomicSingleUpdateRequest(
+                        cctx.cacheId(),
+                        primary.id(),
+                        futVer,
+                        false,
+                        updVer,
+                        topVer,
+                        topLocked,
+                        syncMode,
+                        op,
+                        retval,
+                        subjId,
+                        taskNameHash,
+                        skipStore,
+                        keepBinary,
+                        cctx.kernalContext().clientNode(),
+                        cctx.deploymentEnabled());
+                }
+                else {
+                    req = new GridNearAtomicSingleUpdateFilterRequest(
+                        cctx.cacheId(),
+                        primary.id(),
+                        futVer,
+                        false,
+                        updVer,
+                        topVer,
+                        topLocked,
+                        syncMode,
+                        op,
+                        retval,
+                        filter,
+                        subjId,
+                        taskNameHash,
+                        skipStore,
+                        keepBinary,
+                        cctx.kernalContext().clientNode(),
+                        cctx.deploymentEnabled());
+                }
+            }
+        }
+        else {
+            req = new GridNearAtomicFullUpdateRequest(
+                cctx.cacheId(),
+                primary.id(),
+                futVer,
+                false,
+                updVer,
+                topVer,
+                topLocked,
+                syncMode,
+                op,
+                retval,
+                expiryPlc,
+                invokeArgs,
+                filter,
+                subjId,
+                taskNameHash,
+                skipStore,
+                keepBinary,
+                cctx.kernalContext().clientNode(),
+                cctx.deploymentEnabled(),
+                1);
+        }
 
         req.addUpdateEntry(cacheKey,
             val,
@@ -591,6 +662,16 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         return req;
     }
 
+    /**
+     * @param node Target node; asserted to be non-null.
+     * @return {@code True} if 'single' requests can be used: no expiry policy and node version is 1.7.4 or later.
+     */
+    private boolean canUseSingleRequest(ClusterNode node) {
+        assert node != null;
+
+        return expiryPlc == null && node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0;
+    }
+
     /** {@inheritDoc} */
     public String toString() {
         synchronized (mux) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
new file mode 100644
index 0000000..42b51d6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * {@link GridNearAtomicSingleUpdateRequest} extension for {@code TRANSFORM}: carries one entry processor and its arguments.
+ */
+public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingleUpdateRequest {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Optional arguments for entry processor. */
+    @GridDirectTransient
+    private Object[] invokeArgs;
+
+    /** Entry processor arguments bytes. */
+    private byte[][] invokeArgsBytes;
+
+    /** Entry processor. */
+    @GridDirectTransient
+    private EntryProcessor<Object, Object, Object> entryProcessor;
+
+    /** Entry processor bytes. */
+    private byte[] entryProcessorBytes;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridNearAtomicSingleUpdateInvokeRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     * @param futVer Future version.
+     * @param fastMap Fast map scheme flag.
+     * @param updateVer Update version set if fast map is performed.
+     * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
+     * @param syncMode Synchronization mode.
+     * @param op Cache update operation; asserted to be {@code TRANSFORM}.
+     * @param retval Return value required flag.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip write-through to a persistent storage.
+     * @param keepBinary Keep binary flag.
+     * @param clientReq Client node request flag.
+     * @param addDepInfo Deployment info flag.
+     */
+    GridNearAtomicSingleUpdateInvokeRequest(
+        int cacheId,
+        UUID nodeId,
+        GridCacheVersion futVer,
+        boolean fastMap,
+        @Nullable GridCacheVersion updateVer,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean topLocked,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        @Nullable Object[] invokeArgs,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean clientReq,
+        boolean addDepInfo
+    ) {
+        super(
+            cacheId,
+            nodeId,
+            futVer,
+            fastMap,
+            updateVer,
+            topVer,
+            topLocked,
+            syncMode,
+            op,
+            retval,
+            subjId,
+            taskNameHash,
+            skipStore,
+            keepBinary,
+            clientReq,
+            addDepInfo
+        );
+        this.invokeArgs = invokeArgs;
+
+        assert op == TRANSFORM : op;
+    }
+
+    /**
+     * @param key Key to add; its partition is stored as the request partition.
+     * @param val Entry processor; asserted to be an {@link EntryProcessor} instance.
+     * @param conflictTtl Conflict TTL; asserted to be negative.
+     * @param conflictExpireTime Conflict expire time; asserted to be negative.
+     * @param conflictVer Conflict version; asserted to be {@code null}.
+     * @param primary If given key is primary on this mapping.
+     */
+    @Override public void addUpdateEntry(KeyCacheObject key,
+        @Nullable Object val,
+        long conflictTtl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean primary) {
+        assert conflictTtl < 0 : conflictTtl;
+        assert conflictExpireTime < 0 : conflictExpireTime;
+        assert conflictVer == null : conflictVer;
+        assert val instanceof EntryProcessor : val;
+
+        entryProcessor = (EntryProcessor<Object, Object, Object>)val;
+
+        this.key = key;
+        partId = key.partition();
+
+        hasPrimary(hasPrimary() | primary);
+    }
+
+    /** {@inheritDoc} Returns a singleton list holding the entry processor. */
+    @Override public List<?> values() {
+        return Collections.singletonList(entryProcessor);
+    }
+
+    /** {@inheritDoc} Always returns {@code null} for this request type; only index 0 is accepted (asserted). */
+    @Override public CacheObject value(int idx) {
+        assert idx == 0 : idx;
+
+        return null;
+    }
+
+    /** {@inheritDoc} Returns the single entry processor; only index 0 is accepted (asserted). */
+    @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+        assert idx == 0 : idx;
+
+        return entryProcessor;
+    }
+
+    /** {@inheritDoc} Always returns {@code null} for this request type; only index 0 is accepted (asserted). */
+    @Override public CacheObject writeValue(int idx) {
+        assert idx == 0 : idx;
+
+        return null;
+    }
+
+    /** {@inheritDoc} Returns the invoke arguments array held by this request. */
+    @Nullable @Override public Object[] invokeArguments() {
+        return invokeArgs;
+    }
+
+    /** {@inheritDoc} Marshals the entry processor and invoke arguments unless their bytes are already present. */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        // Force addition of deployment info for entry processors if P2P is enabled globally.
+        if (!addDepInfo && ctx.deploymentEnabled())
+            addDepInfo = true;
+
+        if (entryProcessor != null && entryProcessorBytes == null)
+            entryProcessorBytes = CU.marshal(cctx, entryProcessor);
+
+        if (invokeArgsBytes == null)
+            invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+    }
+
+    /** {@inheritDoc} Restores the entry processor and invoke arguments from their bytes if they are not set yet. */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (entryProcessorBytes != null && entryProcessor == null)
+            entryProcessor = U.unmarshal(ctx, entryProcessorBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+        if (invokeArgs == null)
+            invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+    }
+
+    /** {@inheritDoc} Additionally drops the entry processor reference. */
+    @Override public void cleanup(boolean clearKey) {
+        super.cleanup(clearKey);
+
+        entryProcessor = null;
+    }
+
+    /** {@inheritDoc} Writes the parent's fields first, then processor bytes (state 14) and argument bytes (state 15). */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 14:
+                if (!writer.writeByteArray("entryProcessorBytes", entryProcessorBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads the parent's fields first, then processor bytes (state 14) and argument bytes (state 15). */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 14:
+                entryProcessorBytes = reader.readByteArray("entryProcessorBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicSingleUpdateInvokeRequest.class);
+    }
+
+    /** {@inheritDoc} This message type returns {@code 16}. */
+    @Override public byte fieldsCount() {
+        return 16;
+    }
+
+    /** {@inheritDoc} This message type returns {@code 126}. */
+    @Override public byte directType() {
+        return 126;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
new file mode 100644
index 0000000..211b472
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * Near atomic update request that carries a single key with an optional value (no entry processor).
+ */
+public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSingleUpdateRequest {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Key to update. */
+    @GridToStringInclude
+    protected KeyCacheObject key;
+
+    /** Value to update. */
+    protected CacheObject val;
+
+    /** Partition of key, copied from the key in addUpdateEntry and set back on it in finishUnmarshal. */
+    protected int partId;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridNearAtomicSingleUpdateRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     * @param futVer Future version.
+     * @param fastMap Fast map scheme flag.
+     * @param updateVer Update version set if fast map is performed.
+     * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
+     * @param syncMode Synchronization mode.
+     * @param op Cache update operation.
+     * @param retval Return value required flag.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip write-through to a persistent storage.
+     * @param keepBinary Keep binary flag.
+     * @param clientReq Client node request flag.
+     * @param addDepInfo Deployment info flag.
+     */
+    GridNearAtomicSingleUpdateRequest(
+        int cacheId,
+        UUID nodeId,
+        GridCacheVersion futVer,
+        boolean fastMap,
+        @Nullable GridCacheVersion updateVer,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean topLocked,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean clientReq,
+        boolean addDepInfo
+    ) {
+        super(
+            cacheId,
+            nodeId,
+            futVer,
+            fastMap,
+            updateVer,
+            topVer,
+            topLocked,
+            syncMode,
+            op,
+            retval,
+            subjId,
+            taskNameHash,
+            skipStore,
+            keepBinary,
+            clientReq,
+            addDepInfo
+        );
+    }
+
+    /**
+     * @param key Key to update; replaces any key stored by an earlier call.
+     * @param val Update value, a {@link CacheObject}; may be {@code null} only for {@code DELETE}.
+     * @param conflictTtl Conflict TTL; asserted to be negative.
+     * @param conflictExpireTime Conflict expire time; asserted to be negative.
+     * @param conflictVer Conflict version; asserted to be {@code null}.
+     * @param primary If given key is primary on this mapping.
+     */
+    @Override public void addUpdateEntry(KeyCacheObject key,
+        @Nullable Object val,
+        long conflictTtl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean primary) {
+        assert op != TRANSFORM; // This request has no entry processor (entryProcessor() returns null).
+        assert val != null || op == DELETE;
+        assert conflictTtl < 0 : conflictTtl;
+        assert conflictExpireTime < 0 : conflictExpireTime;
+        assert conflictVer == null : conflictVer;
+
+        this.key = key;
+        partId = key.partition(); // Kept as a separate field; writeTo sends it next to the key.
+
+        if (val != null) {
+            assert val instanceof CacheObject : val;
+
+            this.val = (CacheObject)val;
+        }
+
+        hasPrimary(hasPrimary() | primary); // OR-ed in: the flag can be raised here but never cleared.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return key == null ? 0 : 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<KeyCacheObject> keys() {
+        return Collections.singletonList(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject key(int idx) {
+        assert idx == 0 : idx;
+
+        return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<?> values() {
+        return Collections.singletonList(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject value(int idx) {
+        assert idx == 0 : idx;
+
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+        assert idx == 0 : idx;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject writeValue(int idx) {
+        assert idx == 0 : idx;
+
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public List<GridCacheVersion> conflictVersions() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridCacheVersion conflictVersion(int idx) {
+        assert idx == 0 : idx;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long conflictTtl(int idx) {
+        assert idx == 0 : idx;
+
+        return CU.TTL_NOT_CHANGED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long conflictExpireTime(int idx) {
+        assert idx == 0 : idx;
+
+        return CU.EXPIRE_TIME_CALCULATE;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @param ctx Shared cache context used to look up the cache context by {@code cacheId}.
+     */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalCacheObject(key, cctx);
+
+        if (val != null) // The value is optional, the key is prepared unconditionally.
+            prepareMarshalCacheObject(val, cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+
+        if (val != null) // The value is optional, the key is unmarshalled unconditionally.
+            val.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+
+        key.partition(partId); // Re-apply the partition that readFrom stored in the partId field.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer)) // Superclass fields are written before the ones declared here.
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks below: cases fall through so fields are written in state order.
+            case 11:
+                if (!writer.writeMessage("key", key))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeInt("partId", partId))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeMessage("val", val))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader)) // Superclass fields are read before the ones declared here.
+            return false;
+
+        switch (reader.state()) { // No breaks below: cases fall through so fields are read in state order.
+            case 11:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead()) // Stop without advancing the state if the field was not read.
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                partId = reader.readInt("partId");
+
+                if (!reader.isLastRead()) // Stop without advancing the state if the field was not read.
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                val = reader.readMessage("val");
+
+                if (!reader.isLastRead()) // Stop without advancing the state if the field was not read.
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicSingleUpdateRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanup(boolean clearKey) {
+        val = null; // The value reference is always dropped.
+
+        if (clearKey) // The key reference is dropped only on request.
+            key = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 125;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 14; // writeTo/readFrom in this class handle states 11-13; presumably 0-10 belong to superclasses.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicSingleUpdateRequest.class, this, "parent", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 30a0c3d..cd64117 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -87,13 +87,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
     /** Mappings if operations is mapped to more than one node. */
     @GridToStringInclude
-    private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+    private Map<UUID, GridNearAtomicFullUpdateRequest> mappings;
 
     /** Keys to remap. */
     private Collection<KeyCacheObject> remapKeys;
 
     /** Not null is operation is mapped to single node. */
-    private GridNearAtomicUpdateRequest singleReq;
+    private GridNearAtomicFullUpdateRequest singleReq;
 
     /**
      * @param cctx Cache context.
@@ -164,7 +164,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @Override public boolean onNodeLeft(UUID nodeId) {
         GridNearAtomicUpdateResponse res = null;
 
-        GridNearAtomicUpdateRequest req;
+        GridNearAtomicFullUpdateRequest req;
 
         synchronized (mux) {
             if (singleReq != null)
@@ -258,7 +258,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
     @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
-        GridNearAtomicUpdateRequest req;
+        GridNearAtomicFullUpdateRequest req;
 
         AffinityTopologyVersion remapTopVer = null;
 
@@ -406,7 +406,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         if (rcvAll && nearEnabled) {
             if (mappings != null) {
-                for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+                for (GridNearAtomicFullUpdateRequest req0 : mappings.values()) {
                     GridNearAtomicUpdateResponse res0 = req0.response();
 
                     assert res0 != null : req0;
@@ -482,7 +482,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @param req Update request.
      * @param res Update response.
      */
-    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+    private void updateNear(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
         assert nearEnabled;
 
         if (res.remapKeys() != null || !req.hasPrimary())
@@ -552,13 +552,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      *
      * @param mappings Mappings to send.
      */
-    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+    private void doUpdate(Map<UUID, GridNearAtomicFullUpdateRequest> mappings) {
         UUID locNodeId = cctx.localNodeId();
 
-        GridNearAtomicUpdateRequest locUpdate = null;
+        GridNearAtomicFullUpdateRequest locUpdate = null;
 
         // Send messages to remote nodes first, then run local update.
-        for (GridNearAtomicUpdateRequest req : mappings.values()) {
+        for (GridNearAtomicFullUpdateRequest req : mappings.values()) {
             if (locNodeId.equals(req.nodeId())) {
                 assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
                     ", req=" + req + ']';
@@ -590,8 +590,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         if (locUpdate != null) {
             cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                new CI2<GridNearAtomicFullUpdateRequest, GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
                         onResult(res.nodeId(), res, false);
                     }
                 });
@@ -621,8 +621,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         }
 
         Exception err = null;
-        GridNearAtomicUpdateRequest singleReq0 = null;
-        Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
+        GridNearAtomicFullUpdateRequest singleReq0 = null;
+        Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null;
 
         int size = keys.size();
 
@@ -651,7 +651,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
             }
             else {
-                Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+                Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = mapUpdate(topNodes,
                     topVer,
                     futVer,
                     updVer,
@@ -663,7 +663,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     if (syncMode == PRIMARY_SYNC) {
                         mappings0 = U.newHashMap(pendingMappings.size());
 
-                        for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+                        for (GridNearAtomicFullUpdateRequest req : pendingMappings.values()) {
                             if (req.hasPrimary())
                                 mappings0.put(req.nodeId(), req);
                         }
@@ -756,7 +756,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @throws Exception If failed.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+    private Map<UUID, GridNearAtomicFullUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
         GridCacheVersion futVer,
         @Nullable GridCacheVersion updVer,
@@ -776,7 +776,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (conflictRmvVals != null)
             conflictRmvValsIt = conflictRmvVals.iterator();
 
-        Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+        Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
 
         // Create mappings first, then send messages.
         for (Object key : keys) {
@@ -848,10 +848,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                 UUID nodeId = affNode.id();
 
-                GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+                GridNearAtomicFullUpdateRequest mapped = pendingMappings.get(nodeId);
 
                 if (mapped == null) {
-                    mapped = new GridNearAtomicUpdateRequest(
+                    mapped = new GridNearAtomicFullUpdateRequest(
                         cctx.cacheId(),
                         nodeId,
                         futVer,
@@ -892,7 +892,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Request.
      * @throws Exception If failed.
      */
-    private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+    private GridNearAtomicFullUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
         GridCacheVersion futVer,
         @Nullable GridCacheVersion updVer) throws Exception {
         Object key = F.first(keys);
@@ -953,7 +953,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                 "left the grid).");
 
-        GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+        GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
             cctx.cacheId(),
             primary.id(),
             futVer,


Mime
View raw message