ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4705
Date Wed, 01 Mar 2017 14:43:17 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4705 9c63b4fe7 -> 8baff9a6b


ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8baff9a6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8baff9a6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8baff9a6

Branch: refs/heads/ignite-4705
Commit: 8baff9a6b1eeed74a6aec33c3063a4bb0210d2ba
Parents: 9c63b4f
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Mar 1 13:02:06 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Mar 1 17:41:46 2017 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |   8 ++
 .../dht/GridDhtPartitionTopology.java           |   3 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  15 ++-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  48 +++++++--
 .../GridDhtAtomicAbstractUpdateRequest.java     |  10 ++
 .../dht/atomic/GridDhtAtomicCache.java          |  31 +++---
 .../dht/atomic/GridDhtAtomicNearResponse.java   |   9 ++
 .../GridNearAtomicAbstractUpdateFuture.java     | 102 +++++++++++++------
 .../atomic/GridNearAtomicFullUpdateRequest.java |   2 +
 .../atomic/GridNearAtomicMappingResponse.java   |  65 ++++++++++--
 .../GridNearAtomicSingleUpdateRequest.java      |   2 +
 .../GridCacheAbstractFailoverSelfTest.java      |   3 +-
 ...niteBinaryMetadataUpdateNodeRestartTest.java |   2 +-
 .../IgniteCachePutRetryAbstractSelfTest.java    |   2 +-
 .../atomic/IgniteCacheAtomicProtocolTest.java   |  48 +++++++++
 15 files changed, 286 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index a1fbd72..91ca1b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -403,6 +404,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public List<ClusterNode> nodes(int p,
+        AffinityAssignment affAssignment,
+        List<ClusterNode> affNodes) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer)
{
         lock.readLock().lock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index bdd84b0..351ab37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -161,6 +162,8 @@ public interface GridDhtPartitionTopology {
      */
     public long updateSequence();
 
+    @Nullable List<ClusterNode> nodes(int p, AffinityAssignment affAssignment, List<ClusterNode>
affNodes);
+
     /**
      * @param p Partition ID.
      * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 966a186..c2efda6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -820,11 +820,24 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public List<ClusterNode> nodes(int p,
+        AffinityAssignment affAssignment,
+        List<ClusterNode> affNodes) {
+        return nodes0(p, affAssignment, affNodes);
+    }
+
+    /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer)
{
         AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
 
         List<ClusterNode> affNodes = affAssignment.get(p);
 
+        List<ClusterNode> nodes = nodes0(p, affAssignment, affNodes);
+
+        return nodes != null ? nodes : affNodes;
+    }
+
+    @Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment,
List<ClusterNode> affNodes) {
         lock.readLock().lock();
 
         try {
@@ -858,7 +871,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                 }
             }
 
-            return nodes != null ? nodes : affNodes;
+            return nodes != null ? nodes : null;
         }
         finally {
             lock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 0877d24..094d643 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
@@ -95,6 +96,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /** */
     private boolean repliedToNear;
 
+    /** */
+    private boolean affMapping = true;
+
     /**
      * @param cctx Cache context.
      * @param writeVer Write version.
@@ -154,6 +158,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     final void addWriteEntry(
+        AffinityAssignment affAssignment,
         UUID nearNodeId,
         GridDhtCacheEntry entry,
         @Nullable CacheObject val,
@@ -166,7 +171,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         long updateCntr) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
-        List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(),
topVer);
+        List<ClusterNode> affNodes = affAssignment.get(entry.partition());
+
+        List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(),
affAssignment, affNodes);
+
+        if (dhtNodes != null)
+            affMapping = false;
+        else
+            dhtNodes = affNodes;
 
         if (log.isDebugEnabled())
             log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry="
+ entry + ']');
@@ -240,6 +252,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime) {
+        affMapping = false;
+
         CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
 
         addNearKey(entry.key(), readers);
@@ -415,21 +429,28 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
         List<UUID> dhtNodes = null;
 
+        boolean affMapping = false;
+
         if (fullSync) {
             if (!F.isEmpty(mappings)) {
-                dhtNodes = new ArrayList<>(mappings.size());
+                if (updateReq.size() == 1 && this.affMapping)
+                    affMapping = true;
+                else {
+                    dhtNodes = new ArrayList<>(mappings.size());
 
-                dhtNodes.addAll(mappings.keySet());
+                    dhtNodes.addAll(mappings.keySet());
+                }
             }
             else
                 dhtNodes = Collections.emptyList();
-
-            if (needReplyToNear)
-                updateRes.mapping(dhtNodes);
         }
+        else
+            dhtNodes = Collections.emptyList();
+
+        updateRes.mapping(dhtNodes);
 
         if (!F.isEmpty(mappings)) {
-            sendDhtRequests(fullSync && !needReplyToNear, dhtNodes, ret);
+            sendDhtRequests(fullSync && !needReplyToNear, dhtNodes, affMapping, ret);
 
             if (needReplyToNear)
                 completionCb.apply(updateReq, updateRes);
@@ -439,7 +460,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                         cctx.cacheId(),
                         updateReq.partition(),
                         updateReq.futureId(),
-                        dhtNodes);
+                        dhtNodes,
+                        affMapping);
 
                     try {
                         cctx.io().send(updateRes.nodeId(), mappingRes, cctx.ioPolicy());
@@ -464,11 +486,17 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param dhtNodes DHT nodes.
      * @param ret Return value.
      */
-    private void sendDhtRequests(boolean nearReplyInfo, List<UUID> dhtNodes, GridCacheReturn
ret) {
+    private void sendDhtRequests(boolean nearReplyInfo, List<UUID> dhtNodes, boolean
affMapping, GridCacheReturn ret) {
         for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
             try {
                 if (nearReplyInfo) {
-                    req.dhtNodes(dhtNodes);
+                    if (affMapping) {
+                        assert dhtNodes == null;
+
+                        req.affinityMapping(true);
+                    }
+                    else
+                        req.dhtNodes(dhtNodes);
 
                     if (!ret.hasValue())
                         req.setResult(ret.success());

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 1d01a49..896c163 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -61,6 +61,9 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /** */
     static final int DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE = 0x20;
 
+    /** */
+    static final int DHT_ATOMIC_AFF_MAPPING_FLAG_MASK = 0x40;
+
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
@@ -151,6 +154,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     }
 
     /**
+     * @param affMapping
+     */
+    public void affinityMapping(boolean affMapping) {
+        setFlag(affMapping, DHT_ATOMIC_AFF_MAPPING_FLAG_MASK);
+    }
+
+    /**
      * @param success Success flag.
      */
     public void setResult(boolean success) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9c21615..c6a6a15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -39,6 +39,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
@@ -1744,6 +1745,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         GridNearAtomicAbstractUpdateRequest req,
         UpdateReplyClosure completionCb
     ) {
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null) {
+            U.warn(msgLog, "Skip near update request, node originated update request left
[" +
+                "futId=" + req.futureId() + ", node=" + nodeId + ']');
+
+            return;
+        }
+
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId, req.futureId(),
             ctx.deploymentEnabled());
 
@@ -1784,15 +1794,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                     // Do not check topology version if topology was locked on near node
by
                     // external transaction or explicit lock.
                     if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion()))
{
-                        ClusterNode node = ctx.discovery().node(nodeId);
-
-                        if (node == null) {
-                            U.warn(msgLog, "Skip near update request, node originated update
request left [" +
-                                "futId=" + req.futureId() + ", node=" + nodeId + ']');
-
-                            return;
-                        }
-
                         locked = lockEntries(req, req.topologyVersion());
 
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
@@ -1866,9 +1867,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
                         res.returnValue(retVal);
 
-                        if (req.writeSynchronizationMode() != FULL_ASYNC)
-                            req.cleanup(!node.isLocal());
-
                         if (dhtFut != null) {
                             dhtFut.init(res, res.returnValue());
 
@@ -1945,6 +1943,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                 completionCb.apply(req, res);
         }
 
+        if (req.writeSynchronizationMode() != FULL_ASYNC)
+            req.cleanup(!node.isLocal());
+
         sendTtlUpdateRequest(expiry);
     }
 
@@ -2408,6 +2409,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
+        AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
+
         // Avoid iterator creation.
         for (int i = 0; i < req.size(); i++) {
             KeyCacheObject k = req.key(i);
@@ -2484,6 +2487,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
                         if (!readersOnly) {
                             dhtFut.addWriteEntry(
+                                affAssignment,
                                 nearNode.id(),
                                 entry,
                                 updRes.newValue(),
@@ -2684,6 +2688,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
             boolean intercept = ctx.config().getInterceptor() != null;
 
+            AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
+
             // Avoid iterator creation.
             for (int i = 0; i < entries.size(); i++) {
                 GridDhtCacheEntry entry = entries.get(i);
@@ -2782,6 +2788,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
                         if (!batchRes.readersOnly()) {
                             dhtFut.addWriteEntry(
+                                affAssignment,
                                 nearNode.id(),
                                 entry,
                                 writeVal,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index 2e9b616..b397f0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -34,6 +34,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_AFF_MAPPING_FLAG_MASK;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_HAS_RESULT_MASK;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_RESULT_SUCCESS_MASK;
@@ -105,6 +106,13 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
     }
 
     /**
+     * @return {@code True} if update mapping matches affinity function result.
+     */
+    boolean affinityMapping() {
+        return isFlag(DHT_ATOMIC_AFF_MAPPING_FLAG_MASK);
+    }
+
+    /**
      * @return Errors.
      */
     @Nullable UpdateErrors errors() {
@@ -389,6 +397,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
         return S.toString(GridDhtAtomicNearResponse.class, this, "flags",
             "res=" + isFlag(DHT_ATOMIC_HAS_RESULT_MASK) +
             "|resOk=" + isFlag(DHT_ATOMIC_RESULT_SUCCESS_MASK) +
+            "|affMap=" + isFlag(DHT_ATOMIC_AFF_MAPPING_FLAG_MASK) +
             "|dhtFail=" + isFlag(DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index d73f029..9f7512c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -27,6 +27,7 @@ import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
@@ -389,20 +390,54 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             return mapping != null && mapping.isEmpty() && hasRes;
         }
 
+        void initAffinityMapping(GridCacheContext cctx, UUID skipNodeId) {
+            assert req.size() == 1 : req;
+
+            List<ClusterNode> nodes =
+                cctx.affinity().nodesByPartition(req.key(0).partition(), req.topologyVersion());
+
+            if (nodes.size() == 1)
+                mapping = Collections.emptySet();
+            else {
+                for (int i = 1; i < nodes.size(); i++) {
+                    ClusterNode dhtNode = nodes.get(i);
+
+                    if (dhtNode.id().equals(skipNodeId) || (rcvd != null && rcvd.contains(dhtNode.id())))
+                        continue;
+
+                    if (cctx.discovery().node(dhtNode.id()) != null) {
+                        if (mapping == null)
+                            mapping = U.newHashSet(nodes.size() - 1);
+
+                        mapping.add(dhtNode.id());
+                    }
+                }
+
+                if (mapping == null)
+                    mapping = Collections.emptySet();
+            }
+        }
+
         /**
          * @param cctx Context.
          * @param nodeIds DHT nodes.
+         * @param skipNodeId Node ID to skip.
          */
-        void initMapping(GridCacheContext cctx, List<UUID> nodeIds) {
-            mapping = U.newHashSet(nodeIds.size());
-
+        void initMapping(GridCacheContext cctx, List<UUID> nodeIds, @Nullable UUID
skipNodeId) {
             for (UUID dhtNodeId : nodeIds) {
-                if (cctx.discovery().node(dhtNodeId) != null)
+                if (dhtNodeId.equals(skipNodeId) || (rcvd != null && rcvd.contains(dhtNodeId)))
+                    continue;
+
+                if (cctx.discovery().node(dhtNodeId) != null) {
+                    if (mapping == null)
+                        mapping = U.newHashSet(nodeIds.size());
+
                     mapping.add(dhtNodeId);
+                }
             }
 
-            if (rcvd != null)
-                mapping.removeAll(rcvd);
+            if (mapping == null)
+                mapping = Collections.emptySet();
         }
 
         /**
@@ -425,17 +460,15 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if request processing finished.
          */
         boolean onMappingReceived(GridCacheContext cctx, GridNearAtomicMappingResponse res)
{
-            if (finished())
+            if (finished() || mapping != null)
                 return false;
 
-            if (mapping == null) {
-                initMapping(cctx, res.mapping());
-
-                if (mapping.isEmpty() && hasRes)
-                    return true;
-            }
+            if (res.affinityMapping())
+                initAffinityMapping(cctx, null);
+            else
+                initMapping(cctx, res.mapping(), null);
 
-            return false;
+            return finished();
         }
 
         /**
@@ -446,10 +479,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             if (finished())
                 return false;
 
-            if (mapping != null && mapping.remove(nodeId)) {
-                if (mapping.isEmpty() && hasRes)
-                    return true;
-            }
+            if (mapping != null && mapping.remove(nodeId))
+                return finished();
 
             return false;
         }
@@ -473,10 +504,22 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 nodeId = res.failedNodeId();
             }
 
-            if (res.mapping() != null) {
+            if (res.hasResult())
+                hasRes = true;
+
+            if (res.affinityMapping()) {
+                if (mapping == null) {
+                    initAffinityMapping(cctx, nodeId);
+
+                    return finished();
+                }
+            } else if (res.mapping() != null) {
                 // Mapping is sent from dht nodes.
-                if (mapping == null)
-                    initMapping(cctx, res.mapping());
+                if (mapping == null) {
+                    initMapping(cctx, res.mapping(), nodeId);
+
+                    return finished();
+                }
             }
             else {
                 // Mapping and result are sent from primary.
@@ -490,12 +533,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 }
             }
 
-            mapping.remove(nodeId);
-
-            if (res.hasResult())
-                hasRes = true;
-
-            return mapping.isEmpty() && hasRes;
+            return mapping.remove(nodeId) && finished();
         }
 
         /**
@@ -517,12 +555,14 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
             assert res.returnValue() != null : res;
 
-            if (res.mapping() != null)
-                initMapping(cctx, res.mapping());
+            if (res.mapping() != null) {
+                if (mapping == null)
+                    initMapping(cctx, res.mapping(), null);
+            }
             else
-                mapping = Collections.emptySet();
+                initAffinityMapping(cctx, null);
 
-            return mapping.isEmpty();
+            return finished();
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index a8f5a55..869cfbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -405,6 +405,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public int size() {
+        assert keys != null;
+
         return keys != null ? keys.size() : 0;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
index 6dd33ea..855fb78 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
@@ -38,6 +38,9 @@ public class GridNearAtomicMappingResponse extends GridCacheMessage {
     public static final int CACHE_MSG_IDX = nextIndexId();
 
     /** */
+    private static final int AFF_MAPPING_FLAG_MASK = 0x01;
+
+    /** */
     private int part;
 
     /** */
@@ -47,6 +50,9 @@ public class GridNearAtomicMappingResponse extends GridCacheMessage {
     /** */
     private long futId;
 
+    /** */
+    private byte flags;
+
     /**
      *
      */
@@ -59,12 +65,45 @@ public class GridNearAtomicMappingResponse extends GridCacheMessage {
      * @param part Partition.
      * @param futId Future ID.
      * @param mapping Mapping.
+     * @param affMapping {@code True} if update mapping matches affinity function result.
      */
-    GridNearAtomicMappingResponse(int cacheId, int part, long futId, List<UUID> mapping)
{
+    GridNearAtomicMappingResponse(int cacheId, int part, long futId, List<UUID> mapping,
boolean affMapping) {
+        assert mapping == null || !affMapping;
+
         this.cacheId = cacheId;
         this.part = part;
         this.futId = futId;
         this.mapping = mapping;
+
+        if (affMapping)
+            setFlag(true, AFF_MAPPING_FLAG_MASK);
+    }
+
+    /**
+     * @return {@code True} if update mapping matches affinity function result.
+     */
+    boolean affinityMapping() {
+        return isFlag(AFF_MAPPING_FLAG_MASK);
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    protected final void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reags flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    final boolean isFlag(int mask) {
+        return (flags & mask) != 0;
     }
 
     /** {@inheritDoc} */
@@ -98,7 +137,7 @@ public class GridNearAtomicMappingResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 
     /** {@inheritDoc} */
@@ -122,18 +161,24 @@ public class GridNearAtomicMappingResponse extends GridCacheMessage
{
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeLong("futId", futId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 5:
+                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeInt("part", part))
                     return false;
 
@@ -156,7 +201,7 @@ public class GridNearAtomicMappingResponse extends GridCacheMessage {
 
         switch (reader.state()) {
             case 3:
-                futId = reader.readLong("futId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -164,7 +209,7 @@ public class GridNearAtomicMappingResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
-                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -172,6 +217,14 @@ public class GridNearAtomicMappingResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 5:
+                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 part = reader.readInt("part");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 0045228..577b130 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -164,6 +164,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
     /** {@inheritDoc} */
     @Override public int size() {
+        assert key != null;
+
         return key == null ? 0 : 1;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index da54d15..0b2e717 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -78,8 +78,6 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
 
         cfg.setNetworkTimeout(60_000);
 
-        cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
-
         TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
 
         discoSpi.setSocketTimeout(30_000);
@@ -89,6 +87,7 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
         discoSpi.setReconnectCount(2);
 
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setIdleConnectionTimeout(100);
 
         return cfg;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
index ac3f0d0..0160269 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
@@ -75,7 +75,7 @@ public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstrac
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
-        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setIdleConnectionTimeout(500);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setIdleConnectionTimeout(100);
 
         cfg.setMarshaller(null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index a751e85..8a100dd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -120,7 +120,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
-        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setIdleConnectionTimeout(1000);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setIdleConnectionTimeout(100);
 
         AtomicConfiguration acfg = new AtomicConfiguration();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8baff9a6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index df1938b..8aeb903 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -29,6 +30,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
@@ -41,6 +43,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  *
@@ -281,6 +284,51 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPutPrimarySync() throws Exception {
+        startGrids(2);
+
+        client = true;
+
+        Ignite clientNode = startGrid(2);
+
+        client = false;
+
+        final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1,
PRIMARY_SYNC));
+        IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+        awaitPartitionMapExchange();
+
+        Ignite srv0 = grid(0);
+        final Ignite srv1 = grid(1);
+
+        final Integer key = primaryKey(srv0.cache(TEST_CACHE));
+
+        testSpi(srv0).blockMessages(GridDhtAtomicSingleUpdateRequest.class, srv1.name());
+
+        nearAsyncCache.put(key, key);
+
+        IgniteFuture<?> fut = nearAsyncCache.future();
+
+        fut.get(5, TimeUnit.SECONDS);
+
+        assertEquals(key, srv0.cache(TEST_CACHE).get(key));
+
+        assertNull(srv1.cache(TEST_CACHE).localPeek(key));
+
+        testSpi(srv0).stopBlock(true);
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return srv1.cache(TEST_CACHE).localPeek(key) != null;
+            }
+        }, 5000);
+
+        checkData(F.asMap(key, key));
+    }
+
+    /**
      * @param expData Expected cache data.
      */
     private void checkData(Map<Integer, Integer> expData) {


Mime
View raw message