ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/3] ignite git commit: ignite-4705
Date Wed, 22 Feb 2017 11:46:48 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4705 1b5dcceab -> f81e774b9


ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/11e8776a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/11e8776a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/11e8776a

Branch: refs/heads/ignite-4705
Commit: 11e8776a770d8ce5be0208fe82e65362760ed4cf
Parents: ba08585
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Feb 21 18:05:52 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Feb 21 18:05:52 2017 +0300

----------------------------------------------------------------------
 .../GridDhtAtomicAbstractUpdateFuture.java      | 77 ++++++++++++++++----
 .../GridDhtAtomicAbstractUpdateRequest.java     |  3 +
 .../dht/atomic/GridDhtAtomicCache.java          | 21 ++----
 .../dht/atomic/GridDhtAtomicNearResponse.java   | 68 ++++++++++++++---
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |  3 -
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  4 -
 .../GridNearAtomicAbstractUpdateFuture.java     |  7 ++
 7 files changed, 134 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 96bfcb4..79fb7fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -40,17 +40,14 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
@@ -80,12 +77,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /** Future version. */
     protected final long futId;
 
-    /** Completion callback. */
-    @GridToStringExclude
-    private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
-
     /** Update request. */
-    protected final GridNearAtomicAbstractUpdateRequest updateReq;
+    final GridNearAtomicAbstractUpdateRequest updateReq;
 
     /** Update response. */
     final GridNearAtomicUpdateResponse updateRes;
@@ -100,16 +93,17 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /** Response count. */
     private volatile int resCnt;
 
