ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4705
Date Thu, 02 Mar 2017 08:20:23 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4705 9e93f19b8 -> 19c340ce2


ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19c340ce
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19c340ce
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19c340ce

Branch: refs/heads/ignite-4705
Commit: 19c340ce21f013dce0155e93a6b7fe89adbd1def
Parents: 9e93f19
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Mar 2 11:20:15 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Mar 2 11:20:15 2017 +0300

----------------------------------------------------------------------
 .../GridNearAtomicAbstractUpdateFuture.java     | 54 +++--------
 .../GridNearAtomicSingleUpdateFuture.java       | 91 +++++++++---------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 98 ++++++++++----------
 3 files changed, 103 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 9f7512c..204e510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -212,18 +212,14 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             // Cannot remap.
             remapCnt = 1;
 
-            Long futId = addAtomicFuture(topVer);
-
-            if (futId != null)
-                map(topVer, futId);
+            map(topVer);
         }
     }
 
     /**
      * @param topVer Topology version.
-     * @param futId Future ID.
      */
-    protected abstract void map(AffinityTopologyVersion topVer, Long futId);
+    protected abstract void map(AffinityTopologyVersion topVer);
 
     /**
      * Maps future on ready topology.
@@ -248,7 +244,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     /**
      * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
      */
-    protected boolean storeFuture() {
+    final boolean storeFuture() {
         return syncMode != FULL_ASYNC;
     }
 
@@ -258,7 +254,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+    final void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
                 new GridDhtAtomicCache.UpdateReplyClosure() {
@@ -318,43 +314,15 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param req Request.
      * @param e Error.
      */
-    protected final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
-        synchronized (mux) {
-            GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-                req.nodeId(),
-                req.futureId(),
-                cctx.deploymentEnabled());
-
-            res.addFailedKeys(req.keys(), e);
-
-            onPrimaryResponse(req.nodeId(), res, true);
-        }
-    }
+    final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+            req.nodeId(),
+            req.futureId(),
+            cctx.deploymentEnabled());
 
-    /**
-     * Adds future prevents topology change before operation complete.
-     * Should be invoked before topology lock released.
-     *
-     * @param topVer Topology version.
-     * @return Future ID in case future added.
-     */
-    final Long addAtomicFuture(AffinityTopologyVersion topVer) {
-        // TODO IGNITE-4705: it seems no need to add future inside read lock.
-
-        Long futId = cctx.mvcc().atomicFutureId();
-
-        synchronized (mux) {
-            assert this.futId == null : this;
-            assert this.topVer == AffinityTopologyVersion.ZERO : this;
-
-            this.topVer = topVer;
-            this.futId = futId;
-        }
-
-        if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this))
-            return null;
+        res.addFailedKeys(req.keys(), e);
 
-        return futId;
+        onPrimaryResponse(req.nodeId(), res, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 7a18328..b1b951f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -454,65 +454,55 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @Override protected void mapOnTopology() {
-        // TODO IGNITE-4705: primary should block topology change, so it seems read lock is not needed.
-        cache.topology().readLock();
-
         AffinityTopologyVersion topVer;
-        Long futId;
-
-        try {
-            if (cache.topology().stopping()) {
-                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
-                    cache.name()));
-
-                return;
-            }
 
-            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+        if (cache.topology().stopping()) {
+            onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                cache.name()));
 
-            if (fut.isDone()) {
-                Throwable err = fut.validateCache(cctx);
-
-                if (err != null) {
-                    onDone(err);
+            return;
+        }
 
-                    return;
-                }
+        GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
 
-                topVer = fut.topologyVersion();
+        if (fut.isDone()) {
+            Throwable err = fut.validateCache(cctx);
 
-                futId = addAtomicFuture(topVer);
-            }
-            else {
-                if (waitTopFut) {
-                    assert !topLocked : this;
-
-                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                @Override public void run() {
-                                    mapOnTopology();
-                                }
-                            });
-                        }
-                    });
-                }
-                else
-                    onDone(new GridCacheTryPutFailedException());
+            if (err != null) {
+                onDone(err);
 
                 return;
             }
+
+            topVer = fut.topologyVersion();
         }
