ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] ignite git commit: Single update POC for atomic cache.
Date Tue, 10 Nov 2015 12:45:19 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1843 77a3f64f1 -> 1d3108603


http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
new file mode 100644
index 0000000..d1299dd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -0,0 +1,1093 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * DHT atomic cache near update future for an update of a single key.
+ */
+public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
+    implements GridCacheAtomicFuture<Object>{
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger. */
+    protected static IgniteLogger log;
+
+    /** Cache context. */
+    private final GridCacheContext cctx;
+
+    /** Cache. */
+    private GridDhtAtomicCache cache;
+
+    /** Update operation. */
+    private final GridCacheOperation op;
+
+    /** Key to update. */
+    private Object key;
+
+    /** Value or transform closure. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private Object val;
+
+    /** Optional arguments for entry processor. */
+    private Object[] invokeArgs;
+
+    /** Return value require flag. */
+    private final boolean retval;
+
+    /** Expiry policy. */
+    private final ExpiryPolicy expiryPlc;
+
+    /** Optional filter. */
+    private final CacheEntryPredicate[] filter;
+
+    /** Write synchronization mode. */
+    private final CacheWriteSynchronizationMode syncMode;
+
+    /** Raw return value flag. */
+    private final boolean rawRetval;
+
+    /** Fast map flag. */
+    private final boolean fastMap;
+
+    /** Near cache flag. */
+    private final boolean nearEnabled;
+
+    /** Subject ID. */
+    private final UUID subjId;
+
+    /** Task name hash. */
+    private final int taskNameHash;
+
+    /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
+    private boolean topLocked;
+
+    /** Skip store flag. */
+    private final boolean skipStore;
+
+    /** Wait for topology future flag. */
+    private final boolean waitTopFut;
+
+    /** Remap count. */
+    private int remapCnt;
+
+    /** State. */
+    private final UpdateState state;
+
+    /** Creates a near update future for a single key; the constructor does not start mapping, see {@link #map()}.
+     * @param cctx Cache context.
+     * @param cache Cache instance.
+     * @param syncMode Write synchronization mode.
+     * @param op Update operation.
+     * @param key Key to update.
+     * @param val Value or transform closure.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @param retval Return value require flag.
+     * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+     * @param expiryPlc Expiry policy explicitly specified for cache operation.
+     * @param filter Entry filter.
+     * @param subjId Subject ID (asserted to be not {@code null}).
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip store flag.
+     * @param remapCnt Maximum number of retries (forced to {@code 1} when {@code waitTopFut} is {@code false}).
+     * @param waitTopFut If {@code false} does not wait for affinity change future.
+     */
+    public GridNearAtomicSingleUpdateFuture(
+        GridCacheContext cctx,
+        GridDhtAtomicCache cache,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        Object key,
+        Object val,
+        @Nullable Object[] invokeArgs,
+        final boolean retval,
+        final boolean rawRetval,
+        @Nullable ExpiryPolicy expiryPlc,
+        final CacheEntryPredicate[] filter,
+        UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        int remapCnt,
+        boolean waitTopFut
+    ) {
+        this.rawRetval = rawRetval;
+
+        assert subjId != null;
+
+        this.cctx = cctx;
+        this.cache = cache;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.key = key;
+        this.val = val;
+        this.invokeArgs = invokeArgs;
+        this.retval = retval;
+        this.expiryPlc = expiryPlc;
+        this.filter = filter;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.skipStore = skipStore;
+        this.waitTopFut = waitTopFut;
+
+        if (log == null) // NOTE(review): logger category is GridFutureAdapter rather than this class - confirm.
+            log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+        fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+            cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+            !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+
+        nearEnabled = CU.isNearEnabled(cctx);
+
+        if (!waitTopFut) // Without waiting for topology only a single mapping attempt is allowed.
+            remapCnt = 1;
+
+        this.remapCnt = remapCnt;
+
+        state = new UpdateState();
+    }
+
+    /** {@inheritDoc} Not supported by this future: always throws {@link UnsupportedOperationException}. */
+    @Override public IgniteUuid futureId() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} Returns the future version currently held by the update state; may be {@code null}. */
+    @Override public GridCacheVersion version() {
+        return state.futureVersion();
+    }
+
+    /** {@inheritDoc} Not supported by this future: always throws {@link UnsupportedOperationException}. */
+    @Override public Collection<? extends ClusterNode> nodes() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return {@code True} if this future should block partition map exchange (that is, if fast-map is used).
+     */
+    private boolean waitForPartitionExchange() {
+        // Wait fast-map near atomic update futures in CLOCK mode.
+        return fastMap;
+    }
+
+    /** {@inheritDoc} Returns a singleton collection holding the only key of this future. */
+    @Override public Collection<?> keys() {
+        return Collections.singleton(key);
+    }
+
+    /** {@inheritDoc} Delegates processing to the update state and always returns {@code false}. */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        state.onNodeLeft(nodeId);
+
+        return false;
+    }
+
+    /** {@inheritDoc} Always {@code true} for this future. */
+    @Override public boolean trackable() {
+        return true;
+    }
+
+    /** {@inheritDoc} No-op for this future. */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} Not expected to be called on this future: fails an assertion or throws. */
+    @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse res) {
+        assert false;
+
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} Not expected to be called on this future: fails an assertion or throws. */
+    @Override public void onResult(UUID nodeId) {
+        assert false;
+
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Performs future mapping: on the TX or explicit-lock topology version if present, else on ready topology.
+     */
+    public void map() {
+        AffinityTopologyVersion topVer = null;
+
+        IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(null);
+
+        if (tx != null && tx.topologyVersionSnapshot() != null)
+            topVer = tx.topologyVersionSnapshot();
+
+        if (topVer == null)
+            topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+
+        if (topVer == null)
+            mapOnTopology();
+        else {
+            topLocked = true;
+
+            // Cannot remap.
+            remapCnt = 1;
+
+            state.map(topVer);
+        }
+    }
+
+    /** {@inheritDoc} Non-null only for a not yet done fast-map future mapped on an older topology version. */
+    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        if (waitForPartitionExchange()) {
+            GridFutureAdapter<Void> fut = state.completeFuture(topVer);
+
+            if (fut != null && isDone()) {
+                fut.onDone();
+
+                return null;
+            }
+
+            return fut;
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} Unwraps {@link GridCacheReturn} into the result and removes the stored atomic future. */
+    @SuppressWarnings("ConstantConditions")
+    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+        assert res == null || res instanceof GridCacheReturn;
+
+        GridCacheReturn ret = (GridCacheReturn)res;
+
+        Object retval =
+            res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? ret.value() : ret.success();
+
+        if (op == TRANSFORM && retval == null)
+            retval = Collections.emptyMap();
+
+        if (super.onDone(retval, err)) {
+            GridCacheVersion futVer = state.onFutureDone();
+
+            if (futVer != null)
+                cctx.mvcc().removeAtomicFuture(futVer);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Response callback; forwards the response to the update state as a regular (not node-failure) response.
+     *
+     * @param nodeId Node ID.
+     * @param res Update response.
+     */
+    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+        state.onResult(nodeId, res, false);
+    }
+
+    /**
+     * Updates near cache; does nothing if near cache is disabled or {@code req.hasPrimary()} is {@code false}.
+     *
+     * @param req Update request.
+     * @param res Update response.
+     */
+    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+        if (!nearEnabled || !req.hasPrimary())
+            return;
+
+        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
+
+        near.processNearAtomicUpdateResponse(req, res);
+    }
+
+    /**
+     * Maps future on ready topology; if it is not ready, re-runs on its completion or fails (no-wait mode).
+     */
+    private void mapOnTopology() {
+        cache.topology().readLock();
+
+        AffinityTopologyVersion topVer = null;
+
+        try {
+            if (cache.topology().stopping()) {
+                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                    cache.name()));
+
+                return;
+            }
+
+            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+
+            if (fut.isDone()) {
+                Throwable err = fut.validateCache(cctx);
+
+                if (err != null) {
+                    onDone(err);
+
+                    return;
+                }
+
+                topVer = fut.topologyVersion();
+            }
+            else {
+                if (waitTopFut) {
+                    assert !topLocked : this;
+
+                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    mapOnTopology();
+                                }
+                            });
+                        }
+                    });
+                }
+                else
+                    onDone(new GridCacheTryPutFailedException());
+
+                return;
+            }
+        }
+        finally {
+            cache.topology().readUnlock();
+        }
+
+        state.map(topVer); // Invoked after the topology read lock is released.
+    }
+
+    /**
+     * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
+     */
+    private boolean storeFuture() {
+        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+    }
+
+    /**
+     * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+     * node and send updates in parallel to all participating nodes.
+     *
+     * @param key Key to map.
+     * @param topVer Topology version to map.
+     * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
+     * @return Nodes of the key partition for fast-map update, otherwise a singleton list with the primary node.
+     */
+    private Collection<ClusterNode> mapKey(
+        KeyCacheObject key,
+        AffinityTopologyVersion topVer,
+        boolean fastMap
+    ) {
+        GridCacheAffinityManager affMgr = cctx.affinity();
+
+        // If we can send updates in parallel - do it.
+        return fastMap ?
+            cctx.topology().nodes(affMgr.partition(key), topVer) :
+            Collections.singletonList(affMgr.primary(key, topVer));
+    }
+
+    /**
+     * Maps future to single node: updates the local cache if the node is local, otherwise sends the request.
+     *
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+        if (cctx.localNodeId().equals(nodeId)) {
+            cache.updateAllAsyncInternal(nodeId, req,
+                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                        onResult(res.nodeId(), res);
+                    }
+                });
+        }
+        else {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+
+                if (syncMode == FULL_ASYNC) // Response is not awaited: complete right after the send.
+                    onDone(new GridCacheReturn(cctx, true, null, true));
+            }
+            catch (IgniteCheckedException e) {
+                state.onSendError(req, e);
+            }
+        }
+    }
+
+    /**
+     * Sends messages to remote nodes and updates local cache; completes the future right away for FULL_ASYNC.
+     *
+     * @param mappings Mappings to send; at most one of them may target the local node.
+     */
+    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+        UUID locNodeId = cctx.localNodeId();
+
+        GridNearAtomicUpdateRequest locUpdate = null;
+
+        // Send messages to remote nodes first, then run local update.
+        for (GridNearAtomicUpdateRequest req : mappings.values()) {
+            if (locNodeId.equals(req.nodeId())) {
+                assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
+                    ", req=" + req + ']';
+
+                locUpdate = req;
+            }
+            else {
+                try {
+                    if (log.isDebugEnabled())
+                        log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                }
+                catch (IgniteCheckedException e) {
+                    state.onSendError(req, e);
+                }
+            }
+        }
+
+        if (locUpdate != null) {
+            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                        onResult(res.nodeId(), res);
+                    }
+                });
+        }
+
+        if (syncMode == FULL_ASYNC)
+            onDone(new GridCacheReturn(cctx, true, null, true));
+    }
+
+    /**
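+     * Mutable mapping state of the future: topology and future versions, pending request(s), error and result.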
+     */
+    private class UpdateState {
+        /** Current topology version. */
+        private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+        /** Update version assigned on near node in CLOCK ordering mode (otherwise {@code null}). */
+        private GridCacheVersion updVer;
+
+        /** Topology version when got mapping error. */
+        private AffinityTopologyVersion mapErrTopVer;
+
+        /** Mappings if operation is mapped to more than one node. */
+        @GridToStringInclude
+        private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+
+        /** Error. */
+        private CachePartialUpdateCheckedException err;
+
+        /** Future ID. */
+        private GridCacheVersion futVer;
+
+        /** Completion future for a particular topology version. */
+        private GridFutureAdapter<Void> topCompleteFut;
+
+        /** Key to remap. */
+        private KeyCacheObject remapKey;
+
+        /** Not null if operation is mapped to single node. */
+        private GridNearAtomicUpdateRequest singleReq;
+
+        /** Operation result. */
+        private GridCacheReturn opRes;
+
+        /**
+         * @return Current future version; {@code null} if not mapped, being remapped or already done.
+         */
+        @Nullable synchronized GridCacheVersion futureVersion() {
+            return futVer;
+        }
+
+        /** Processes node leave: a pending request to that node is answered with a synthesized failure response.
+         * @param nodeId Left node ID.
+         */
+        void onNodeLeft(UUID nodeId) {
+            GridNearAtomicUpdateResponse res = null;
+
+            synchronized (this) {
+                GridNearAtomicUpdateRequest req;
+
+                if (singleReq != null)
+                    req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
+                else
+                    req = mappings != null ? mappings.get(nodeId) : null;
+
+                if (req != null) {
+                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(),
+                        cctx.deploymentEnabled());
+
+                    ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+                        "before response is received: " + nodeId);
+
+                    e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+                    res.addFailedKeys(req.keys(), e);
+                }
+            }
+
+            if (res != null) // The response is processed outside of the synchronized block.
+                onResult(nodeId, res, true);
+        }
+
+        /** Processes a response: stores result or error and, once all are received, completes or remaps the future.
+         * @param nodeId Node ID.
+         * @param res Response.
+         * @param nodeErr {@code True} if response was created on node failure.
+         */
+        void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+            GridNearAtomicUpdateRequest req;
+
+            AffinityTopologyVersion remapTopVer = null;
+
+            GridCacheReturn opRes0 = null;
+            CachePartialUpdateCheckedException err0 = null;
+
+            boolean rcvAll;
+
+            GridFutureAdapter<?> fut0 = null;
+
+            synchronized (this) {
+                if (!res.futureVersion().equals(futVer)) // Response does not match the current future version.
+                    return;
+
+                if (singleReq != null) {
+                    if (!singleReq.nodeId().equals(nodeId))
+                        return;
+
+                    req = singleReq;
+
+                    singleReq = null;
+
+                    rcvAll = true;
+                }
+                else {
+                    req = mappings != null ? mappings.remove(nodeId) : null;
+
+                    if (req != null)
+                        rcvAll = mappings.isEmpty();
+                    else
+                        return;
+                }
+
+                assert req != null && req.topologyVersion().equals(topVer) : req;
+
+                if (res.remapKeys() != null) {
+                    assert !fastMap || cctx.kernalContext().clientNode();
+
+                    remapKey = res.remapKeys().iterator().next();
+
+                    if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+                        mapErrTopVer = req.topologyVersion();
+                }
+                else if (res.error() != null) {
+                    if (res.failedKeys() != null)
+                        addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+                }
+                else {
+                    if (!req.fastMap() || req.hasPrimary()) {
+                        GridCacheReturn ret = res.returnValue();
+
+                        if (op == TRANSFORM) {
+                            if (ret != null)
+                                addInvokeResults(ret);
+                        }
+                        else
+                            opRes = ret;
+                    }
+                }
+
+                if (rcvAll) {
+                    if (remapKey != null) {
+                        assert mapErrTopVer != null;
+
+                        remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
+                    }
+                    else {
+                        if (err != null &&
+                            X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+                            X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                            storeFuture() &&
+                            --remapCnt > 0) {
+                            ClusterTopologyCheckedException topErr =
+                                X.cause(err, ClusterTopologyCheckedException.class);
+
+                            if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                                CachePartialUpdateCheckedException cause =
+                                    X.cause(err, CachePartialUpdateCheckedException.class);
+
+                                assert cause != null && cause.topologyVersion() != null : err;
+
+                                remapTopVer =
+                                    new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+                                err = null;
+
+                                Object failedKey = cause.failedKeys().iterator().next();
+
+                                remapKey = cctx.toCacheKeyObject(failedKey);
+
+                                updVer = null;
+                            }
+                        }
+                    }
+
+                    if (remapTopVer == null) {
+                        err0 = err;
+                        opRes0 = opRes;
+                    }
+                    else { // Remap is required: reset the mapping state.
+                        fut0 = topCompleteFut;
+
+                        topCompleteFut = null;
+
+                        cctx.mvcc().removeAtomicFuture(futVer);
+
+                        futVer = null;
+                        topVer = AffinityTopologyVersion.ZERO;
+                    }
+                }
+            }
+
+            if (res.error() != null && res.failedKeys() == null) { // Error without failed keys fails the whole future.
+                onDone(res.error());
+
+                return;
+            }
+
+            if (!nodeErr && res.remapKeys() == null)
+                updateNear(req, res);
+
+            if (remapTopVer != null) {
+                if (fut0 != null)
+                    fut0.onDone();
+
+                if (!waitTopFut) {
+                    onDone(new GridCacheTryPutFailedException());
+
+                    return;
+                }
+
+                if (topLocked) {
+                    assert remapKey != null;
+
+                    CachePartialUpdateCheckedException e =
+                        new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+                    ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+                        "Failed to update keys, topology changed while execute atomic update inside transaction.");
+
+                    cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+                    e.add(Collections.singleton(remapKey), cause);
+
+                    onDone(e);
+
+                    return;
+                }
+
+                IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.affinity().affinityReadyFuture(remapTopVer);
+
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    AffinityTopologyVersion topVer = fut.get();
+
+                                    map(topVer);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    onDone(e);
+                                }
+                            }
+                        });
+                    }
+                });
+
+                return;
+            }
+
+            if (rcvAll)
+                onDone(opRes0, err0);
+        }
+
+        /** Converts a send failure into a failed response for all request keys and processes it as a node error.
+         * @param req Request.
+         * @param e Error.
+         */
+        void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+            synchronized (this) {
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                    req.nodeId(),
+                    req.futureVersion(),
+                    cctx.deploymentEnabled());
+
+                res.addFailedKeys(req.keys(), e);
+
+                onResult(req.nodeId(), res, true); // NOTE(review): invoked while holding this monitor - confirm intended.
+            }
+        }
+
+        /** Maps the update on the given topology version: assigns versions, builds request(s) and sends them.
+         * @param topVer Topology version.
+         */
+        void map(AffinityTopologyVersion topVer) {
+            Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+
+            if (F.isEmpty(topNodes)) {
+                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                    "left the grid)."));
+
+                return;
+            }
+
+            Exception err = null;
+            GridNearAtomicUpdateRequest singleReq0 = null;
+            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
+
+            synchronized (this) {
+                assert futVer == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+                this.topVer = topVer;
+
+                futVer = cctx.versions().next(topVer);
+
+                if (storeFuture()) {
+                    if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicSingleUpdateFuture.this)) {
+                        assert isDone() : GridNearAtomicSingleUpdateFuture.this;
+
+                        return;
+                    }
+                }
+
+                // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+                if (updVer == null)
+                    updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+
+                if (updVer != null && log.isDebugEnabled())
+                    log.debug("Assigned fast-map version for update on near node: " + updVer);
+
+                try {
+                    if (fastMap) {
+                        pendingMappings = mapUpdate(topNodes);
+
+                        if (pendingMappings.size() == 1)
+                            singleReq0 = singleReq = F.firstValue(pendingMappings);
+                        else {
+                            if (syncMode == PRIMARY_SYNC) { // Only requests with hasPrimary() set are tracked.
+                                mappings = U.newHashMap(pendingMappings.size());
+
+                                for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+                                    if (req.hasPrimary())
+                                        mappings.put(req.nodeId(), req);
+                                }
+                            }
+                            else
+                                mappings = new HashMap<>(pendingMappings);
+
+                            assert !mappings.isEmpty() : GridNearAtomicSingleUpdateFuture.this;
+                        }
+                    }
+                    else
+                        singleReq0 = singleReq = mapSingleUpdate();
+
+                    remapKey = null;
+                }
+                catch (Exception e) {
+                    err = e;
+                }
+            }
+
+            if (err != null) { // Mapping failure is reported outside of the synchronized block.
+                onDone(err);
+
+                return;
+            }
+
+            // Optimize mapping for single key.
+            if (singleReq0 != null)
+                mapSingle(singleReq0.nodeId(), singleReq0);
+            else {
+                assert pendingMappings != null;
+
+                doUpdate(pendingMappings);
+            }
+        }
+
+        /** Lazily creates the completion future for an update mapped on a version older than {@code topVer}.
+         * @param topVer Topology version.
+         * @return Completion future, {@code null} if not mapped or mapped on {@code topVer} or newer.
+         */
+        @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
+            if (this.topVer == AffinityTopologyVersion.ZERO)
+                return null;
+
+            if (this.topVer.compareTo(topVer) < 0) {
+                if (topCompleteFut == null)
+                    topCompleteFut = new GridFutureAdapter<>();
+
+                return topCompleteFut;
+            }
+
+            return null;
+        }
+
+        /**
+         * @return Future version held before this call; the field and {@code topCompleteFut} are reset to {@code null}.
+         */
+        GridCacheVersion onFutureDone() {
+            GridCacheVersion ver0;
+
+            GridFutureAdapter<Void> fut0;
+
+            synchronized (this) {
+                fut0 = topCompleteFut;
+
+                topCompleteFut = null;
+
+                ver0 = futVer;
+
+                futVer = null;
+            }
+
+            if (fut0 != null)
+                fut0.onDone(); // Completed after the synchronized block has been left.
+
+            return ver0;
+        }
+
+        /**
+         * @param topNodes Cache nodes (not referenced inside this method).
+         * @return Request per affinity node ID; empty map when the value is {@code null} and op is not DELETE.
+         * @throws Exception If failed.
+         */
+        private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception {
+            assert fastMap;
+
+            Object key = GridNearAtomicSingleUpdateFuture.this.key;
+            Object val = GridNearAtomicSingleUpdateFuture.this.val;
+
+            if (val == null && op != GridCacheOperation.DELETE)
+                return Collections.emptyMap(); // NOTE(review): caller asserts non-empty mappings when size != 1 - confirm.
+
+            KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+            if (op != TRANSFORM) // For TRANSFORM the value is passed on without conversion.
+                val = cctx.toCacheObject(val);
+
+            Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+
+            if (affNodes.isEmpty())
+                throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                    "(all partition nodes left the grid).");
+
+            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(affNodes.size());
+
+            int i = 0;
+
+            for (ClusterNode affNode : affNodes) {
+                if (affNode == null)
+                    throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                        "(all partition nodes left the grid).");
+
+                UUID nodeId = affNode.id();
+
+                GridNearAtomicUpdateRequest mapped = new GridNearAtomicUpdateRequest(
+                    cctx.cacheId(),
+                    nodeId,
+                    futVer,
+                    fastMap,
+                    updVer,
+                    topVer,
+                    topLocked,
+                    syncMode,
+                    op,
+                    retval,
+                    expiryPlc,
+                    invokeArgs,
+                    filter,
+                    subjId,
+                    taskNameHash,
+                    skipStore,
+                    cctx.kernalContext().clientNode(),
+                    cctx.deploymentEnabled(),
+                    true); // The 'single' constructor flag.
+
+                pendingMappings.put(nodeId, mapped);
+
+                mapped.addSingleUpdate(cacheKey, val, i == 0); // Only the first affinity node is flagged as primary.
+
+                i++;
+            }
+
+            return pendingMappings;
+        }
+
+        /**
+         * @return Request addressed to the primary node of the key for {@code topVer}.
+         * @throws Exception If failed (null key, null value for a non-DELETE operation, or no primary node).
+         */
+        private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception {
+            Object key = GridNearAtomicSingleUpdateFuture.this.key;
+            Object val = GridNearAtomicSingleUpdateFuture.this.val;
+
+            // We can still get here if the user passes a map with a single element.
+            if (key == null)
+                throw new NullPointerException("Null key.");
+
+            if (val == null && op != GridCacheOperation.DELETE)
+                throw new NullPointerException("Null value.");
+
+            KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+            if (op != TRANSFORM) // For TRANSFORM the value is passed on without conversion.
+                val = cctx.toCacheObject(val);
+
+            ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+
+            if (primary == null)
+                throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                    "left the grid).");
+
+            GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+                cctx.cacheId(),
+                primary.id(),
+                futVer,
+                fastMap,
+                updVer,
+                topVer,
+                topLocked,
+                syncMode,
+                op,
+                retval,
+                expiryPlc,
+                invokeArgs,
+                filter,
+                subjId,
+                taskNameHash,
+                skipStore,
+                cctx.kernalContext().clientNode(),
+                cctx.deploymentEnabled(),
+                true); // The 'single' constructor flag.
+
+            req.addSingleUpdate(cacheKey,
+                val,
+                true); // The request goes to the primary node resolved above.
+
+            return req;
+        }
+
+        /**
+         * @param ret Result from single node; a non-null value is merged into {@code opRes} or becomes it.
+         */
+        @SuppressWarnings("unchecked")
+        private void addInvokeResults(GridCacheReturn ret) {
+            assert op == TRANSFORM : op;
+            assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+            if (ret.value() != null) {
+                if (opRes != null)
+                    opRes.mergeEntryProcessResults(ret);
+                else
+                    opRes = ret; // No previous result: keep this one as is.
+            }
+        }
+
+        /**
+         * @param failedKeys Failed keys; their unwrapped values are added to the partial update error.
+         * @param topVer Topology version for failed update.
+         * @param err Error cause.
+         */
+        private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+            AffinityTopologyVersion topVer,
+            Throwable err) {
+            CachePartialUpdateCheckedException err0 = this.err;
+
+            if (err0 == null) // The shared error is created on the first failure.
+                err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+            Collection<Object> keys = new ArrayList<>(failedKeys.size());
+
+            for (KeyCacheObject key : failedKeys)
+                keys.add(key.value(cctx.cacheObjectContext(), false));
+
+            err0.add(keys, err, topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized  String toString() {
+            return S.toString(UpdateState.class, this); // Runs under the same monitor as completeFuture().
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ae662c8..04557a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -526,6 +526,20 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             onDone(new GridCacheReturn(cctx, true, null, true));
     }
 
+    @Override
+    public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse res) {
+        assert false; // This callback is not supported by this future.
+
+        throw new UnsupportedOperationException(); // Still fails when assertions are disabled.
+    }
+
+    @Override
+    public void onResult(UUID nodeId) {
+        assert false; // This callback is not supported by this future.
+
+        throw new UnsupportedOperationException(); // Still fails when assertions are disabled.
+    }
+
     /**
      *
      */
@@ -1051,7 +1065,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                             taskNameHash,
                             skipStore,
                             cctx.kernalContext().clientNode(),
-                            cctx.deploymentEnabled());
+                            cctx.deploymentEnabled(),
+                            false);
 
                         pendingMappings.put(nodeId, mapped);
                     }
@@ -1144,7 +1159,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 taskNameHash,
                 skipStore,
                 cctx.kernalContext().clientNode(),
-                cctx.deploymentEnabled());
+                cctx.deploymentEnabled(),
+                false);
 
             req.addUpdateEntry(cacheKey,
                 val,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 33fa4bd..784911d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -89,6 +89,16 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     /** Update operation. */
     private GridCacheOperation op;
 
+    /** Key of a single-key update; assigned by {@code addSingleUpdate}. */
+    private KeyCacheObject singleKey;
+
+    /** Value of a single-key update; assigned by {@code addSingleUpdate} when the operation is not TRANSFORM. */
+    private CacheObject singleVal;
+
+    /** Entry processor of a single-key update; assigned by {@code addSingleUpdate} when the operation is TRANSFORM. */
+    @GridDirectTransient
+    private EntryProcessor singleEntryProcessor;
+
     /** Keys to update. */
     @GridToStringInclude
     @GridDirectCollection(KeyCacheObject.class)
@@ -106,6 +116,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     @GridDirectCollection(byte[].class)
     private List<byte[]> entryProcessorsBytes;
 
+    private byte[] singleEntryProcessorsBytes; // Written/read as a message field; prepareMarshal() does not fill it yet (TODO there).
+
     /** Optional arguments for entry processor. */
     @GridDirectTransient
     private Object[] invokeArgs;
@@ -198,7 +210,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         int taskNameHash,
         boolean skipStore,
         boolean clientReq,
-        boolean addDepInfo
+        boolean addDepInfo,
+        boolean single
     ) {
         assert futVer != null;
 
@@ -222,7 +235,12 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         this.clientReq = clientReq;
         this.addDepInfo = addDepInfo;
 
-        keys = new ArrayList<>();
+        if (!single)
+            keys = new ArrayList<>();
+    }
+
+    public boolean singleUpdate() {
+        return singleKey != null; // The key is assigned by addSingleUpdate() or read from the message.
     }
 
     /** {@inheritDoc} */
@@ -335,6 +353,22 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         return skipStore;
     }
 
+    public void addSingleUpdate(KeyCacheObject key, @Nullable Object val, boolean primary) {
+        assert val != null || op == DELETE; // Only DELETE may come without a value.
+
+        if (op == TRANSFORM) {
+            assert val instanceof EntryProcessor : val;
+
+            singleEntryProcessor = (EntryProcessor<Object, Object, Object>)val;
+        }
+        else
+            singleVal = (CacheObject)val; // Callers must pass an already converted cache object (or null for DELETE).
+
+        singleKey = key;
+
+        hasPrimary = primary; // Overwrites the flag with the value given for this key.
+    }
+
     /**
      * @param key Key to add.
      * @param val Optional update value.
@@ -415,6 +449,20 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         }
     }
 
+    public KeyCacheObject singleKey() {
+        assert singleKey != null; // Only valid for requests where singleUpdate() is true.
+
+        return singleKey;
+    }
+
+    public CacheObject singleWriteValue() {
+        return singleVal; // May be null: addSingleUpdate() does not assign it for TRANSFORM and DELETE may pass no value.
+    }
+
+    public EntryProcessor singleEntryProcessor() {
+        return singleEntryProcessor; // Marked @GridDirectTransient; assigned by addSingleUpdate() for TRANSFORM only.
+    }
+
     /**
      * @return Keys for this update request.
      */
@@ -539,8 +587,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        prepareMarshalCacheObjects(keys, cctx);
-
         if (filter != null) {
             boolean hasFilter = false;
 
@@ -559,17 +605,40 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         if (expiryPlc != null)
             expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
 
-        if (op == TRANSFORM) {
-            // force addition of deployment info for entry processors if P2P is enabled globally.
-            if (!addDepInfo && ctx.deploymentEnabled())
-                addDepInfo = true;
+        if (singleKey != null) {
+            prepareMarshalCacheObject(singleKey, cctx);
 
-            entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+            if (op == TRANSFORM) {
+                // TODO
+                assert false;
 
-            invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+                // force addition of deployment info for entry processors if P2P is enabled globally.
+                if (!addDepInfo && ctx.deploymentEnabled())
+                    addDepInfo = true;
+
+                // TODO
+                // entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+
+                invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+            }
+            else
+                prepareMarshalCacheObject(singleVal, cctx);
+        }
+        else {
+            prepareMarshalCacheObjects(keys, cctx);
+
+            if (op == TRANSFORM) {
+                // force addition of deployment info for entry processors if P2P is enabled globally.
+                if (!addDepInfo && ctx.deploymentEnabled())
+                    addDepInfo = true;
+
+                entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+
+                invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+            }
+            else
+                prepareMarshalCacheObjects(vals, cctx);
         }
-        else
-            prepareMarshalCacheObjects(vals, cctx);
     }
 
     /** {@inheritDoc} */
@@ -578,12 +647,24 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        finishUnmarshalCacheObjects(keys, cctx, ldr);
+        if (singleKey != null) {
+            finishUnmarshalCacheObject(singleKey, cctx, ldr);
 
-        if (op == TRANSFORM)
-            entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
-        else
-            finishUnmarshalCacheObjects(vals, cctx, ldr);
+            if (op == TRANSFORM) {
+                // TODO
+                assert false;
+            }
+            else
+                finishUnmarshalCacheObject(singleVal, cctx, ldr);
+        }
+        else {
+            finishUnmarshalCacheObjects(keys, cctx, ldr);
+
+            if (op == TRANSFORM)
+                entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+            else
+                finishUnmarshalCacheObjects(vals, cctx, ldr);
+        }
 
         if (filter != null) {
             for (CacheEntryPredicate p : filter) {
@@ -703,48 +784,66 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeBoolean("skipStore", skipStore))
+                if (!writer.writeByteArray("singleEntryProcessorsBytes", singleEntryProcessorsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeMessage("singleKey", singleKey))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeMessage("singleVal", singleVal))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeBoolean("skipStore", skipStore))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeBoolean("topLocked", topLocked))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeMessage("updateVer", updateVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 24:
+                if (!writer.writeBoolean("topLocked", topLocked))
+                    return false;
+
+                writer.incrementState();
+
+            case 25:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 26:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 27:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -883,7 +982,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 17:
-                skipStore = reader.readBoolean("skipStore");
+                singleEntryProcessorsBytes = reader.readByteArray("singleEntryProcessorsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -891,7 +990,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 18:
-                subjId = reader.readUuid("subjId");
+                singleKey = reader.readMessage("singleKey");
 
                 if (!reader.isLastRead())
                     return false;
@@ -899,6 +998,30 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 19:
+                singleVal = reader.readMessage("singleVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
+                skipStore = reader.readBoolean("skipStore");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 21:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 22:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -910,7 +1033,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 20:
+            case 23:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -918,7 +1041,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 21:
+            case 24:
                 topLocked = reader.readBoolean("topLocked");
 
                 if (!reader.isLastRead())
@@ -926,7 +1049,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 22:
+            case 25:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -934,7 +1057,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 23:
+            case 26:
                 updateVer = reader.readMessage("updateVer");
 
                 if (!reader.isLastRead())
@@ -942,7 +1065,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 24:
+            case 27:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -962,7 +1085,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 25;
+        return 28; // Three message fields (cases 17-19 in writeTo/readFrom) are added by this change.
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 1bf03a9..f6daa3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -150,14 +150,14 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
-        for (int i = 0; i < req.keys().size(); i++) {
-            if (F.contains(skipped, i))
-                continue;
+        if (req.singleUpdate()) {
+            if (F.contains(skipped, 0))
+                return;
 
-            KeyCacheObject key = req.keys().get(i);
+            KeyCacheObject key = req.singleKey();
 
             if (F.contains(failed, key))
-                continue;
+                return;
 
             if (ctx.affinity().belongs(ctx.localNode(), ctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup.
                 GridCacheEntryEx entry = peekEx(key);
@@ -165,25 +165,22 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                 if (entry != null && entry.markObsolete(ver))
                     removeEntry(entry);
 
-                continue;
+                return;
             }
 
             CacheObject val = null;
 
-            if (F.contains(nearValsIdxs, i)) {
+            if (F.contains(nearValsIdxs, 0))
                 val = res.nearValue(nearValIdx);
-
-                nearValIdx++;
-            }
             else {
                 assert req.operation() != TRANSFORM;
 
                 if (req.operation() != DELETE)
-                    val = req.value(i);
+                    val = req.singleWriteValue();
             }
 
-            long ttl = res.nearTtl(i);
-            long expireTime = res.nearExpireTime(i);
+            long ttl = res.nearTtl(0);
+            long expireTime = res.nearExpireTime(0);
 
             if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE)
                 expireTime = CU.toExpireTime(ttl);
@@ -203,6 +200,61 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                 res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e));
             }
         }
+        else {
+            for (int i = 0; i < req.keys().size(); i++) {
+                if (F.contains(skipped, i))
+                    continue;
+
+                KeyCacheObject key = req.keys().get(i);
+
+                if (F.contains(failed, key))
+                    continue;
+
+                if (ctx.affinity().belongs(ctx.localNode(), ctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup.
+                    GridCacheEntryEx entry = peekEx(key);
+
+                    if (entry != null && entry.markObsolete(ver))
+                        removeEntry(entry);
+
+                    continue;
+                }
+
+                CacheObject val = null;
+
+                if (F.contains(nearValsIdxs, i)) {
+                    val = res.nearValue(nearValIdx);
+
+                    nearValIdx++;
+                }
+                else {
+                    assert req.operation() != TRANSFORM;
+
+                    if (req.operation() != DELETE)
+                        val = req.value(i);
+                }
+
+                long ttl = res.nearTtl(i);
+                long expireTime = res.nearExpireTime(i);
+
+                if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE)
+                    expireTime = CU.toExpireTime(ttl);
+
+                try {
+                    processNearAtomicUpdateResponse(ver,
+                        key,
+                        val,
+                        null,
+                        ttl,
+                        expireTime,
+                        req.nodeId(),
+                        req.subjectId(),
+                        taskName);
+                }
+                catch (IgniteCheckedException e) {
+                    res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e));
+                }
+            }
+        }
     }
 
     /**


Mime
View raw message