ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/8] ignite git commit: ignite-4705 Atomic cache protocol change: notify client node from backups
Date Mon, 13 Mar 2017 15:09:38 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 0a816a7..930c4af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -17,9 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
@@ -41,36 +40,29 @@ import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedExceptio
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
  * DHT atomic cache near update future.
  */
 public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture
{
-    /** */
-    private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
-
     /** Keys */
     private Object key;
 
     /** Values. */
-    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Object val;
 
-    /** Not null is operation is mapped to single node. */
-    private GridNearAtomicAbstractUpdateRequest req;
+    /** */
+    private PrimaryRequestState reqState;
 
     /**
      * @param cctx Cache context.
@@ -110,8 +102,21 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         int remapCnt,
         boolean waitTopFut
     ) {
-        super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter,
subjId, taskNameHash,
-            skipStore, keepBinary, remapCnt, waitTopFut);
+        super(cctx,
+            cache,
+            syncMode,
+            op,
+            invokeArgs,
+            retval,
+            rawRetval,
+            expiryPlc,
+            filter,
+            subjId,
+            taskNameHash,
+            skipStore,
+            keepBinary,
+            remapCnt,
+            waitTopFut);
 
         assert subjId != null;
 
@@ -120,52 +125,63 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
+    @Override public Long id() {
         synchronized (mux) {
-            return futVer;
+            return futId;
         }
     }
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
-        GridNearAtomicUpdateResponse res = null;
+        GridCacheReturn opRes0 = null;
+        CachePartialUpdateCheckedException err0 = null;
+        AffinityTopologyVersion remapTopVer0 = null;
 
-        GridNearAtomicAbstractUpdateRequest req;
+        GridNearAtomicCheckUpdateRequest checkReq = null;
+
+        boolean rcvAll = false;
 
         synchronized (mux) {
-            req = this.req != null && this.req.nodeId().equals(nodeId) ? this.req
: null;
+            if (reqState == null)
+                return false;
 
-            if (req != null && req.response() == null) {
-                res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-                    nodeId,
-                    req.futureVersion(),
-                    cctx.deploymentEnabled());
+            if (reqState.req.nodeId.equals(nodeId)) {
+                GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail();
+
+                if (req != null) {
+                    GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
 
-                ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary
node left grid " +
-                    "before response is received: " + nodeId);
+                    rcvAll = true;
 
-                e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+                    reqState.onPrimaryResponse(res, cctx);
 
-                res.addFailedKeys(req.keys(), e);
+                    onPrimaryError(req, res);
+                }
             }
-        }
+            else {
+                DhtLeftResult res = reqState.onDhtNodeLeft(nodeId);
 
-        if (res != null) {
-            if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Near update single fut, node left [futId=" + req.futureVersion()
+
-                    ", writeVer=" + req.updateVersion() +
-                    ", node=" + nodeId + ']');
+                if (res == DhtLeftResult.DONE)
+                    rcvAll = true;
+                else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
+                    checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req);
+                else
+                    return false;
             }
 
-            onResult(nodeId, res, true);
+            if (rcvAll) {
+                opRes0 = opRes;
+                err0 = err;
+                remapTopVer0 = onAllReceived();
+            }
         }
 
-        return false;
-    }
+        if (checkReq != null)
+            sendCheckUpdateRequest(checkReq);
+        else if (rcvAll)
+            finishUpdateFuture(opRes0, err0, remapTopVer0);
 
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion
topVer) {
-        return null;
+        return false;
     }
 
     /** {@inheritDoc} */
@@ -175,15 +191,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
         GridCacheReturn ret = (GridCacheReturn)res;
 
-        Object retval =
-            res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
-                cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
+        Object retval = res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM)
?
+            cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
 
         if (op == TRANSFORM && retval == null)
             retval = Collections.emptyMap();
 
         if (super.onDone(retval, err)) {
-            GridCacheVersion futVer = onFutureDone();
+            Long futVer = onFutureDone();
 
             if (futVer != null)
                 cctx.mvcc().removeAtomicFuture(futVer);
@@ -195,112 +210,103 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
+    @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+        GridCacheReturn opRes0;
+        CachePartialUpdateCheckedException err0;
+        AffinityTopologyVersion remapTopVer0;
+
+        synchronized (mux) {
+            if (futId == null || futId != res.futureId())
+                return;
+
+            assert reqState != null;
+            assert reqState.req.nodeId().equals(res.primaryId());
+
+            if (opRes == null && res.hasResult())
+                opRes = res.result();
+
+            if (reqState.onDhtResponse(nodeId, res)) {
+                opRes0 = opRes;
+                err0 = err;
+                remapTopVer0 = onAllReceived();
+            }
+            else
+                return;
+        }
+
+        UpdateErrors errors = res.errors();
+
+        if (errors != null) {
+            assert errors.error() != null;
+
+            onDone(errors.error());
+
+            return;
+        }
+
+        finishUpdateFuture(opRes0, err0, remapTopVer0);
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-    @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean
nodeErr) {
+    @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res,
boolean nodeErr) {
         GridNearAtomicAbstractUpdateRequest req;
 
-        AffinityTopologyVersion remapTopVer = null;
+        AffinityTopologyVersion remapTopVer0;
 
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
 
-        GridFutureAdapter<?> fut0 = null;
-
         synchronized (mux) {
-            if (!res.futureVersion().equals(futVer))
+            if (futId == null || futId != res.futureId())
                 return;
 
-            if (!this.req.nodeId().equals(nodeId))
-                return;
+            req = reqState.processPrimaryResponse(nodeId, res);
 
-            req = this.req;
-
-            this.req = null;
+            if (req == null)
+                return;
 
-            boolean remapKey = !F.isEmpty(res.remapKeys());
+            boolean remapKey = res.remapTopologyVersion() != null;
 
             if (remapKey) {
-                if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion())
< 0)
-                    mapErrTopVer = req.topologyVersion();
-            }
-            else if (res.error() != null) {
-                if (res.failedKeys() != null) {
-                    if (err == null)
-                        err = new CachePartialUpdateCheckedException(
-                            "Failed to update keys (retry update if possible).");
-
-                    Collection<Object> keys = new ArrayList<>(res.failedKeys().size());
+                assert !req.topologyVersion().equals(res.remapTopologyVersion());
 
-                    for (KeyCacheObject key : res.failedKeys())
-                        keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary,
false));
+                assert remapTopVer == null : remapTopVer;
 