-        finally {
-            cache.topology().readUnlock();
+        else {
+            if (waitTopFut) {
+                assert !topLocked : this;
+
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                mapOnTopology();
+                            }
+                        });
+                    }
+                });
+            }
+            else
+                onDone(new GridCacheTryPutFailedException());
+
+            return;
         }
 
-        if (futId != null)
-            map(topVer, futId);
+        map(topVer);
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer, Long futId) {
+    @Override protected void map(AffinityTopologyVersion topVer) {
+        Long futId = cctx.mvcc().atomicFutureId();
+
         Exception err = null;
         GridNearAtomicAbstractUpdateRequest singleReq0 = null;
 
@@ -520,11 +510,20 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             singleReq0 = mapSingleUpdate(topVer, futId);
 
             synchronized (mux) {
-                assert this.futId.equals(futId) || (this.isDone() && this.error() != null);
-                assert this.topVer == topVer;
+                assert this.futId == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+                this.topVer = topVer;
+                this.futId = futId;
 
                 reqState = new PrimaryRequestState(singleReq0);
             }
+
+            if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
+                assert isDone();
+
+                return;
+            }
         }
         catch (Exception e) {
             err = e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 89b2573..573cb40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -687,59 +687,47 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @Override protected void mapOnTopology() {
         AffinityTopologyVersion topVer;
 
-        Long futId;
+        if (cache.topology().stopping()) {
+            onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                cache.name()));
 
-        cache.topology().readLock();
-
-        try {
-            if (cache.topology().stopping()) {
-                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
-                    cache.name()));
-
-                return;
-            }
-
-            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
-
-            if (fut.isDone()) {
-                Throwable err = fut.validateCache(cctx);
-
-                if (err != null) {
-                    onDone(err);
+            return;
+        }
 
-                    return;
-                }
+        GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
 
-                topVer = fut.topologyVersion();
+        if (fut.isDone()) {
+            Throwable err = fut.validateCache(cctx);
 
-                futId = addAtomicFuture(topVer);
-            }
-            else {
-                if (waitTopFut) {
-                    assert !topLocked : this;
-
-                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                @Override public void run() {
-                                    mapOnTopology();
-                                }
-                            });
-                        }
-                    });
-                }
-                else
-                    onDone(new GridCacheTryPutFailedException());
+            if (err != null) {
+                onDone(err);
 
                 return;
             }
+
+            topVer = fut.topologyVersion();
         }
-        finally {
-            cache.topology().readUnlock();
+        else {
+            if (waitTopFut) {
+                assert !topLocked : this;
+
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                mapOnTopology();
+                            }
+                        });
+                    }
+                });
+            }
+            else
+                onDone(new GridCacheTryPutFailedException());
+
+            return;
         }
 
-        if (futId != null)
-            map(topVer, futId, remapKeys);
+        map(topVer, remapKeys);
     }
 
     /**
@@ -799,18 +787,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer, Long futId) {
-        map(topVer, futId, null);
+    @Override protected void map(AffinityTopologyVersion topVer) {
+        map(topVer, null);
     }
 
     /**
      * @param topVer Topology version.
-     * @param futId Future ID.
      * @param remapKeys Keys to remap.
      */
-    void map(AffinityTopologyVersion topVer,
-        Long futId,
-        @Nullable Collection<KeyCacheObject> remapKeys) {
+    void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -820,6 +805,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             return;
         }
 
+        Long futId = cctx.mvcc().atomicFutureId();
+
         Exception err = null;
         PrimaryRequestState singleReq0 = null;
         Map<UUID, PrimaryRequestState> mappings0 = null;
@@ -848,8 +835,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
 
             synchronized (mux) {
-                assert this.futId.equals(futId) || (this.isDone() && this.error() != null);
-                assert this.topVer == topVer;
+                assert this.futId == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+                this.topVer = topVer;
+                this.futId = futId;
 
                 resCnt = 0;
 
@@ -858,6 +848,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                 this.remapKeys = null;
             }
+
+            if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
+                assert isDone();
+
+                return;
+            }
         }
         catch (Exception e) {
             err = e;


Mime
View raw message