ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: # ignite-1124 WIP
Date Wed, 26 Aug 2015 05:13:53 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1124 e3108238e -> 975cc46ef


# ignite-1124 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/975cc46e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/975cc46e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/975cc46e

Branch: refs/heads/ignite-1124
Commit: 975cc46ef5472f2e5452c04757af5dd29a281eb1
Parents: e310823
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Aug 25 21:09:57 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Aug 26 08:11:56 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 94 +++++++++++++-------
 ...acheAtomicReplicatedNodeRestartSelfTest.java |  5 --
 2 files changed, 60 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/975cc46e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 0014f86..8a2f073 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -202,7 +202,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (singleReq != null)
                     req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
                 else
-                    req = mappings.get(nodeId);
+                    req = mappings != null ? mappings.get(nodeId) : null;
 
                 if (req != null) {
                     res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion());
@@ -213,7 +213,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
 
             if (res != null)
-                onResult(nodeId, res);
+                onResult(nodeId, res, true);
         }
 
         /**
@@ -236,7 +236,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
          * @param nodeId Node ID.
          * @param res Response.
          */
-        void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+        void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
             GridNearAtomicUpdateRequest req;
 
             AffinityTopologyVersion errTopVer = null;
@@ -246,6 +246,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
             boolean rcvAll;
 
+            GridFutureAdapter<?> fut0 = null;
+
             synchronized (this) {
                 if (!res.futureVersion().equals(futVer))
                     return;
@@ -256,25 +258,26 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                     req = singleReq;
 
+                    singleReq = null;
+
                     rcvAll = true;
                 }
                 else {
-                    req = mappings.remove(nodeId);
+                    req = mappings != null ? mappings.remove(nodeId) : null;
 
-                    if (req != null) {
+                    if (req != null)
                         rcvAll = mappings.isEmpty();
-
-                        topVer = req.topologyVersion();
-                    }
                     else
                         return;
                 }
 
+                assert req != null && req.topologyVersion().equals(topVer) : req;
+
                 if (res.remapKeys() != null) {
                     assert !fastMap || cctx.kernalContext().clientNode();
 
                     if (remapKeys == null)
-                        remapKeys = new ArrayList<>(res.remapKeys().size());
+                        remapKeys = U.newHashSet(res.remapKeys().size());
 
                     remapKeys.addAll(res.remapKeys());
 
@@ -324,6 +327,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                                 Collection<Object> failedKeys = cause.failedKeys();
 
+                                remapKeys = new ArrayList<>(failedKeys.size());
+
                                 for (Object key : failedKeys)
                                     remapKeys.add(cctx.toCacheKeyObject(key));
 
@@ -337,44 +342,53 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                         opRes0 = opRes;
                     }
                     else {
+                        fut0 = topCompleteFut;
+
+                        topCompleteFut = null;
+
                         cctx.mvcc().removeAtomicFuture(futVer);
 
                         futVer = null;
-                        singleReq = null;
                         topVer = AffinityTopologyVersion.ZERO;
                     }
                 }
             }
 
-            updateNear(req, res);
+            if (!nodeErr && res.remapKeys() == null)
+                updateNear(req, res);
 
             if (errTopVer != null) {
-                IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1);
+                if (fut0 != null)
+                    fut0.onDone();
 
-                fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(final IgniteInternalFuture<?> fut) {
-                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                            @Override public void run() {
-                                try {
-                                    fut.get();
+                if (errTopVer == AffinityTopologyVersion.NONE)
+                    mapOnTopology();
+                else {
+                    IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1);
 
-                                    mapOnTopology();
-                                }
-                                catch (IgniteCheckedException e) {
-                                    onDone(e);
+                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(final IgniteInternalFuture<?> fut) {
+                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    try {
+                                        fut.get();
+
+                                        mapOnTopology();
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        onDone(e);
+                                    }
                                 }
-                            }
-                        });
-                    }
-                });
+                            });
+                        }
+                    });
+                }
 
                 return;
             }
 
-            if (err0 != null)
-                onDone(err0);
-            else if (rcvAll)
-                onDone(opRes0);
+            if (rcvAll)
+                onDone(opRes0, err0);
         }
 
         /**
@@ -389,7 +403,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                 res.addFailedKeys(req.keys(), e);
 
-                onResult(req.nodeId(), res);
+                onResult(req.nodeId(), res, true);
             }
         }
 
@@ -622,9 +636,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             Exception err = null;
             Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
 
-            int size = keys().size();
+            int size = keys.size();
 
             synchronized (this) {
+                assert futVer == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
                 this.topVer = topVer;
 
                 futVer = cctx.versions().next(topVer);
@@ -632,6 +649,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (storeFuture())
                     cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this);
 
+                // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+                if (updVer == null)
+                    updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+
+                if (updVer != null && log.isDebugEnabled())
+                    log.debug("Assigned fast-map version for update on near node: " + updVer);
+
                 try {
                     if (size == 1 && !fastMap) {
                         assert remapKeys == null || remapKeys.size() == 1;
@@ -653,7 +677,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                                 }
                             }
                             else
-                                mappings = pendingMappings;
+                                mappings = new HashMap<>(pendingMappings);
 
                             assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
                         }
@@ -714,6 +738,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             synchronized (this) {
                 fut0 = topCompleteFut;
 
+                topCompleteFut = null;
+
                 ver0 = futVer;
 
                 futVer = null;
@@ -930,7 +956,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param res Update response.
      */
     public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
-        state.onResult(nodeId, res);
+        state.onResult(nodeId, res, false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/975cc46e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
index 68c7fbb..f556023 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
@@ -26,11 +26,6 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*;
  */
 public class IgniteCacheAtomicReplicatedNodeRestartSelfTest extends GridCacheReplicatedNodeRestartSelfTest {
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1124");
-    }
-
-    /** {@inheritDoc} */
     @Override protected CacheAtomicityMode atomicityMode() {
         return ATOMIC;
     }


Mime
View raw message