ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4984
Date Wed, 19 Apr 2017 18:58:22 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4984 b08670381 -> 14582779f


ignite-4984


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/14582779
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/14582779
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/14582779

Branch: refs/heads/ignite-4984
Commit: 14582779f3fddeeae63ef3dd10f9c11e85f931d6
Parents: b086703
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Apr 19 21:58:06 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Apr 19 21:58:06 2017 +0300

----------------------------------------------------------------------
 .../GridNearAtomicAbstractUpdateFuture.java     | 50 ++++++++++-
 .../GridNearAtomicSingleUpdateFuture.java       | 65 ++-------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 87 +++++++-------------
 .../atomic/IgniteCacheAtomicProtocolTest.java   |  4 +-
 4 files changed, 82 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/14582779/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 480a866..9d0d345 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -55,6 +55,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
  * Base for near atomic update futures.
@@ -136,7 +137,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
 
     /** Future ID. */
     @GridToStringInclude
-    protected volatile long futId;
+    protected long futId;
 
     /** Operation result. */
     protected GridCacheReturn opRes;
@@ -159,7 +160,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
      * @param keepBinary Keep binary flag.
      * @param recovery {@code True} if cache operation is called in recovery mode.
      * @param remapCnt Remap count.
-     * @param waitTopFut Wait topology future flag.
      */
     protected GridNearAtomicAbstractUpdateFuture(
         GridCacheContext cctx,
@@ -227,7 +227,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
             onSendError(req, e);
         }
         catch (IgniteCheckedException e) {
-            onDone(e);
+            completeFuture(null, e, req.futureId());
         }
     }
 
@@ -336,6 +336,50 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
     public abstract void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res);
 
     /**
+     * @param ret Result.
+     * @param err Error.
+     * @param futId Not null ID if need remove future.
+     */
+    final void completeFuture(@Nullable GridCacheReturn ret, Throwable err, @Nullable Long
futId) {
+        Object retval = ret == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM)
?
+                cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
+
+        if (op == TRANSFORM && retval == null)
+            retval = Collections.emptyMap();
+
+        if (futId != null)
+            cctx.mvcc().removeAtomicFuture(futId);
+
+        super.onDone(retval, err);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ConstantConditions")
+    @Override public final boolean onDone(@Nullable Object res, @Nullable Throwable err)
{
+        assert err != null;
+
+        Long futId = null;
+
+        synchronized (this) {
+            if (futureMapped()) {
+                futId = this.futId;
+
+                topVer = AffinityTopologyVersion.ZERO;
+                this.futId = 0;
+            }
+        }
+
+        if (super.onDone(null, err)) {
+            if (futId != null)
+                cctx.mvcc().removeAtomicFuture(futId);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param req Request.
      * @param res Response.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/14582779/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 0aeb578..6ffa373 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -133,12 +133,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
         boolean rcvAll = false;
 
-        long futId = 0;
+        long futId;
 
         synchronized (this) {
             if (!futureMapped())
                 return false;
 
+            futId = this.futId;
+
             if (reqState.req.nodeId.equals(nodeId)) {
                 GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail();
 
@@ -167,7 +169,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 opRes0 = opRes;
                 err0 = err;
                 remapTopVer0 = onAllReceived();
-                futId = this.futId;
             }
         }
 
@@ -179,49 +180,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         return false;
     }
 
-    /**
-     * @param ret Result.
-     * @param err Error.
-     * @param futId Not null ID if need remove future.
-     */
-    private void completeFuture(@Nullable GridCacheReturn ret, Throwable err, @Nullable Long
futId) {
-        Object retval = ret == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM)
?
-            cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
-
-        if (op == TRANSFORM && retval == null)
-            retval = Collections.emptyMap();
-
-        if (futId != null)
-            cctx.mvcc().removeAtomicFuture(futId);
-
-        super.onDone(retval, err);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ConstantConditions")
-    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
-        assert err != null;
-
-        Long futId = null;
-
-        synchronized (this) {
-            if (futureMapped()) {
-                futId = this.futId;
-
-                topVer = AffinityTopologyVersion.ZERO;
-            }
-        }
-
-        if (super.onDone(null, err)) {
-            if (futId != null)
-                cctx.mvcc().removeAtomicFuture(futId);
-
-            return true;
-        }
-
-        return false;
-    }
-
     /** {@inheritDoc} */
     @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
         GridCacheReturn opRes0;
@@ -377,6 +335,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
             reqState = null;
             topVer = AffinityTopologyVersion.ZERO;
+            futId = 0;
 
             remapTopVer = null;
         }