-                    err.add(keys, res.error(), req.topologyVersion());
-                }
+                remapTopVer = res.remapTopologyVersion();
             }
+            else if (res.error() != null)
+                onPrimaryError(req, res);
             else {
-                if (!req.fastMap() || req.hasPrimary()) {
-                    GridCacheReturn ret = res.returnValue();
-
-                    if (op == TRANSFORM) {
-                        if (ret != null) {
-                            assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
-                            if (ret.value() != null) {
-                                if (opRes != null)
-                                    opRes.mergeEntryProcessResults(ret);
-                                else
-                                    opRes = ret;
-                            }
+                GridCacheReturn ret = res.returnValue();
+
+                if (op == TRANSFORM) {
+                    if (ret != null) {
+                        assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+                        if (ret.value() != null) {
+                            if (opRes != null)
+                                opRes.mergeEntryProcessResults(ret);
+                            else
+                                opRes = ret;
                         }
                     }
-                    else
-                        opRes = ret;
                 }
-            }
+                else
+                    opRes = ret;
 
-            if (remapKey) {
-                assert mapErrTopVer != null;
+                assert reqState != null;
 
-                remapTopVer = cctx.shared().exchange().topologyVersion();
+                if (!reqState.onPrimaryResponse(res, cctx))
+                    return;
             }
-            else {
-                if (err != null &&
-                    X.hasCause(err, CachePartialUpdateCheckedException.class) &&
-                    X.hasCause(err, ClusterTopologyCheckedException.class) &&
-                    storeFuture() &&
-                    --remapCnt > 0) {
-                    ClusterTopologyCheckedException topErr =
-                        X.cause(err, ClusterTopologyCheckedException.class);
-
-                    if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
-                        CachePartialUpdateCheckedException cause =
-                            X.cause(err, CachePartialUpdateCheckedException.class);
 
-                        assert cause != null && cause.topologyVersion() != null :
err;
+            remapTopVer0 = onAllReceived();
 
-                        remapTopVer =
-                            new AffinityTopologyVersion(cause.topologyVersion().topologyVersion()
+ 1);
-
-                        err = null;
-                        updVer = null;
-                    }
-                }
-            }
-
-            if (remapTopVer == null) {
+            if (remapTopVer0 == null) {
                 err0 = err;
                 opRes0 = opRes;
             }
-            else {
-                fut0 = topCompleteFut;
-
-                topCompleteFut = null;
-
-                cctx.mvcc().removeAtomicFuture(futVer);
-
-                futVer = null;
-                topVer = AffinityTopologyVersion.ZERO;
-            }
         }
 
         if (res.error() != null && res.failedKeys() == null) {
@@ -309,55 +315,102 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             return;
         }
 
+        if (remapTopVer0 != null) {
+            waitAndRemap(remapTopVer0);
+
+            return;
+        }
+
         if (nearEnabled && !nodeErr)
             updateNear(req, res);
 
-        if (remapTopVer != null) {
-            if (fut0 != null)
-                fut0.onDone();
+        onDone(opRes0, err0);
+    }
 
-            if (!waitTopFut) {
-                onDone(new GridCacheTryPutFailedException());
+    /**
+     * @return Non-null topology version if update should be remapped.
+     */
+    private AffinityTopologyVersion onAllReceived() {
+        assert futId != null;
 
-                return;
+        AffinityTopologyVersion remapTopVer0 = null;
+
+        if (remapTopVer == null) {
+            if (err != null &&
+                X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+                X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                storeFuture() &&
+                --remapCnt > 0) {
+                ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
+
+                if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                    CachePartialUpdateCheckedException cause =
+                        X.cause(err, CachePartialUpdateCheckedException.class);
+
+                    assert cause != null && cause.topologyVersion() != null : err;
+
+                    remapTopVer0 = new AffinityTopologyVersion(cause.topologyVersion().topologyVersion()
+ 1);
+
+                    err = null;
+                }
             }
+        }
+        else
+            remapTopVer0 = remapTopVer;
 
-            if (topLocked) {
-                CachePartialUpdateCheckedException e =
-                    new CachePartialUpdateCheckedException("Failed to update keys (retry
update if possible).");
+        if (remapTopVer0 != null) {
+            cctx.mvcc().removeAtomicFuture(futId);
 
-                ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
-                    "Failed to update keys, topology changed while execute atomic update
inside transaction.");
+            reqState = null;
+            futId = null;
+            topVer = AffinityTopologyVersion.ZERO;
 
-                cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+            remapTopVer = null;
+        }
 
-                e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause);
+        return remapTopVer0;
+    }
 
-                onDone(e);
+    /**
+     * @param remapTopVer New topology version.
+     */
+    private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
+        if (!waitTopFut) {
+            onDone(new GridCacheTryPutFailedException());
 
-                return;
-            }
+            return;
+        }
 
-            IgniteInternalFuture<AffinityTopologyVersion> fut =
-                cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+        if (topLocked) {
+            CachePartialUpdateCheckedException e =
+                new CachePartialUpdateCheckedException("Failed to update keys (retry update
if possible).");
 
-            if (fut == null)
-                fut = new GridFinishedFuture<>(remapTopVer);
+            ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+                "Failed to update keys, topology changed while execute atomic update inside
transaction.");
 
-            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
-                @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion>
fut) {
-                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                        @Override public void run() {
-                            mapOnTopology();
-                        }
-                    });
-                }
-            });
+            cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+            e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause);
+
+            onDone(e);
 
             return;
         }
 
