ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: Fixed remap-keys problem.
Date Fri, 15 Apr 2016 14:23:49 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2926 4b78262c6 -> 9d862653a


Fixed remap-keys problem.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d862653
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d862653
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d862653

Branch: refs/heads/ignite-2926
Commit: 9d862653aa7a5c1844f0b783637634fb239053af
Parents: 4b78262
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Fri Apr 15 17:22:59 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Fri Apr 15 17:22:59 2016 +0300

----------------------------------------------------------------------
 .../GridNearAtomicSingleUpdateFuture.java       | 63 ++++++++++++++++----
 1 file changed, 51 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9d862653/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 3917936..6547b0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import javax.cache.expiry.ExpiryPolicy;
@@ -73,6 +74,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     /** Not null is operation is mapped to single node. */
     private GridNearAtomicUpdateRequest req;
 
+    /** Keys to remap. */
+    private Collection<KeyCacheObject> remapKeys;
+
     /**
      * @param cctx Cache context.
      * @param cache Cache instance.
@@ -222,6 +226,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             this.req = null;
 
             if (res.remapKeys() != null) {
+                if (remapKeys == null)
+                    remapKeys = U.newHashSet(res.remapKeys().size());
+
+                remapKeys.addAll(res.remapKeys());
+
                 if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
                     mapErrTopVer = req.topologyVersion();
             }
@@ -260,7 +269,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 }
             }
 
-            if (res.remapKeys() != null) {
+            if (remapKeys != null) {
                 assert mapErrTopVer != null;
 
                 remapTopVer = cctx.shared().exchange().topologyVersion();
@@ -285,6 +294,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                         err = null;
 
+                        Collection<Object> failedKeys = cause.failedKeys();
+
+                        remapKeys = new ArrayList<>(failedKeys.size());
+
+                        for (Object key : failedKeys)
+                            remapKeys.add(cctx.toCacheKeyObject(key));
+
                         updVer = null;
                     }
                 }
@@ -312,14 +328,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             return;
         }
 
-        if (nearEnabled && !nodeErr) {
-            if (res.remapKeys() != null || !req.hasPrimary())
-                return;
-
-            GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
-            near.processNearAtomicUpdateResponse(req, res);
-        }
+        if (nearEnabled && !nodeErr)
+            updateNear(req, res);
 
         if (remapTopVer != null) {
             if (fut0 != null)
@@ -332,6 +342,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             }
 
             if (topLocked) {
+                assert !F.isEmpty(remapKeys) : remapKeys;
+
                 CachePartialUpdateCheckedException e =
                     new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
 
@@ -340,7 +352,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
 
-                e.add(Collections.singleton(key), cause);
+                e.add(remapKeys, cause);
 
                 onDone(e);
 
@@ -360,7 +372,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                             try {
                                 AffinityTopologyVersion topVer = fut.get();
 
-                                map(topVer);
+                                map(topVer, remapKeys);
                             }
                             catch (IgniteCheckedException e) {
                                 onDone(e);
@@ -376,6 +388,23 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         onDone(opRes0, err0);
     }
 
+    /**
+     * Updates near cache.
+     *
+     * @param req Update request.
+     * @param res Update response.
+     */
+    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+        assert nearEnabled;
+
+        if (res.remapKeys() != null || !req.hasPrimary())
+            return;
+
+        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
+
+        near.processNearAtomicUpdateResponse(req, res);
+    }
+
     /** {@inheritDoc} */
     @Override protected void mapOnTopology() {
         cache.topology().readLock();
@@ -427,7 +456,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             cache.topology().readUnlock();
         }
 
-        map(topVer);
+        map(topVer, null);
     }
 
     /**
@@ -480,6 +509,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     protected void map(AffinityTopologyVersion topVer) {
+        map(topVer, null);
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param remapKeys Keys to remap.
+     */
+    void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -524,6 +561,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 resCnt = 0;
 
                 req = singleReq0;
+
+                this.remapKeys = null;
             }
         }
         catch (Exception e) {


Mime
View raw message