+    /** */
+    private boolean repliedToNear;
+
     /**
      * @param cctx Cache context.
-     * @param completionCb Callback to invoke when future is completed.
      * @param writeVer Write version.
      * @param updateReq Update request.
      * @param updateRes Update response.
      */
     protected GridDhtAtomicAbstractUpdateFuture(
         GridCacheContext cctx,
-        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes
@@ -117,7 +111,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         this.cctx = cctx;
 
         this.updateReq = updateReq;
-        this.completionCb = completionCb;
         this.updateRes = updateRes;
         this.writeVer = writeVer;
 
@@ -235,6 +228,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     protected abstract void addNearKey(KeyCacheObject key, Collection<UUID> readers);
 
     /**
+     * @param nearNodeId Near node ID.
      * @param readers Entry readers.
      * @param entry Entry.
      * @param val Value.
@@ -336,9 +330,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
         GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
 
+        boolean needReplyToNear = false;
+
         if (req != null) {
             synchronized (this) {
                 if (req.onResponse()) {
+                    if (nodeErr && !repliedToNear)
+                        needReplyToNear = repliedToNear = true;
+
                     resCnt0 = resCnt;
 
                     resCnt0 += 1;
@@ -349,6 +348,51 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     return false;
             }
 
+            if (needReplyToNear) {
+                assert !F.isEmpty(mappings);
+
+                List<UUID> dhtNodes = new ArrayList<>(mappings.size());
+
+                dhtNodes.addAll(mappings.keySet());
+
+                GridDhtAtomicNearResponse res = new GridDhtAtomicNearResponse(cctx.cacheId(),
+                    req.partition(),
+                    req.futureId(),
+                    cctx.localNodeId(),
+                    dhtNodes,
+                    req.flags());
+
+                res.failedNodeId(nodeId);
+
+                try {
+                    cctx.io().send(req.nearNodeId(), res, cctx.ioPolicy());
+
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DTH update fut, sent response on DHT node fail " +
+                            "[futId=" + futId +
+                            ", writeVer=" + writeVer +
+                            ", node=" + req.nearNodeId() +
+                            ", failedNode=" + nodeId + ']');
+                    }
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DTH update fut, failed to notify near node on DHT node fail, near node left " +
+                            "[futId=" + futId +
+                            ", writeVer=" + writeVer +
+                            ", node=" + req.nearNodeId() +
+                            ", failedNode=" + nodeId + ']');
+                    }
+                }
+                catch (IgniteCheckedException ignored) {
+                    U.error(msgLog, "DTH update fut, failed to notify near node on DHT node fail " +
+                        "[futId=" + futId +
+                        ", writeVer=" + writeVer +
+                        ", node=" + req.nearNodeId() +
+                        ", failedNode=" + nodeId + ']');
+                }
+            }
+
             if (resCnt0 == mappings.size())
                 onDone();
 
@@ -361,11 +405,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /**
      * Sends requests to remote nodes.
      *
+     * @param completionCb Callback to invoke to send response to near node.
      * @param ret Cache operation return value.
      */
-    final void map(GridCacheReturn ret) {
+    final void map(GridDhtAtomicCache.UpdateReplyClosure completionCb, GridCacheReturn ret) {
         boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
-        boolean primaryReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC || ret.hasValue();
+        repliedToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC || ret.hasValue();
 
         List<UUID> dhtNodes = null;
 
@@ -378,14 +423,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
             else
                 dhtNodes = Collections.emptyList();
 
-            if (primaryReplyToNear)
+            if (repliedToNear)
                 updateRes.mapping(dhtNodes);
         }
 
         if (!F.isEmpty(mappings)) {
-            sendDhtRequests(fullSync && !primaryReplyToNear, dhtNodes, ret);
+            sendDhtRequests(fullSync && !repliedToNear, dhtNodes, ret);
 
-            if (primaryReplyToNear)
+            if (repliedToNear)
                 completionCb.apply(updateReq, updateRes);
             else {
                 if (fullSync && GridDhtAtomicCache.IGNITE_ATOMIC_SND_MAPPING_TO_NEAR) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index c991e66..1841a49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -56,6 +56,9 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /** */
     public static final int DHT_ATOMIC_RESULT_SUCCESS_MASK = 0x10;
 
+    /** */
+    public static final int DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE = 0x20;
+
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 30d58cb..0557bc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1825,7 +1825,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
 
-                        dhtFut = createDhtFuture(ver, req, res, completionCb, false);
+                        dhtFut = createDhtFuture(ver, req, res, false);
 
                         expiry = expiryPolicy(req.expiry());
 
@@ -1865,7 +1865,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 locked,
                                 ver,
                                 dhtFut,
-                                completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
                                 expiry,
@@ -1947,7 +1946,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         else {
             // If there are backups, map backup update future.
             if (dhtFut != null) {
-                dhtFut.map(res.returnValue());
+                dhtFut.map(completionCb, res.returnValue());
                 // Otherwise, complete the call.
             }
             else
@@ -2135,7 +2134,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 null,
                                 entryProcessorMap,
                                 dhtFut,
-                                completionCb,
                                 req,
                                 res,
                                 replicate,
@@ -2184,7 +2182,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 rmvKeys,
                                 entryProcessorMap,
                                 dhtFut,
-                                completionCb,
                                 req,
                                 res,
                                 replicate,
@@ -2311,7 +2308,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 rmvKeys,
                 entryProcessorMap,
                 dhtFut,
-                completionCb,
                 req,
                 res,
                 replicate,
@@ -2391,7 +2387,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param locked Locked entries.
      * @param ver Assigned update version.
      * @param dhtFut Optional DHT future.
-     * @param completionCb Completion callback to invoke when DHT future is completed.
      * @param replicate Whether DR is enabled for that cache.
      * @param taskName Task name.
      * @param expiry Expiry policy.
@@ -2407,7 +2402,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
-        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -2485,7 +2479,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     dhtFut);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                    dhtFut = createDhtFuture(ver, req, res, completionCb, true);
+                    dhtFut = createDhtFuture(ver, req, res, true);
 
                     readersOnly = true;
                 }
@@ -2638,7 +2632,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable final Collection<KeyCacheObject> rmvKeys,
         @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
-        final GridDhtAtomicCache.UpdateReplyClosure completionCb,
         final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
         final boolean replicate,
@@ -2792,7 +2785,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     batchRes.addDeleted(entry, updRes, entries);
 
                     if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                        dhtFut = createDhtFuture(ver, req, res, completionCb, true);
+                        dhtFut = createDhtFuture(ver, req, res, true);
 
                         batchRes.readersOnly(true);
                     }
@@ -3112,7 +3105,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param writeVer Write version.
      * @param updateReq Update request.
      * @param updateRes Update response.
-     * @param completionCb Completion callback to invoke when future is completed.
      * @param force If {@code true} then creates future without optimizations checks.
      * @return Backup update future or {@code null} if there are no backups.
      */
@@ -3120,13 +3112,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes,
-        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         boolean force
     ) {
         if (updateReq.size() == 1)
-            return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+            return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq, updateRes);
         else
-            return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, updateRes);
 //        if (!force) {
 //            if (updateReq.fastMap())
 //                return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index 244c22c..af9e908 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -32,6 +32,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_HAS_RESULT_MASK;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_RESULT_SUCCESS_MASK;
 
 /**
@@ -63,6 +64,9 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
     /** */
     private UpdateErrors errors;
 