-        onDone(opRes0, err0);
+        IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+        if (fut == null)
+            fut = new GridFinishedFuture<>(remapTopVer);
+
+        fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion>
fut) {
+                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        mapOnTopology();
+                    }
+                });
+            }
+        });
     }
 
     /**
@@ -369,7 +422,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse
res) {
         assert nearEnabled;
 
-        if (res.remapKeys() != null || !req.hasPrimary())
+        if (res.remapTopologyVersion() != null)
             return;
 
         GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -380,103 +433,74 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     /** {@inheritDoc} */
     @Override protected void mapOnTopology() {
         AffinityTopologyVersion topVer;
-        GridCacheVersion futVer;
-
-        cache.topology().readLock();
 
-        try {
-            if (cache.topology().stopping()) {
-                onDone(new IgniteCheckedException("Failed to perform cache operation (cache
is stopped): " +
-                    cache.name()));
+        if (cache.topology().stopping()) {
+            onDone(new IgniteCheckedException("Failed to perform cache operation (cache is
stopped): " +
+                cache.name()));
 
-                return;
-            }
-
-            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+            return;
+        }
 
-            if (fut.isDone()) {
-                Throwable err = fut.validateCache(cctx);
+        GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
 
-                if (err != null) {
-                    onDone(err);
+        if (fut.isDone()) {
+            Throwable err = fut.validateCache(cctx);
 
-                    return;
-                }
-
-                topVer = fut.topologyVersion();
-
-                futVer = addAtomicFuture(topVer);
-            }
-            else {
-                if (waitTopFut) {
-                    assert !topLocked : this;
-
-                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
t) {
-                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                @Override public void run() {
-                                    mapOnTopology();
-                                }
-                            });
-                        }
-                    });
-                }
-                else
-                    onDone(new GridCacheTryPutFailedException());
+            if (err != null) {
+                onDone(err);
 
                 return;
             }
