ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [3/6] ignite git commit: Reworked "remapKeys" logic.
Date Fri, 15 Apr 2016 11:34:32 GMT
Reworked "remapKeys" logic.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dfae05a6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dfae05a6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dfae05a6

Branch: refs/heads/ignite-2926
Commit: dfae05a6bae2e2ef9d02fc1bb37f6a7208a338f3
Parents: 61bdf4c
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Fri Apr 15 12:42:40 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Fri Apr 15 12:42:40 2016 +0300

----------------------------------------------------------------------
 .../GridNearAtomicSingleUpdateFuture.java       | 127 ++++++++-----------
 1 file changed, 55 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dfae05a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 6c1a402..fc85db3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -77,7 +77,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
     private Map<UUID, GridNearAtomicUpdateRequest> mappings;
 
     /** Keys to remap. */
-    private Collection<KeyCacheObject> remapKeys;
+    private boolean remapKey;
 
     /** Not null is operation is mapped to single node. */
     private GridNearAtomicUpdateRequest singleReq;
@@ -189,7 +189,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
             // Cannot remap.
             remapCnt = 1;
 
-            map(topVer, null);
+            map(topVer);
         }
     }
 
@@ -299,10 +299,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
             if (res.remapKeys() != null) {
                 assert !fastMap || cctx.kernalContext().clientNode();
 
-                if (remapKeys == null)
-                    remapKeys = U.newHashSet(res.remapKeys().size());
-
-                remapKeys.addAll(res.remapKeys());
+                remapKey = true;
 
                 if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion())
< 0)
                     mapErrTopVer = req.topologyVersion();
@@ -343,7 +340,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
             }
 
             if (rcvAll) {
-                if (remapKeys != null) {
+                if (remapKey) {
                     assert mapErrTopVer != null;
 
                     remapTopVer = cctx.shared().exchange().topologyVersion();
@@ -368,12 +365,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
 
                             err = null;
 
-                            Collection<Object> failedKeys = cause.failedKeys();
-
-                            remapKeys = new ArrayList<>(failedKeys.size());
-
-                            for (Object key : failedKeys)
-                                remapKeys.add(cctx.toCacheKeyObject(key));
+                            remapKey = true;
 
                             updVer = null;
                         }
@@ -428,7 +420,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
             }
 
             if (topLocked) {
-                assert !F.isEmpty(remapKeys) : remapKeys;
+                assert remapKey;
 
                 CachePartialUpdateCheckedException e =
                     new CachePartialUpdateCheckedException("Failed to update keys (retry
update if possible).");
@@ -438,7 +430,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
 
                 cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
 
-                e.add(remapKeys, cause);
+                e.add(Collections.singleton(keys), cause);
 
                 onDone(e);
 
@@ -458,7 +450,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
                             try {
                                 AffinityTopologyVersion topVer = fut.get();
 
-                                map(topVer, remapKeys);
+                                map(topVer);
                             }
                             catch (IgniteCheckedException e) {
                                 onDone(e);
@@ -545,7 +537,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
             cache.topology().readUnlock();
         }
 
-        map(topVer, null);
+        map(topVer);
     }
 
     /**
@@ -642,9 +634,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
 
     /**
      * @param topVer Topology version.
-     * @param remapKeys Keys to remap.
      */
-    void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys)
{
+    void map(AffinityTopologyVersion topVer) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -677,17 +668,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
             updVer = null;
 
         try {
-            if (!fastMap) {
-                assert remapKeys == null || remapKeys.size() == 1;
-
+            if (!fastMap)
                 singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
-            }
             else {
                 Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
                     topVer,
                     futVer,
-                    updVer,
-                    remapKeys);
+                    updVer);
 
                 if (pendingMappings.size() == 1)
                     singleReq0 = F.firstValue(pendingMappings);
@@ -720,7 +707,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
                 singleReq = singleReq0;
                 mappings = mappings0;
 
-                this.remapKeys = null;
+                this.remapKey = false;
             }
         }
         catch (Exception e) {
@@ -780,15 +767,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
      * @param topVer Topology version.
      * @param futVer Future version.
      * @param updVer Update version.
-     * @param remapKeys Keys to remap.
      * @return Mapping.
      * @throws Exception If failed.
      */
     private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode>
topNodes,
         AffinityTopologyVersion topVer,
         GridCacheVersion futVer,
-        @Nullable GridCacheVersion updVer,
-        @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+        @Nullable GridCacheVersion updVer) throws Exception {
         Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
 
         if (keys == null)
@@ -801,57 +786,55 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda
 
         KeyCacheObject cacheKey = cctx.toCacheKeyObject(keys);
 
-        if (remapKeys == null || remapKeys.contains(cacheKey)) {
-            if (op != TRANSFORM)
-                val = cctx.toCacheObject(val);
+        if (op != TRANSFORM)
+            val = cctx.toCacheObject(val);
+
+        Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer);
+
+        if (affNodes.isEmpty())
+            throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache
" +
+                "(all partition nodes left the grid).");
 
-            Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer);
+        int i = 0;
 
-            if (affNodes.isEmpty())
+        for (ClusterNode affNode : affNodes) {
+            if (affNode == null)
                 throw new ClusterTopologyServerNotFoundException("Failed to map keys for
cache " +
                     "(all partition nodes left the grid).");
 
-            int i = 0;
-
-            for (ClusterNode affNode : affNodes) {
-                if (affNode == null)
-                    throw new ClusterTopologyServerNotFoundException("Failed to map keys
for cache " +
-                        "(all partition nodes left the grid).");
-
-                UUID nodeId = affNode.id();
-
-                GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
-
-                if (mapped == null) {
-                    mapped = new GridNearAtomicUpdateRequest(
-                        cctx.cacheId(),
-                        nodeId,
-                        futVer,
-                        fastMap,
-                        updVer,
-                        topVer,
-                        topLocked,
-                        syncMode,
-                        op,
-                        retval,
-                        expiryPlc,
-                        invokeArgs,
-                        filter,
-                        subjId,
-                        taskNameHash,
-                        skipStore,
-                        keepBinary,
-                        cctx.kernalContext().clientNode(),
-                        cctx.deploymentEnabled(),
-                        1);
-
-                    pendingMappings.put(nodeId, mapped);
-                }
+            UUID nodeId = affNode.id();
 
-                mapped.addUpdateEntry(cacheKey, val, CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE,
null, i == 0);
+            GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
 
-                i++;
+            if (mapped == null) {
+                mapped = new GridNearAtomicUpdateRequest(
+                    cctx.cacheId(),
+                    nodeId,
+                    futVer,
+                    fastMap,
+                    updVer,
+                    topVer,
+                    topLocked,
+                    syncMode,
+                    op,
+                    retval,
+                    expiryPlc,
+                    invokeArgs,
+                    filter,
+                    subjId,
+                    taskNameHash,
+                    skipStore,
+                    keepBinary,
+                    cctx.kernalContext().clientNode(),
+                    cctx.deploymentEnabled(),
+                    1);
+
+                pendingMappings.put(nodeId, mapped);
             }
+
+            mapped.addUpdateEntry(cacheKey, val, CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE,
null, i == 0);
+
+            i++;
         }
 
         return pendingMappings;


Mime
View raw message