+    /** */
+    private UUID failedNodeId;
+
     /**
      *
      */
@@ -78,7 +82,13 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
      * @param mapping Update mapping.
      * @param flags Flags.
      */
-    public GridDhtAtomicNearResponse(int cacheId, int partId, long futId, UUID primaryId, List<UUID> mapping, byte flags) {
+    public GridDhtAtomicNearResponse(int cacheId,
+        int partId,
+        long futId,
+        UUID primaryId,
+        List<UUID> mapping,
+        byte flags)
+    {
         this.cacheId = cacheId;
         this.partId = partId;
         this.futId = futId;
@@ -88,9 +98,31 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
     }
 
     /**
+     * @return Failed node ID.
+     */
+    UUID failedNodeId() {
+        return failedNodeId;
+    }
+
+    /**
+     * @param failedNodeId Failed node ID (used when primary notifies near node).
+     */
+    void failedNodeId(UUID failedNodeId) {
+        assert failedNodeId != null;
+
+        this.failedNodeId = failedNodeId;
+
+        setFlag(true, DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE);
+    }
+
+    boolean primaryDhtFailureResponse() {
+        return isFlag(DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE);
+    }
+
+    /**
      * @return Primary node ID.
      */
-    public UUID primaryId() {
+    UUID primaryId() {
         return primaryId;
     }
 
@@ -170,7 +202,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 9;
+        return 10;
     }
 
     /** {@inheritDoc} */
@@ -216,30 +248,36 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeByte("flags", flags))
+                if (!writer.writeUuid("failedNodeId", failedNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeLong("futId", futId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeInt("partId", partId))
+                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
                     return false;
 
                 writer.incrementState();
 
             case 8:
+                if (!writer.writeInt("partId", partId))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
                 if (!writer.writeUuid("primaryId", primaryId))
                     return false;
 
@@ -270,7 +308,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
-                flags = reader.readByte("flags");
+                failedNodeId = reader.readUuid("failedNodeId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -278,7 +316,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 5:
-                futId = reader.readLong("futId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -286,7 +324,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 6:
-                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -294,7 +332,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 7:
-                partId = reader.readInt("partId");
+                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
 
                 if (!reader.isLastRead())
                     return false;
@@ -302,6 +340,14 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 8:
+                partId = reader.readInt("partId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
                 primaryId = reader.readUuid("primaryId");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index c6b3f5b..b431bd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -50,20 +50,17 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
 
     /**
      * @param cctx Cache context.
-     * @param completionCb Callback to invoke when future is completed.
      * @param writeVer Write version.
      * @param updateReq Update request.
      * @param updateRes Update response.
      */
     GridDhtAtomicSingleUpdateFuture(
         GridCacheContext cctx,
-        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes
     ) {
         super(cctx,
-            completionCb,
             writeVer,
             updateReq,
             updateRes);

http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 673fb5d..7303736 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -50,23 +50,19 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     /** Entries with readers. */
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
 
-
     /**
      * @param cctx Cache context.
-     * @param completionCb Callback to invoke when future is completed.
      * @param writeVer Write version.
      * @param updateReq Update request.
      * @param updateRes Update response.
      */
     GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
-        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes
     ) {
         super(cctx,
-            completionCb,
             writeVer,
             updateReq,
             updateRes);

http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 66d392b..258a0bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -442,6 +442,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if request processing finished.
          */
         boolean onDhtResponse(GridCacheContext cctx, UUID nodeId, GridDhtAtomicNearResponse res) {
+            if (res.primaryDhtFailureResponse()) {
+                assert res.mapping() != null : res;
+                assert res.failedNodeId() != null : res;
+
+                nodeId = res.failedNodeId();
+            }
+
             if (res.mapping() != null) {
                 // Mapping is sent from dht nodes.
                 if (mapping == null)


Mime
View raw message