-        }
-        finally {
-            cache.topology().readUnlock();
-        }
-
-        if (futVer != null)
-            map(topVer, futVer);
-    }
 
-    /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer)
{
-        Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
-
-        if (F.isEmpty(topNodes)) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache
(all partition nodes " +
-                "left the grid)."));
+            topVer = fut.topologyVersion();
+        }
+        else {
+            if (waitTopFut) {
+                assert !topLocked : this;
+
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
t) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                mapOnTopology();
+                            }
+                        });
+                    }
+                });
+            }
+            else
+                onDone(new GridCacheTryPutFailedException());
 
             return;
         }
 
-        GridCacheVersion updVer;
-
-        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-        if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
-            updVer = this.updVer;
-
-            if (updVer == null) {
-                updVer = futVer;
+        map(topVer);
+    }
 
-                if (log.isDebugEnabled())
-                    log.debug("Assigned fast-map version for update on near node: " + updVer);
-            }
-        }
-        else
-            updVer = null;
+    /** {@inheritDoc} */
+    @Override protected void map(AffinityTopologyVersion topVer) {
+        long futId = cctx.mvcc().atomicFutureId();
 
         Exception err = null;
-        GridNearAtomicAbstractUpdateRequest singleReq0 = null;
+        PrimaryRequestState reqState0 = null;
 
         try {
-            singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+            reqState0 = mapSingleUpdate(topVer, futId);
 
             synchronized (mux) {
-                assert this.futVer == futVer || (this.isDone() && this.error() !=
null);
-                assert this.topVer == topVer;
+                assert this.futId == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+                this.topVer = topVer;
+                this.futId = futId;
 
-                this.updVer = updVer;
+                reqState = reqState0;
+            }
 
-                resCnt = 0;
+            if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
+                assert isDone();
 
-                req = singleReq0;
+                return;
             }
         }
         catch (Exception e) {
@@ -490,43 +514,80 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         }
 
         // Optimize mapping for single key.
-        mapSingle(singleReq0.nodeId(), singleReq0);
+        sendSingleRequest(reqState0.req.nodeId(), reqState0.req);
+
+        if (syncMode == FULL_ASYNC) {
+            onDone(new GridCacheReturn(cctx, true, true, null, true));
+
+            return;
+        }
+
+        if (reqState0.req.initMappingLocally() && (cctx.discovery().topologyVersion()
!= topVer.topologyVersion()))
+            checkDhtNodes(futId);
     }
 
     /**
-     * @return Future version.
+     * @param futId
+     * @return
      */