@@ -492,6 +451,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             reqState0 = mapSingleUpdate(topVer, futId);
 
             synchronized (this) {
+                assert topVer.topologyVersion() > 0 : topVer;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
                 this.topVer = topVer;
@@ -565,21 +525,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /**
-     * @return Future ID.
-     */
-    private Long onFutureDone() {
-        Long id0;
-
-        synchronized (this) {
-            id0 = futId;
-
-            futId = 0;
-        }
-
-        return id0;
-    }
-
-    /**
      * @param topVer Topology version.
      * @param futId Future ID.
      * @return Request.

http://git-wip-us.apache.org/repos/asf/ignite/blob/14582779/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 6ebb1de..581e235 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -170,10 +170,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
 
+        long futId;
+
         synchronized (this) {
             if (!futureMapped())
                 return false;
 
+            futId = this.futId;
+
             if (singleReq != null) {
                 if (singleReq.req.nodeId.equals(nodeId)) {
                     GridNearAtomicAbstractUpdateRequest req = singleReq.onPrimaryFail();
@@ -266,33 +270,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 sendCheckUpdateRequest(checkReqs.get(i));
         }
         else if (rcvAll)
-            finishUpdateFuture(opRes0, err0, remapTopVer0);
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ConstantConditions")
-    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
-        assert res == null || res instanceof GridCacheReturn;
-
-        GridCacheReturn ret = (GridCacheReturn)res;
-
-        Object retval =
-            res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
-                cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
-
-        if (op == TRANSFORM && retval == null)
-            retval = Collections.emptyMap();
-
-        if (super.onDone(retval, err)) {
-            Long futId = onFutureDone();
-
-            if (futId != null)
-                cctx.mvcc().removeAtomicFuture(futId);
-
-            return true;
-        }
+            finishUpdateFuture(opRes0, err0, remapTopVer0, futId);
 
         return false;
     }
@@ -356,12 +334,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (errors != null) {
             assert errors.error() != null;
 
-            onDone(errors.error());
+            completeFuture(null, errors.error(), res.futureId());
 
             return;
         }
 
-        finishUpdateFuture(opRes0, err0, remapTopVer0);
+        finishUpdateFuture(opRes0, err0, remapTopVer0, res.futureId());
     }
 
     /** {@inheritDoc} */
@@ -462,7 +440,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         }
 
         if (res.error() != null && res.failedKeys() == null) {
-            onDone(res.error());
+            completeFuture(null, res.error(), res.futureId());
 
             return;
         }
@@ -488,7 +466,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         }
 
         if (rcvAll)
-            onDone(opRes0, err0);
+            completeFuture(opRes0, err0, res.futureId());
     }
 
     private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
@@ -507,7 +485,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
             e.add(remapKeys, cause);
 
-            onDone(e);
+            completeFuture(null, e, null);
 
             return;
         }
@@ -577,6 +555,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             cctx.mvcc().removeAtomicFuture(futId);
 
             topVer = AffinityTopologyVersion.ZERO;
+            futId = 0;
 
             remapTopVer = null;
         }
@@ -587,10 +566,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     /**
      * @param opRes Operation result.
      * @param err Operation error.
+     * @param futId Future ID.
      */
     private void finishUpdateFuture(GridCacheReturn opRes,
         CachePartialUpdateCheckedException err,
-        @Nullable AffinityTopologyVersion remapTopVer) {
+        @Nullable AffinityTopologyVersion remapTopVer,
+        long futId) {
         if (nearEnabled) {
             if (mappings != null) {
                 for (PrimaryRequestState reqState : mappings.values()) {
@@ -616,7 +597,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             return;
         }
 
-        onDone(opRes, err);
+        completeFuture(opRes, err, futId);
     }
 
     /**
@@ -641,8 +622,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         AffinityTopologyVersion topVer;
 
         if (cache.topology().stopping()) {
-            onDone(new IgniteCheckedException("Failed to perform cache operation (cache is
stopped): " +
-                cache.name()));
+            completeFuture(null,
+                new IgniteCheckedException("Failed to perform cache operation (cache is stopped):
" + cache.name()),
+                null);
 
             return;
         }
@@ -653,7 +635,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             Throwable err = fut.validateCache(cctx, recovery, false, null, keys);
 
             if (err != null) {
-                onDone(err);
+                completeFuture(null, err, null);
 
                 return;
             }
@@ -739,7 +721,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         }
 
         if (syncMode == FULL_ASYNC)
-            onDone(new GridCacheReturn(cctx, true, true, null, true));
+            completeFuture(new GridCacheReturn(cctx, true, true, null, true), null, null);
     }
 
     /** {@inheritDoc} */
@@ -755,8 +737,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache
(all partition nodes " +
-                "left the grid)."));
+            completeFuture(null,
+                new ClusterTopologyServerNotFoundException("Failed to map keys for cache
(all partition nodes left the grid)."),
+                null);
 
             return;
         }
@@ -795,6 +778,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
 
             synchronized (this) {
+                assert topVer.topologyVersion() > 0 : topVer;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
                 this.topVer = topVer;
@@ -819,7 +803,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         }
 
         if (err != null) {
-            onDone(err);
+            completeFuture(null, err, futId);
 
             return;
         }
@@ -831,7 +815,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             assert mappings0 != null;
 
             if (size == 0) {
-                onDone(new GridCacheReturn(cctx, true, true, null, true));
+                completeFuture(new GridCacheReturn(cctx, true, true, null, true), null, futId);
 
                 return;
             }
@@ -840,7 +824,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         }
 
         if (syncMode == FULL_ASYNC) {
-            onDone(new GridCacheReturn(cctx, true, true, null, true));
+            completeFuture(new GridCacheReturn(cctx, true, true, null, true), null, futId);
 
             return;
         }
@@ -921,22 +905,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 sendCheckUpdateRequest(checkReqs.get(i));
         }
         else if (rcvAll)
-            finishUpdateFuture(opRes0, err0, remapTopVer0);
-    }
-
-    /**
-     * @return Future version.
-     */
-    private Long onFutureDone() {
-        Long id0;
-
-        synchronized (this) {
-            id0 = futId;
-
-            futId = 0;
-        }
-
-        return id0;
+            finishUpdateFuture(opRes0, err0, remapTopVer0, futId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/14582779/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index dfb3f65..cc9247b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -395,9 +395,9 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest
{
 
             nodeIdx++;
         }
-        while (nodeIdx < 10);
+        while (nodeIdx < 5);
 
-        assertEquals(keys.size(), keysMoved);
+        assertTrue("Failed to get moved key: " + keysMoved, keysMoved > 1);
 
         testSpi(clientNode).stopBlock(true);
 


Mime
View raw message