ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [40/50] [abbrv] ignite git commit: ignite-4705 Atomic cache protocol change: notify client node from backups
Date Mon, 13 Mar 2017 16:35:32 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index c92e0f5..39abb73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -17,12 +17,22 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
@@ -31,16 +41,19 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  * Base for near atomic update futures.
@@ -108,28 +121,24 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     protected boolean topLocked;
 
     /** Remap count. */
+    @GridToStringInclude
     protected int remapCnt;
 
     /** Current topology version. */
+    @GridToStringInclude
     protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
 
     /** */
-    protected GridCacheVersion updVer;
-
-    /** Topology version when got mapping error. */
-    protected AffinityTopologyVersion mapErrTopVer;
-
-    /** */
-    protected int resCnt;
+    @GridToStringInclude
+    protected AffinityTopologyVersion remapTopVer;
 
     /** Error. */
+    @GridToStringInclude
     protected CachePartialUpdateCheckedException err;
 
     /** Future ID. */
-    protected GridCacheVersion futVer;
-
-    /** Completion future for a particular topology version. */
-    protected GridFutureAdapter<Void> topCompleteFut;
+    @GridToStringInclude
+    protected Long futId;
 
     /** Operation result. */
     protected GridCacheReturn opRes;
@@ -198,10 +207,30 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         this.remapCnt = remapCnt;
     }
 
+    /** {@inheritDoc} */
+    @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        return null;
+    }
+
+    /**
+     * @param req Request.
+     */
+    void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
+        try {
+            cctx.io().send(req.updateRequest().nodeId(), req, cctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException e) {
+            onSendError(req, e);
+        }
+        catch (IgniteCheckedException e) {
+            onDone(e);
+        }
+    }
+
     /**
      * Performs future mapping.
      */
-    public void map() {
+    public final void map() {
         AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
 
         if (topVer == null)
@@ -212,18 +241,14 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             // Cannot remap.
             remapCnt = 1;
 
-            GridCacheVersion futVer = addAtomicFuture(topVer);
-
-            if (futVer != null)
-                map(topVer, futVer);
+            map(topVer);
         }
     }
 
     /**
      * @param topVer Topology version.
-     * @param futVer Future version
      */
-    protected abstract void map(AffinityTopologyVersion topVer, GridCacheVersion futVer);
+    protected abstract void map(AffinityTopologyVersion topVer);
 
     /**
      * Maps future on ready topology.
@@ -248,8 +273,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     /**
      * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
      */
-    protected boolean storeFuture() {
-        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+    final boolean storeFuture() {
+        return syncMode != FULL_ASYNC;
     }
 
     /**
@@ -258,12 +283,15 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+    final void sendSingleRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
-                new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
+                new GridDhtAtomicCache.UpdateReplyClosure() {
                     @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
-                        onResult(res.nodeId(), res, false);
+                        if (syncMode != FULL_ASYNC)
+                            onPrimaryResponse(res.nodeId(), res, false);
+                        else if (res.remapTopologyVersion() != null)
+                            ((GridDhtAtomicCache)cctx.cache()).remapToNewPrimary(req);
                     }
                 });
         }
@@ -272,18 +300,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                 if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() +
-                        ", writeVer=" + req.updateVersion() +
+                    msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
                         ", node=" + req.nodeId() + ']');
                 }
-
-                if (syncMode == FULL_ASYNC)
-                    onDone(new GridCacheReturn(cctx, true, true, null, true));
             }
             catch (IgniteCheckedException e) {
                 if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() +
-                        ", writeVer=" + req.updateVersion() +
+                    msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
                         ", node=" + req.nodeId() +
                         ", err=" + e + ']');
                 }
@@ -300,46 +323,377 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param res Update response.
      * @param nodeErr {@code True} if response was created on node failure.
      */
-    public abstract void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr);
+    public abstract void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr);
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    public abstract void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res);
 
     /**
      * @param req Request.
-     * @param e Error.
+     * @param res Response.
      */
-    protected final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
-        synchronized (mux) {
-            GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-                req.nodeId(),
-                req.futureVersion(),
-                cctx.deploymentEnabled());
+    final void onPrimaryError(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
+        assert res.error() != null;
+
+        if (err == null)
+            err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+        Collection<KeyCacheObject> keys0 = res.failedKeys() != null ? res.failedKeys() : req.keys();
+
+        Collection<Object> keys = new ArrayList<>(keys0.size());
 
-            res.addFailedKeys(req.keys(), e);
+        for (KeyCacheObject key : keys0)
+            keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
 
-            onResult(req.nodeId(), res, true);
+        err.add(keys, res.error(), req.topologyVersion());
+    }
+
+    /**
+     * @param req Request.
+     * @return Response to notify about primary failure.
+     */
+    final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req) {
+        assert req.response() == null : req;
+        assert req.nodeId() != null : req;
+
+        if (msgLog.isDebugEnabled()) {
+            msgLog.debug("Near update fut, node left [futId=" + req.futureId() +
+                ", node=" + req.nodeId() + ']');
         }
+
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+            req.nodeId(),
+            req.futureId(),
+            req.partition(),
+            true,
+            cctx.deploymentEnabled());
+
+        ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+            "before response is received: " + req.nodeId());
+
+        e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+        res.addFailedKeys(req.keys(), e);
+
+        return res;
+    }
+
+    /**
+     * @param req Request.
+     * @param e Error.
+     */
+    final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+            req.nodeId(),
+            req.futureId(),
+            req.partition(),
+            e instanceof ClusterTopologyCheckedException,
+            cctx.deploymentEnabled());
+
+        res.addFailedKeys(req.keys(), e);
+
+        onPrimaryResponse(req.nodeId(), res, true);
+    }
+
+    /**
+     * @param req Request.
+     * @param e Error.
+     */
+    private void onSendError(GridNearAtomicCheckUpdateRequest req, IgniteCheckedException e) {
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+            req.updateRequest().nodeId(),
+            req.futureId(),
+            req.partition(),
+            e instanceof ClusterTopologyCheckedException,
+            cctx.deploymentEnabled());
+
+        res.addFailedKeys(req.updateRequest().keys(), e);
+
+        onPrimaryResponse(req.updateRequest().nodeId(), res, true);
     }
 
     /**
-     * Adds future prevents topology change before operation complete.
-     * Should be invoked before topology lock released.
      *
-     * @param topVer Topology version.
-     * @return Future version in case future added.
      */
-    protected final GridCacheVersion addAtomicFuture(AffinityTopologyVersion topVer) {
-        GridCacheVersion futVer = cctx.versions().next(topVer);
+    static class PrimaryRequestState {
+        /** */
+        final GridNearAtomicAbstractUpdateRequest req;
+
+        /** */
+        @GridToStringInclude
+        Set<UUID> dhtNodes;
+
+        /** */
+        @GridToStringInclude
+        private Set<UUID> rcvd;
+
+        /** */
+        private boolean hasRes;
+
+        /**
+         * @param req Request.
+         * @param nodes Affinity nodes.
+         * @param single {@code True} if created for single-key operation.
+         */
+        PrimaryRequestState(GridNearAtomicAbstractUpdateRequest req, List<ClusterNode> nodes, boolean single) {
+            assert req != null && req.nodeId() != null : req;
+
+            this.req = req;
+
+            if (req.initMappingLocally()) {
+                if (single) {
+                    if (nodes.size() > 1) {
+                        dhtNodes = U.newHashSet(nodes.size() - 1);
+
+                        for (int i = 1; i < nodes.size(); i++)
+                            dhtNodes.add(nodes.get(i).id());
+                    }
+                    else
+                        dhtNodes = Collections.emptySet();
+                }
+                else {
+                    dhtNodes = new HashSet<>();
+
+                    for (int i = 1; i < nodes.size(); i++)
+                        dhtNodes.add(nodes.get(i).id());
+                }
+            }
+        }
+
+        /**
+         * @return Primary node ID.
+         */
+        UUID primaryId() {
+            return req.nodeId();
+        }
+
+        /**
+         * @param nodes Nodes.
+         */
+        void addMapping(List<ClusterNode> nodes) {
+            assert req.initMappingLocally();
+
+            for (int i = 1; i < nodes.size(); i++)
+                dhtNodes.add(nodes.get(i).id());
+        }
+
+        /**
+         * @param cctx Context.
+         * @return Check result.
+         */
+        DhtLeftResult checkDhtNodes(GridCacheContext cctx) {
+            assert req.initMappingLocally() : req;
 
-        synchronized (mux) {
-            assert this.futVer == null : this;
-            assert this.topVer == AffinityTopologyVersion.ZERO : this;
+            if (finished())
+                return DhtLeftResult.NOT_DONE;
 
-            this.topVer = topVer;
-            this.futVer = futVer;
+            boolean finished = false;
+
+            for (Iterator<UUID> it = dhtNodes.iterator(); it.hasNext();) {
+                UUID nodeId = it.next();
+
+                if (!cctx.discovery().alive(nodeId)) {
+                    it.remove();
+
+                    if (finished()) {
+                        finished = true;
+
+                        break;
+                    }
+                }
+            }
+
+            if (finished)
+                return DhtLeftResult.DONE;
+
+            if (dhtNodes.isEmpty())
+                return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
+
+            return DhtLeftResult.NOT_DONE;
+        }
+
+        /**
+         * @return {@code True} if all expected responses are received.
+         */
+        private boolean finished() {
+            if (req.writeSynchronizationMode() == PRIMARY_SYNC)
+                return hasRes;
+
+            return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
+        }
+
+        /**
+         * @return Request if need process primary fail response, {@code null} otherwise.
+         */
+        @Nullable GridNearAtomicAbstractUpdateRequest onPrimaryFail() {
+            if (finished())
+                return null;
+
+            /*
+             * When primary failed, even if primary response is received, it is possible it failed to send
+             * request to backup(s), need remap operation.
+             */
+            if (req.fullSync() && !req.nodeFailedResponse()) {
+                req.resetResponse();
+
+                return req;
+            }
+
+            return req.response() == null ? req : null;
         }
 
-        if (storeFuture() && !cctx.mvcc().addAtomicFuture(futVer, this))
-            return null;
+        /**
+         * @param nodeId Node ID.
+         * @param res Response.
+         * @return Request if need process primary response, {@code null} otherwise.
+         */
+        @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
+            assert req.nodeId().equals(nodeId);
 
-        return futVer;
+            if (res.nodeLeftResponse())
+                return onPrimaryFail();
+
+            if (finished())
+                return null;
+
+            return req.response() == null ? req : null;
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @return Result indicating whether request processing finished or primary check is needed.
+         */
+        DhtLeftResult onDhtNodeLeft(UUID nodeId) {
+            if (req.writeSynchronizationMode() != FULL_SYNC || dhtNodes == null || finished())
+                return DhtLeftResult.NOT_DONE;
+
+            if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
+                if (hasRes)
+                    return DhtLeftResult.DONE;
+                else
+                    return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
+            }
+
+            return DhtLeftResult.NOT_DONE;
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @param res Response.
+         * @return {@code True} if request processing finished.
+         */
+        boolean onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+            assert req.writeSynchronizationMode() == FULL_SYNC : req;
+
+            if (finished())
+                return false;
+
+            if (res.hasResult())
+                hasRes = true;
+
+            if (dhtNodes == null) {
+                if (rcvd == null)
+                    rcvd = new HashSet<>();
+
+                rcvd.add(nodeId);
+
+                return false;
+            }
+
+            return dhtNodes.remove(nodeId) && finished();
+        }
+
+        /**
+         * @param res Response.
+         * @param cctx Cache context.
+         * @return {@code True} if request processing finished.
+         */
+        boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {
+            assert !finished() : this;
+
+            hasRes = true;
+
+            boolean onRes = req.onResponse(res);
+
+            assert onRes;
+
+            if (res.error() != null || res.remapTopologyVersion() != null) {
+                dhtNodes = Collections.emptySet(); // Mark as finished.
+
+                return true;
+            }
+
+            assert res.returnValue() != null : res;
+
+            if (res.dhtNodes() != null)
+                initDhtNodes(res.dhtNodes(), cctx);
+
+            return finished();
+        }
+
+        /**
+         * @param nodeIds Node IDs.
+         * @param cctx Context.
+         */
+        private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
+            assert dhtNodes == null || req.initMappingLocally();
+
+            Set<UUID> dhtNodes0 = dhtNodes;
+
+            dhtNodes = null;
+
+            for (UUID dhtNodeId : nodeIds) {
+                if (F.contains(rcvd, dhtNodeId))
+                    continue;
+
+                if (req.initMappingLocally() && !F.contains(dhtNodes0, dhtNodeId))
+                    continue;
+
+                if (cctx.discovery().node(dhtNodeId) != null) {
+                    if (dhtNodes == null)
+                        dhtNodes = U.newHashSet(nodeIds.size());
+
+                    dhtNodes.add(dhtNodeId);
+                }
+            }
+
+            if (dhtNodes == null)
+                dhtNodes = Collections.emptySet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PrimaryRequestState.class, this,
+                "primary", primaryId(),
+                "needPrimaryRes", req.needPrimaryResponse(),
+                "primaryRes", req.response() != null,
+                "done", finished());
+        }
+    }
+
+    /**
+     *
+     */
+    enum DhtLeftResult {
+        /** All responses and operation result are received. */
+        DONE,
+
+        /** Not all responses are received. */
+        NOT_DONE,
+
+        /**
+         * All backups failed and response from primary is not required,
+         * in this case in FULL_SYNC mode need send additional request
+         * on primary to ensure FULL_SYNC guarantee.
+         */
+        ALL_RCVD_CHECK_PRIMARY
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicAbstractUpdateFuture.class, this, super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index bee2ecd..a43bfb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -17,18 +17,28 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -38,106 +48,331 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
+    /** Flag indicating that near node waits for primary response. */
+    private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01;
+
+    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+    private static final int TOP_LOCKED_FLAG_MASK = 0x02;
+
+    /** Skip write-through to a persistent storage. */
+    private static final int SKIP_STORE_FLAG_MASK = 0x04;
+
+    /** Keep binary flag. */
+    private static final int KEEP_BINARY_FLAG_MASK = 0x08;
+
+    /** Return value flag. */
+    private static final int RET_VAL_FLAG_MASK = 0x10;
+
+    /** Target node ID. */
+    @GridDirectTransient
+    protected UUID nodeId;
+
+    /** Near node future ID. */
+    protected long futId;
+
+    /** Topology version. */
+    protected AffinityTopologyVersion topVer;
+
+    /** Write synchronization mode. */
+    protected CacheWriteSynchronizationMode syncMode;
+
+    /** Update operation. */
+    protected GridCacheOperation op;
+
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name hash. */
+    protected int taskNameHash;
+
+    /** Compressed boolean flags. Make sure 'toString' is updated when add new flag. */
+    @GridToStringExclude
+    protected byte flags;
+
+    /** */
+    @GridDirectTransient
+    private GridNearAtomicUpdateResponse res;
+
     /**
-     * @return Mapped node ID.
+     *
      */
-    public abstract UUID nodeId();
+    public GridNearAtomicAbstractUpdateRequest() {
+        // No-op.
+    }
 
     /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
      * @param nodeId Node ID.
+     * @param futId Future ID.
+     * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
+     * @param syncMode Synchronization mode.
+     * @param op Cache update operation.
+     * @param retval Return value required flag.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param needPrimaryRes {@code True} if near node waits for primary response.
+     * @param skipStore Skip write-through to a persistent storage.
+     * @param keepBinary Keep binary flag.
+     * @param addDepInfo Deployment info flag.
+     */
+    protected GridNearAtomicAbstractUpdateRequest(
+        int cacheId,
+        UUID nodeId,
+        long futId,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean topLocked,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean needPrimaryRes,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean addDepInfo
+    ) {
+        this.cacheId = cacheId;
+        this.nodeId = nodeId;
+        this.futId = futId;
+        this.topVer = topVer;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.addDepInfo = addDepInfo;
+
+        if (needPrimaryRes)
+            needPrimaryResponse(true);
+        if (topLocked)
+            topologyLocked(true);
+        if (retval)
+            returnValue(true);
+        if (skipStore)
+            skipStore(true);
+        if (keepBinary)
+            keepBinary(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+        return ctx.atomicMessageLogger();
+    }
+
+    /**
+     * @return {@code True} if near node is able to initialize update mapping locally.
+     */
+    boolean initMappingLocally() {
+        return !needPrimaryResponse() && fullSync();
+    }
+
+    /**
+     * @return {@code True} if near node waits for primary response.
+     */
+    boolean needPrimaryResponse() {
+        return isFlag(NEED_PRIMARY_RES_FLAG_MASK);
+    }
+
+    /**
+     * @param needRes {@code True} if near node waits for primary response.
+     */
+    void needPrimaryResponse(boolean needRes) {
+        setFlag(needRes, NEED_PRIMARY_RES_FLAG_MASK);
+    }
+
+    /**
+     * @return {@code True} if update is processed in {@link CacheWriteSynchronizationMode#FULL_SYNC} mode.
+     */
+    boolean fullSync() {
+        assert syncMode != null;
+
+        return syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
+    }
+
+    /**
+     * @return Task name hash code.
+     */
+    public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /**
+     * @return Update operation.
      */
-    public abstract void nodeId(UUID nodeId);
+    public GridCacheOperation operation() {
+        return op;
+    }
 
     /**
      * @return Subject ID.
      */
-    public abstract UUID subjectId();
+    public UUID subjectId() {
+        return subjId;
+    }
 
     /**
-     * @return Task name hash.
+     * @return Target node ID.
      */
-    public abstract int taskNameHash();
+    public UUID nodeId() {
+        return nodeId;
+    }
 
     /**
-     * @return Future version.
+     * @return Near node future ID.
      */
-    public abstract GridCacheVersion futureVersion();
+    public long futureId() {
+        return futId;
+    }
 
     /**
-     * @return Flag indicating whether this is fast-map udpate.
+     * @return Write synchronization mode.
      */
-    public abstract boolean fastMap();
+    public final CacheWriteSynchronizationMode writeSynchronizationMode() {
+        return syncMode;
+    }
 
     /**
-     * @return Update version for fast-map request.
+     * @param res Response.
+     * @return {@code True} if current response was {@code null}.
      */
-    public abstract GridCacheVersion updateVersion();
+    public boolean onResponse(GridNearAtomicUpdateResponse res) {
+        if (this.res == null) {
+            this.res = res;
+
+            return true;
+        }
+
+        return false;
+    }
 
     /**
-     * @return Topology locked flag.
+     *
      */
-    public abstract boolean topologyLocked();
+    void resetResponse() {
+        this.res = null;
+    }
 
     /**
-     * @return {@code True} if request sent from client node.
+     * @return Response.
      */
-    public abstract boolean clientRequest();
+    @Nullable public GridNearAtomicUpdateResponse response() {
+        return res;
+    }
 
     /**
-     * @return Cache write synchronization mode.
+     * @return {@code True} if received notification about primary fail.
      */
-    public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+    boolean nodeFailedResponse() {
+        return res != null && res.nodeLeftResponse();
+    }
 
     /**
-     * @return Expiry policy.
+     * @return Topology locked flag.
      */
-    public abstract ExpiryPolicy expiry();
+    final boolean topologyLocked() {
+        return isFlag(TOP_LOCKED_FLAG_MASK);
+    }
+
+    /**
+     * @param val {@code True} if topology is locked on near node.
+     */
+    private void topologyLocked(boolean val) {
+        setFlag(val, TOP_LOCKED_FLAG_MASK);
+    }
 
     /**
      * @return Return value flag.
      */
-    public abstract boolean returnValue();
+    public final boolean returnValue() {
+        return isFlag(RET_VAL_FLAG_MASK);
+    }
 
     /**
-     * @return Filter.
+     * @param val Return value flag.
      */
-    @Nullable public abstract CacheEntryPredicate[] filter();
+    public final void returnValue(boolean val) {
+        setFlag(val, RET_VAL_FLAG_MASK);
+    }
 
     /**
      * @return Skip write-through to a persistent storage.
      */
-    public abstract boolean skipStore();
+    public final boolean skipStore() {
+        return isFlag(SKIP_STORE_FLAG_MASK);
+    }
+
+    /**
+     * @param val Skip store flag.
+     */
+    public void skipStore(boolean val) {
+        setFlag(val, SKIP_STORE_FLAG_MASK);
+    }
 
     /**
      * @return Keep binary flag.
      */
-    public abstract boolean keepBinary();
+    public final boolean keepBinary() {
+        return isFlag(KEEP_BINARY_FLAG_MASK);
+    }
 
     /**
-     * @return Update operation.
+     * @param val Keep binary flag.
      */
-    public abstract GridCacheOperation operation();
+    public void keepBinary(boolean val) {
+        setFlag(val, KEEP_BINARY_FLAG_MASK);
+    }
 
     /**
-     * @return Optional arguments for entry processor.
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
      */
-    @Nullable public abstract Object[] invokeArguments();
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
 
     /**
-     * @return Flag indicating whether this request contains primary keys.
+     * Reads flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
      */
-    public abstract boolean hasPrimary();
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
 
     /**
-     * @param res Response.
-     * @return {@code True} if current response was {@code null}.
+     * @return Expiry policy.
      */
-    public abstract boolean onResponse(GridNearAtomicUpdateResponse res);
+    public abstract ExpiryPolicy expiry();
 
     /**
-     * @return Response.
+     * @return Filter.
      */
-    @Nullable public abstract GridNearAtomicUpdateResponse response();
+    @Nullable public abstract CacheEntryPredicate[] filter();
+
+    /**
+     * @return Optional arguments for entry processor.
+     */
+    @Nullable public abstract Object[] invokeArguments();
 
     /**
      * @param key Key to add.
@@ -145,14 +380,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      * @param conflictTtl Conflict TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
-     * @param primary If given key is primary on this mapping.
      */
-    public abstract void addUpdateEntry(KeyCacheObject key,
+    abstract void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer,
-        boolean primary);
+        @Nullable GridCacheVersion conflictVer);
 
     /**
      * @return Keys for this update request.
@@ -182,7 +415,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      */
     public abstract CacheObject writeValue(int idx);
 
-
     /**
      * @return Conflict versions.
      */
@@ -223,4 +455,170 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      * @return Key.
      */
     public abstract KeyCacheObject key(int idx);
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 10;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                byte opOrd;
+
+                opOrd = reader.readByte("op");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                op = GridCacheOperation.fromOrdinal(opOrd);
+
+                reader.incrementState();
+
+            case 6:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+                reader.incrementState();
+
+            case 8:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        StringBuilder flags = new StringBuilder();
+
+        if (needPrimaryResponse())
+            appendFlag(flags, "needRes");
+        if (topologyLocked())
+            appendFlag(flags, "topLock");
+        if (skipStore())
+            appendFlag(flags, "skipStore");
+        if (keepBinary())
+            appendFlag(flags, "keepBinary");
+        if (returnValue())
+            appendFlag(flags, "retVal");
+
+        return S.toString(GridNearAtomicAbstractUpdateRequest.class, this,
+            "flags", flags.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
new file mode 100644
index 0000000..4d0726a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
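+ * Check message for a near atomic update; carries the future ID and partition of the related update request.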
+ */
+public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** Related update request; transient, not written by {@code writeTo}. */
+    @GridDirectTransient
+    private GridNearAtomicAbstractUpdateRequest updateReq;
+
+    /** Partition ID taken from the related update request. */
+    private int partId;
+
+    /** Future ID on near node. */
+    private long futId;
+
+    /**
+     *
+     */
+    public GridNearAtomicCheckUpdateRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param updateReq Related update request.
+     */
+    GridNearAtomicCheckUpdateRequest(GridNearAtomicAbstractUpdateRequest updateReq) {
+        assert updateReq != null && updateReq.fullSync() : updateReq;
+
+        this.updateReq = updateReq;
+        this.cacheId = updateReq.cacheId();
+        this.partId = updateReq.partition();
+        this.futId = updateReq.futureId();
+
+        assert partId >= 0;
+    }
+
+    /**
+     * @return Future ID on near node.
+     */
+    public final long futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Related update request.
+     */
+    GridNearAtomicAbstractUpdateRequest updateRequest() {
+        return updateReq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -47;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeInt("partId", partId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                partId = reader.readInt("partId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicCheckUpdateRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicCheckUpdateRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 08c2474..c381333 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -26,7 +26,6 @@ import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
@@ -41,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -61,56 +61,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Target node ID. */
-    @GridDirectTransient
-    private UUID nodeId;
-
-    /** Future version. */
-    private GridCacheVersion futVer;
-
-    /** Update version. Set to non-null if fastMap is {@code true}. */
-    private GridCacheVersion updateVer;
-
-    /** Topology version. */
-    private AffinityTopologyVersion topVer;
-
-    /** Write synchronization mode. */
-    private CacheWriteSynchronizationMode syncMode;
-
-    /** Update operation. */
-    private GridCacheOperation op;
-
-    /** Subject ID. */
-    protected UUID subjId;
-
-    /** Task name hash. */
-    protected int taskNameHash;
-
-    /** */
-    @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
-
-    /** Fast map flag. */
-    protected boolean fastMap;
-
-    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
-    protected boolean topLocked;
-
-    /** Flag indicating whether request contains primary keys. */
-    protected boolean hasPrimary;
-
-    /** Skip write-through to a persistent storage. */
-    protected boolean skipStore;
-
-    /** */
-    protected boolean clientReq;
-
-    /** Keep binary flag. */
-    protected boolean keepBinary;
-
-    /** Return value flag. */
-    protected boolean retval;
-
     /** Keys to update. */
     @GridToStringInclude
     @GridDirectCollection(KeyCacheObject.class)
@@ -120,10 +70,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     @GridDirectCollection(CacheObject.class)
     private List<CacheObject> vals;
 
-    /** Partitions of keys. */
-    @GridDirectCollection(int.class)
-    private List<Integer> partIds;
-
     /** Entry processors. */
     @GridDirectTransient
     private List<EntryProcessor<Object, Object, Object>> entryProcessors;
@@ -175,9 +121,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
-     * @param fastMap Fast map scheme flag.
-     * @param updateVer Update version set if fast map is performed.
+     * @param futId Future ID.
      * @param topVer Topology version.
      * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
@@ -190,16 +134,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param keepBinary Keep binary flag.
-     * @param clientReq Client node request flag.
      * @param addDepInfo Deployment info flag.
      * @param maxEntryCnt Maximum entries count.
      */
     GridNearAtomicFullUpdateRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
-        boolean fastMap,
-        @Nullable GridCacheVersion updateVer,
+        long futId,
         @NotNull AffinityTopologyVersion topVer,
         boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
@@ -210,34 +151,29 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean needPrimaryRes,
         boolean skipStore,
         boolean keepBinary,
-        boolean clientReq,
         boolean addDepInfo,
         int maxEntryCnt
     ) {
-        assert futVer != null;
-
-        this.cacheId = cacheId;
-        this.nodeId = nodeId;
-        this.futVer = futVer;
-        this.fastMap = fastMap;
-        this.updateVer = updateVer;
-
-        this.topVer = topVer;
-        this.topLocked = topLocked;
-        this.syncMode = syncMode;
-        this.op = op;
-        this.retval = retval;
+        super(cacheId,
+            nodeId,
+            futId,
+            topVer,
+            topLocked,
+            syncMode,
+            op,
+            retval,
+            subjId,
+            taskNameHash,
+            needPrimaryRes,
+            skipStore,
+            keepBinary,
+            addDepInfo);
         this.expiryPlc = expiryPlc;
         this.invokeArgs = invokeArgs;
         this.filter = filter;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-        this.skipStore = skipStore;
-        this.keepBinary = keepBinary;
-        this.clientReq = clientReq;
-        this.addDepInfo = addDepInfo;
 
         // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
         // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
@@ -246,84 +182,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         initSize = Math.min(maxEntryCnt, 10);
 
         keys = new ArrayList<>(initSize);
-
-        partIds = new ArrayList<>(initSize);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID nodeId() {
-        return nodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void nodeId(UUID nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID subjectId() {
-        return subjId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int taskNameHash() {
-        return taskNameHash;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion futureVersion() {
-        return futVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion updateVersion() {
-        return updateVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
-        return syncMode;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheOperation operation() {
-        return op;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
-        if (this.res == null) {
-            this.res = res;
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public GridNearAtomicUpdateResponse response() {
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return addDepInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
-        return ctx.atomicMessageLogger();
     }
 
     /** {@inheritDoc} */
@@ -331,8 +189,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer,
-        boolean primary) {
+        @Nullable GridCacheVersion conflictVer) {
         EntryProcessor<Object, Object, Object> entryProcessor = null;
 
         if (op == TRANSFORM) {
@@ -344,7 +201,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         assert val != null || op == DELETE;
 
         keys.add(key);
-        partIds.add(key.partition());
 
         if (entryProcessor != null) {
             if (entryProcessors == null)
@@ -361,8 +217,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
             vals.add((CacheObject)val);
         }
 
-        hasPrimary |= primary;
-
         // In case there is no conflict, do not create the list.
         if (conflictVer != null) {
             if (conflictVers == null) {
@@ -407,6 +261,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public int size() {
+        assert keys != null;
+
         return keys != null ? keys.size() : 0;
     }
 
@@ -488,41 +344,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
-    @Override public boolean fastMap() {
-        return fastMap;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean topologyLocked() {
-        return topLocked;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean clientRequest() {
-        return clientReq;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean returnValue() {
-        return retval;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean skipStore() {
-        return skipStore;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean keepBinary() {
-        return keepBinary;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasPrimary() {
-        return hasPrimary;
-    }
-
-    /** {@inheritDoc} */
     @Override @Nullable public CacheEntryPredicate[] filter() {
         return filter;
     }
@@ -600,18 +421,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         }
         else
             finishUnmarshalCacheObjects(vals, cctx, ldr);
-
-        if (partIds != null && !partIds.isEmpty()) {
-            assert partIds.size() == keys.size();
-
-            for (int i = 0; i < keys.size(); i++)
-                keys.get(i).partition(partIds.get(i));
-        }
     }
 
     /** {@inheritDoc} */
     @Override public int partition() {
-        return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+        assert !F.isEmpty(keys);
+
+        return keys.get(0).partition();
     }
 
     /** {@inheritDoc} */
@@ -629,145 +445,55 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         }
 
         switch (writer.state()) {
-            case 3:
-                if (!writer.writeBoolean("clientReq", clientReq))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeMessage("conflictTtls", conflictTtls))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeBoolean("fastMap", fastMap))
-                    return false;
-
-                writer.incrementState();
-
             case 10:
-                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeMessage("conflictTtls", conflictTtls))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeBoolean("hasPrimary", hasPrimary))
+                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeBoolean("keepBinary", keepBinary))
+                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeBoolean("retval", retval))
-                    return false;
-
-                writer.incrementState();
-
-            case 19:
-                if (!writer.writeBoolean("skipStore", skipStore))
-                    return false;
-
-                writer.incrementState();
-
-            case 20:
-                if (!writer.writeUuid("subjId", subjId))
-                    return false;
-
-                writer.incrementState();
-
-            case 21:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 22:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
-                    return false;
-
-                writer.incrementState();
-
-            case 23:
-                if (!writer.writeBoolean("topLocked", topLocked))
-                    return false;
-
-                writer.incrementState();
-
-            case 24:
-                if (!writer.writeMessage("topVer", topVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 25:
-                if (!writer.writeMessage("updateVer", updateVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 26:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -789,64 +515,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
             return false;
 
         switch (reader.state()) {
-            case 3:
-                clientReq = reader.readBoolean("clientReq");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                conflictTtls = reader.readMessage("conflictTtls");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 7:
-                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 8:
-                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                fastMap = reader.readBoolean("fastMap");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
             case 10:
-                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -854,7 +524,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 11:
-                futVer = reader.readMessage("futVer");
+                conflictTtls = reader.readMessage("conflictTtls");
 
                 if (!reader.isLastRead())
                     return false;
@@ -862,7 +532,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 12:
-                hasPrimary = reader.readBoolean("hasPrimary");
+                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -870,7 +540,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 13:
-                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
                     return false;
@@ -878,7 +548,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 14:
-                keepBinary = reader.readBoolean("keepBinary");
+                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -886,7 +556,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 15:
-                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -894,19 +564,15 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 16:
-                byte opOrd;
-
-                opOrd = reader.readByte("op");
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
                     return false;
 
-                op = GridCacheOperation.fromOrdinal(opOrd);
-
                 reader.incrementState();
 
             case 17:
-                partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -914,74 +580,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 18:
-                retval = reader.readBoolean("retval");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 19:
-                skipStore = reader.readBoolean("skipStore");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 20:
-                subjId = reader.readUuid("subjId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 21:
-                byte syncModeOrd;
-
-                syncModeOrd = reader.readByte("syncMode");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
-                reader.incrementState();
-
-            case 22:
-                taskNameHash = reader.readInt("taskNameHash");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 23:
-                topLocked = reader.readBoolean("topLocked");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 24:
-                topVer = reader.readMessage("topVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 25:
-                updateVer = reader.readMessage("updateVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 26:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -1013,12 +611,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 27;
+        return 19;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridNearAtomicFullUpdateRequest.class, this, "filter", Arrays.toString(filter),
+        return S.toString(GridNearAtomicFullUpdateRequest.class, this,
+            "filter", Arrays.toString(filter),
             "parent", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index e0c24b2..78582b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -58,9 +58,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
-     * @param fastMap Fast map scheme flag.
-     * @param updateVer Update version set if fast map is performed.
+     * @param futId Future ID.
      * @param topVer Topology version.
      * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
@@ -71,15 +69,12 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param keepBinary Keep binary flag.
-     * @param clientReq Client node request flag.
      * @param addDepInfo Deployment info flag.
      */
     GridNearAtomicSingleUpdateFilterRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
-        boolean fastMap,
-        @Nullable GridCacheVersion updateVer,
+        long futId,
         @NotNull AffinityTopologyVersion topVer,
         boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
@@ -88,17 +83,15 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean needPrimaryRes,
         boolean skipStore,
         boolean keepBinary,
-        boolean clientReq,
         boolean addDepInfo
     ) {
         super(
             cacheId,
             nodeId,
-            futVer,
-            fastMap,
-            updateVer,
+            futId,
             topVer,
             topLocked,
             syncMode,
@@ -106,9 +99,9 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
             retval,
             subjId,
             taskNameHash,
+            needPrimaryRes,
             skipStore,
             keepBinary,
-            clientReq,
             addDepInfo
         );
 
@@ -173,7 +166,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
         }
 
         switch (writer.state()) {
-            case 14:
+            case 12:
                 if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
                     return false;
 
@@ -195,7 +188,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
             return false;
 
         switch (reader.state()) {
-            case 14:
+            case 12:
                 filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
 
                 if (!reader.isLastRead())
@@ -215,7 +208,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 13;
     }
 
     /** {@inheritDoc} */


Mime
View raw message