-    private GridCacheVersion onFutureDone() {
-        GridCacheVersion ver0;
+    private boolean checkDhtNodes(Long futId) {
+        GridCacheReturn opRes0 = null;
+        CachePartialUpdateCheckedException err0 = null;
+        AffinityTopologyVersion remapTopVer0 = null;
 
-        GridFutureAdapter<Void> fut0;
+        GridNearAtomicCheckUpdateRequest checkReq = null;
 
         synchronized (mux) {
-            fut0 = topCompleteFut;
+            if (this.futId == null || !this.futId.equals(futId))
+                return false;
 
-            topCompleteFut = null;
+            assert reqState != null;
 
-            ver0 = futVer;
+            DhtLeftResult res = reqState.checkDhtNodes(cctx);
 
-            futVer = null;
+            if (res == DhtLeftResult.DONE) {
+                opRes0 = opRes;
+                err0 = err;
+                remapTopVer0 = onAllReceived();
+            }
+            else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY){
+                checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req);
+            }
+            else
+                return true;
         }
 
-        if (fut0 != null)
-            fut0.onDone();
+        if (checkReq != null)
+            sendCheckUpdateRequest(checkReq);
+        else
+            finishUpdateFuture(opRes0, err0, remapTopVer0);
+
+        return false;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    private Long onFutureDone() {
+        Long id0;
+
+        synchronized (mux) {
+            id0 = futId;
 
-        return ver0;
+            futId = null;
+        }
+
+        return id0;
     }
 
     /**
      * @param topVer Topology version.
-     * @param futVer Future version.
-     * @param updVer Update version.
+     * @param futId Future ID.
      * @return Request.
      * @throws Exception If failed.
      */
-    private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
-        @Nullable GridCacheVersion updVer) throws Exception {
+    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long futId)
+        throws Exception {
         if (key == null)
             throw new NullPointerException("Null key.");
 
@@ -542,22 +603,27 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         else
             val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
 
-        ClusterNode primary = cctx.affinity().primaryByKey(cacheKey, topVer);
+        boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) &&
+            !cctx.discovery().hasNearCache(cctx.cacheId(), topVer);
+
+        List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
 
-        if (primary == null)
+        if (F.isEmpty(nodes))
             throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache
(all partition nodes " +
                 "left the grid).");
 
+        ClusterNode primary = nodes.get(0);
+
+        boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1;
+
         GridNearAtomicAbstractUpdateRequest req;
 
-        if (canUseSingleRequest(primary)) {
+        if (canUseSingleRequest()) {
             if (op == TRANSFORM) {
                 req = new GridNearAtomicSingleUpdateInvokeRequest(
                     cctx.cacheId(),
                     primary.id(),
-                    futVer,
-                    false,
-                    updVer,
+                    futId,
                     topVer,
                     topLocked,
                     syncMode,
@@ -566,9 +632,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                     invokeArgs,
                     subjId,
                     taskNameHash,
+                    needPrimaryRes,
                     skipStore,
                     keepBinary,
-                    cctx.kernalContext().clientNode(),
                     cctx.deploymentEnabled());
             }
             else {
@@ -576,9 +642,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                     req = new GridNearAtomicSingleUpdateRequest(
                         cctx.cacheId(),
                         primary.id(),
-                        futVer,
-                        false,
-                        updVer,
+                        futId,
                         topVer,
                         topLocked,
                         syncMode,
@@ -586,18 +650,16 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                         retval,
                         subjId,
                         taskNameHash,
+                        needPrimaryRes,
                         skipStore,
                         keepBinary,
-                        cctx.kernalContext().clientNode(),
                         cctx.deploymentEnabled());
                 }
                 else {
                     req = new GridNearAtomicSingleUpdateFilterRequest(
                         cctx.cacheId(),
                         primary.id(),
-                        futVer,
-                        false,
-                        updVer,
+                        futId,
                         topVer,
                         topLocked,
                         syncMode,
@@ -606,9 +668,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                         filter,
                         subjId,
                         taskNameHash,
+                        needPrimaryRes,
                         skipStore,
                         keepBinary,
-                        cctx.kernalContext().clientNode(),
                         cctx.deploymentEnabled());
                 }
             }
@@ -617,9 +679,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             req = new GridNearAtomicFullUpdateRequest(
                 cctx.cacheId(),
                 primary.id(),
-                futVer,
-                false,
-                updVer,
+                futId,
                 topVer,
                 topLocked,
                 syncMode,
@@ -630,9 +690,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 filter,
                 subjId,
                 taskNameHash,
+                needPrimaryRes,
                 skipStore,
                 keepBinary,
-                cctx.kernalContext().clientNode(),
                 cctx.deploymentEnabled(),
                 1);
         }
@@ -641,18 +701,39 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             val,
             CU.TTL_NOT_CHANGED,
             CU.EXPIRE_TIME_CALCULATE,
-            null,
-            true);
+            null);
+
+        return new PrimaryRequestState(req, nodes, true);
+    }
+
+    /**
+     * @param opRes Operation result.
+     * @param err Operation error.
+     * @param remapTopVer Not-null topology version if need remap update.
+     */
+    private void finishUpdateFuture(GridCacheReturn opRes,
+        CachePartialUpdateCheckedException err,
+        @Nullable AffinityTopologyVersion remapTopVer) {
+        if (remapTopVer != null) {
+            waitAndRemap(remapTopVer);
+
+            return;
+        }
+
+        if (nearEnabled) {
+            assert reqState.req.response() != null;
+
+            updateNear(reqState.req, reqState.req.response());
+        }
 
-        return req;
+        onDone(opRes, err);
     }
 
     /**
-     * @param node Target node
      * @return {@code True} can use 'single' update requests.
      */
-    private boolean canUseSingleRequest(ClusterNode node) {
-        return expiryPlc == null && node != null && node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST)
>= 0;
+    private boolean canUseSingleRequest() {
+        return expiryPlc == null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 6582063..f8b3984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -76,9 +76,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
-     * @param fastMap Fast map scheme flag.
-     * @param updateVer Update version set if fast map is performed.
+     * @param futId Future ID.
      * @param topVer Topology version.
      * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
@@ -89,15 +87,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param keepBinary Keep binary flag.
-     * @param clientReq Client node request flag.
      * @param addDepInfo Deployment info flag.
      */
     GridNearAtomicSingleUpdateInvokeRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
-        boolean fastMap,
-        @Nullable GridCacheVersion updateVer,
+        long futId,
         @NotNull AffinityTopologyVersion topVer,
         boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
@@ -106,17 +101,15 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         @Nullable Object[] invokeArgs,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean needPrimaryRes,
         boolean skipStore,
         boolean keepBinary,
-        boolean clientReq,
         boolean addDepInfo
     ) {
         super(
             cacheId,
             nodeId,
-            futVer,
-            fastMap,
-            updateVer,
+            futId,
             topVer,
             topLocked,
             syncMode,
@@ -124,14 +117,15 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
             retval,
             subjId,
             taskNameHash,
+            needPrimaryRes,
             skipStore,
             keepBinary,
-            clientReq,
             addDepInfo
         );
-        this.invokeArgs = invokeArgs;
 
         assert op == TRANSFORM : op;
+
+        this.invokeArgs = invokeArgs;
     }
 
     /**
@@ -140,14 +134,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
      * @param conflictTtl Conflict TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
-     * @param primary If given key is primary on this mapping.
      */
     @Override public void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer,
-        boolean primary) {
+        @Nullable GridCacheVersion conflictVer) {
         assert conflictTtl < 0 : conflictTtl;
         assert conflictExpireTime < 0 : conflictExpireTime;
         assert conflictVer == null : conflictVer;
@@ -156,9 +148,6 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         entryProcessor = (EntryProcessor<Object, Object, Object>)val;
 
         this.key = key;
-        partId = key.partition();
-
-        hasPrimary(hasPrimary() | primary);
     }
 
     /** {@inheritDoc} */
@@ -246,13 +235,13 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         }
 
         switch (writer.state()) {
-            case 14:
+            case 12:
                 if (!writer.writeByteArray("entryProcessorBytes", entryProcessorBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 15:
+            case 13:
                 if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
@@ -274,7 +263,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
             return false;
 
         switch (reader.state()) {
-            case 14:
+            case 12:
                 entryProcessorBytes = reader.readByteArray("entryProcessorBytes");
 
                 if (!reader.isLastRead())
@@ -282,7 +271,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
 
                 reader.incrementState();
 
-            case 15:
+            case 13:
                 invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR,
byte[].class);
 
                 if (!reader.isLastRead())
@@ -297,7 +286,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 16;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index c3e9fbe..b9a1fc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -57,9 +57,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
     /** Value to update. */
     protected CacheObject val;
 
-    /** Partition of key. */
-    protected int partId;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -72,9 +69,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
-     * @param fastMap Fast map scheme flag.
-     * @param updateVer Update version set if fast map is performed.
+     * @param futId Future ID.
      * @param topVer Topology version.
      * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
@@ -84,15 +79,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param keepBinary Keep binary flag.
-     * @param clientReq Client node request flag.
      * @param addDepInfo Deployment info flag.
      */
     GridNearAtomicSingleUpdateRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
-        boolean fastMap,
-        @Nullable GridCacheVersion updateVer,
+        long futId,
         @NotNull AffinityTopologyVersion topVer,
         boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
@@ -100,17 +92,14 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean needPrimaryRes,
         boolean skipStore,
         boolean keepBinary,
-        boolean clientReq,
         boolean addDepInfo
     ) {
-        super(
-            cacheId,
+        super(cacheId,
             nodeId,
-            futVer,
-            fastMap,
-            updateVer,
+            futId,
             topVer,
             topLocked,
             syncMode,
@@ -118,16 +107,17 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
             retval,
             subjId,
             taskNameHash,
+            needPrimaryRes,
             skipStore,
             keepBinary,
-            clientReq,
-            addDepInfo
-        );
+            addDepInfo);
     }
 
     /** {@inheritDoc} */
     @Override public int partition() {
-        return partId;
+        assert key != null;
+
+        return key.partition();
     }
 
     /**
@@ -136,14 +126,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
      * @param conflictTtl Conflict TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
-     * @param primary If given key is primary on this mapping.
      */
     @Override public void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer,
-        boolean primary) {
+        @Nullable GridCacheVersion conflictVer) {
         assert op != TRANSFORM;
         assert val != null || op == DELETE;
         assert conflictTtl < 0 : conflictTtl;
@@ -151,19 +139,18 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         assert conflictVer == null : conflictVer;
 
         this.key = key;
-        partId = key.partition();
 
         if (val != null) {
             assert val instanceof CacheObject : val;
 
             this.val = (CacheObject)val;
         }
-
-        hasPrimary(hasPrimary() | primary);
     }
 
     /** {@inheritDoc} */
     @Override public int size() {
+        assert key != null;
+
         return key == null ? 0 : 1;
     }
 
@@ -253,8 +240,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
         if (val != null)
             val.finishUnmarshal(cctx.cacheObjectContext(), ldr);
-
-        key.partition(partId);
     }
 
     /** {@inheritDoc} */
@@ -272,19 +257,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         }
 
         switch (writer.state()) {
-            case 11:
+            case 10:
                 if (!writer.writeMessage("key", key))
                     return false;
 
                 writer.incrementState();
 
-            case 12:
-                if (!writer.writeInt("partId", partId))
-                    return false;
-
-                writer.incrementState();
-
-            case 13:
+            case 11:
                 if (!writer.writeMessage("val", val))
                     return false;
 
@@ -306,7 +285,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
             return false;
 
         switch (reader.state()) {
-            case 11:
+            case 10:
                 key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
@@ -314,15 +293,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
                 reader.incrementState();
 
-            case 12:
-                partId = reader.readInt("partId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 13:
+            case 11:
                 val = reader.readMessage("val");
 
                 if (!reader.isLastRead())
@@ -350,7 +321,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 14;
+        return 12;
     }
 
     /** {@inheritDoc} */


Mime
View raw message