ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [16/16] ignite git commit: # ignite-1124
Date Wed, 26 Aug 2015 10:34:49 GMT
# ignite-1124


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a965d439
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a965d439
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a965d439

Branch: refs/heads/ignite-1124
Commit: a965d43948becaf75206714b59e7542de276bada
Parents: 5665a7d
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Aug 26 13:32:11 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Aug 26 13:32:11 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 366 ++++++++++---------
 .../IgniteCachePutRetryAbstractSelfTest.java    |   5 +-
 2 files changed, 199 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a965d439/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index d0c8766..df485dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -82,9 +82,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Collection<GridCacheVersion> conflictRmvVals;
 
-    /** Operation result. */
-    private volatile GridCacheReturn opRes;
-
     /** Return value require flag. */
     private final boolean retval;
 
@@ -376,6 +373,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
             else {
                 if (waitTopFut) {
+                    assert !topLocked : this;
+
                     fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
t) {
                             cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@@ -507,16 +506,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      *
      */
     private class UpdateState {
-        /** */
+        /** Current topology version. */
         private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
 
         /** */
         private GridCacheVersion updVer;
 
-        /** */
+        /** Topology version when got mapping error. */
         private AffinityTopologyVersion mapErrTopVer;
 
-        /** Mappings. */
+        /** Mappings if operations is mapped to more than one node. */
         @GridToStringInclude
         private Map<UUID, GridNearAtomicUpdateRequest> mappings;
 
@@ -529,32 +528,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         /** Completion future for a particular topology version. */
         private GridFutureAdapter<Void> topCompleteFut;
 
-        /** */
+        /** Keys to remap. */
         private Collection<KeyCacheObject> remapKeys;
 
-        /** */
+        /** Not null is operation is mapped to single node. */
         private GridNearAtomicUpdateRequest singleReq;
 
-        /**
-         * @param failedKeys Failed keys.
-         * @param topVer Topology version for failed update.
-         * @param err Error cause.
-         */
-        private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
-            AffinityTopologyVersion topVer,
-            Throwable err) {
-            CachePartialUpdateCheckedException err0 = this.err;
-
-            if (err0 == null)
-                err0 = this.err = new CachePartialUpdateCheckedException("Failed to update
keys (retry update if possible).");
-
-            Collection<Object> keys = new ArrayList<>(failedKeys.size());
-
-            for (KeyCacheObject key : failedKeys)
-                keys.add(key.value(cctx.cacheObjectContext(), false));
-
-            err0.add(keys, err, topVer);
-        }
+        /** Operation result. */
+        private GridCacheReturn opRes;
 
         /**
          * @return Future version.
@@ -580,8 +561,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (req != null) {
                     res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion());
 
-                    res.addFailedKeys(req.keys(),
-                        new ClusterTopologyCheckedException("Primary node left grid before
response is received: " + nodeId));
+                    res.addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary
node left grid before " +
+                        "response is received: " + nodeId));
                 }
             }
 
@@ -590,22 +571,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         }
 
         /**
-         * @param ret Result from single node.
-         */
-        @SuppressWarnings("unchecked")
-        private void addInvokeResults(GridCacheReturn ret) {
-            assert op == TRANSFORM : op;
-            assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
-            if (ret.value() != null) {
-                if (opRes != null)
-                    opRes.mergeEntryProcessResults(ret);
-                else
-                    opRes = ret;
-            }
-        }
-
-        /**
          * @param nodeId Node ID.
          * @param res Response.
          * @param nodeErr {@code True} if response was created on node failure.
@@ -735,16 +700,40 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (fut0 != null)
                     fut0.onDone();
 
-                IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(remapTopVer);
+                if (!waitTopFut) {
+                    onDone(new GridCacheTryPutFailedException());
+
+                    return;
+                }
+
+                if (topLocked) {
+                    assert !F.isEmpty(remapKeys) : remapKeys;
+
+                    CachePartialUpdateCheckedException e =
+                        new CachePartialUpdateCheckedException("Failed to update keys (retry
update if possible).");
+
+                    ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+                        "Failed to update keys, topology changed while execute atomic update
inside transaction.");
+
+                    cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+                    e.add(remapKeys, cause);
+
+                    onDone(e);
 
-                fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(final IgniteInternalFuture<?> fut)
{
+                    return;
+                }
+
+                IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.affinity().affinityReadyFuture(remapTopVer);
+
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
+                    @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion>
fut) {
                         cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                             @Override public void run() {
                                 try {
-                                    fut.get();
+                                    AffinityTopologyVersion topVer = fut.get();
 
-                                    mapOnTopology();
+                                    map(topVer);
                                 }
                                 catch (IgniteCheckedException e) {
                                     onDone(e);
@@ -757,8 +746,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 return;
             }
 
-            assert fut0 == null;
-
             if (rcvAll)
                 onDone(opRes0, err0);
         }
@@ -780,11 +767,142 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         }
 
         /**
+         * @param topVer Topology version.
+         */
+        void map(AffinityTopologyVersion topVer) {
+            Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+
+            if (F.isEmpty(topNodes)) {
+                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for
cache (all partition nodes " +
+                    "left the grid)."));
+
+                return;
+            }
+
+            Exception err = null;
+            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
+
+            int size = keys.size();
+
+            synchronized (this) {
+                assert futVer == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+                this.topVer = topVer;
+
+                futVer = cctx.versions().next(topVer);
+
+                if (storeFuture())
+                    cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this);
+
+                // Assign version on near node in CLOCK ordering mode even if fastMap is
false.
+                if (updVer == null)
+                    updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer)
: null;
+
+                if (updVer != null && log.isDebugEnabled())
+                    log.debug("Assigned fast-map version for update on near node: " + updVer);
+
+                try {
+                    if (size == 1 && !fastMap) {
+                        assert remapKeys == null || remapKeys.size() == 1;
+
+                        singleReq = mapSingleUpdate();
+                    }
+                    else {
+                        pendingMappings = mapUpdate(topNodes);
+
+                        if (pendingMappings.size() == 1)
+                            singleReq = F.firstValue(pendingMappings);
+                        else {
+                            if (syncMode == PRIMARY_SYNC) {
+                                mappings = U.newHashMap(pendingMappings.size());
+
+                                for (GridNearAtomicUpdateRequest req : pendingMappings.values())
{
+                                    if (req.hasPrimary())
+                                        mappings.put(req.nodeId(), req);
+                                }
+                            }
+                            else
+                                mappings = new HashMap<>(pendingMappings);
+
+                            assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
+                        }
+                    }
+
+                    remapKeys = null;
+                }
+                catch (Exception e) {
+                    err = e;
+                }
+            }
+
+            if (err != null) {
+                onDone(err);
+
+                return;
+            }
+
+            // Optimize mapping for single key.
+            if (singleReq != null)
+                mapSingle(singleReq.nodeId(), singleReq);
+            else {
+                assert pendingMappings != null;
+
+                if (size == 0)
+                    onDone(new GridCacheReturn(cctx, true, null, true));
+                else
+                    doUpdate(pendingMappings);
+            }
+        }
+
+        /**
+         * @param topVer Topology version.
+         * @return Future.
+         */
+        @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion
topVer) {
+            if (this.topVer == AffinityTopologyVersion.ZERO)
+                return null;
+
+            if (this.topVer.compareTo(topVer) < 0) {
+                if (topCompleteFut == null)
+                    topCompleteFut = new GridFutureAdapter<>();
+
+                return topCompleteFut;
+            }
+
+            return null;
+        }
+
+        /**
+         * @return Future version.
+         */
+        GridCacheVersion onFutureDone() {
+            GridCacheVersion ver0;
+
+            GridFutureAdapter<Void> fut0;
+
+            synchronized (this) {
+                fut0 = topCompleteFut;
+
+                topCompleteFut = null;
+
+                ver0 = futVer;
+
+                futVer = null;
+            }
+
+            if (fut0 != null)
+                fut0.onDone();
+
+            return ver0;
+        }
+
+        /**
          * @param topNodes Cache nodes.
          * @return Mapping.
          * @throws Exception If failed.
          */
-        Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode>
topNodes) throws Exception {
+        private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode>
topNodes) throws Exception {
             Iterator<?> it = null;
 
             if (vals != null)
@@ -906,7 +1024,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
          * @return Request.
          * @throws Exception If failed.
          */
-        GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception {
+        private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception {
             Object key = F.first(keys);
 
             Object val;
@@ -993,134 +1111,40 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         }
 
         /**
-         * @param topVer Topology version.
+         * @param ret Result from single node.
          */
-        void map(AffinityTopologyVersion topVer) {
-            Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
-
-            if (F.isEmpty(topNodes)) {
-                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for
cache (all partition nodes " +
-                    "left the grid)."));
-
-                return;
-            }
-
-            Exception err = null;
-            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
-
-            int size = keys.size();
-
-            synchronized (this) {
-                assert futVer == null : this;
-                assert this.topVer == AffinityTopologyVersion.ZERO : this;
-
-                this.topVer = topVer;
-
-                futVer = cctx.versions().next(topVer);
-
-                if (storeFuture())
-                    cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this);
-
-                // Assign version on near node in CLOCK ordering mode even if fastMap is
false.
-                if (updVer == null)
-                    updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer)
: null;
-
-                if (updVer != null && log.isDebugEnabled())
-                    log.debug("Assigned fast-map version for update on near node: " + updVer);
-
-                try {
-                    if (size == 1 && !fastMap) {
-                        assert remapKeys == null || remapKeys.size() == 1;
-
-                        singleReq = mapSingleUpdate();
-                    }
-                    else {
-                        pendingMappings = mapUpdate(topNodes);
-
-                        if (pendingMappings.size() == 1)
-                            singleReq = F.firstValue(pendingMappings);
-                        else {
-                            if (syncMode == PRIMARY_SYNC) {
-                                mappings = U.newHashMap(pendingMappings.size());
-
-                                for (GridNearAtomicUpdateRequest req : pendingMappings.values())
{
-                                    if (req.hasPrimary())
-                                        mappings.put(req.nodeId(), req);
-                                }
-                            }
-                            else
-                                mappings = new HashMap<>(pendingMappings);
-
-                            assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
-                        }
-                    }
-
-                    remapKeys = null;
-                }
-                catch (Exception e) {
-                    err = e;
-                }
-            }
-
-            if (err != null) {
-                onDone(err);
-
-                return;
-            }
-
-            // Optimize mapping for single key.
-            if (singleReq != null)
-                mapSingle(singleReq.nodeId(), singleReq);
-            else {
-                assert pendingMappings != null;
+        @SuppressWarnings("unchecked")
+        private void addInvokeResults(GridCacheReturn ret) {
+            assert op == TRANSFORM : op;
+            assert ret.value() == null || ret.value() instanceof Map : ret.value();
 
-                if (size == 0)
-                    onDone(new GridCacheReturn(cctx, true, null, true));
+            if (ret.value() != null) {
+                if (opRes != null)
+                    opRes.mergeEntryProcessResults(ret);
                 else
-                    doUpdate(pendingMappings);
-            }
-        }
-
-        /**
-         * @param topVer Topology version.
-         * @return Future.
-         */
-        @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion
topVer) {
-            if (this.topVer == AffinityTopologyVersion.ZERO)
-                return null;
-
-            if (this.topVer.compareTo(topVer) < 0) {
-                if (topCompleteFut == null)
-                    topCompleteFut = new GridFutureAdapter<>();
-
-                return topCompleteFut;
+                    opRes = ret;
             }
-
-            return null;
         }
 
         /**
-         * @return Future version.
+         * @param failedKeys Failed keys.
+         * @param topVer Topology version for failed update.
+         * @param err Error cause.
          */
-        GridCacheVersion onFutureDone() {
-            GridCacheVersion ver0;
-
-            GridFutureAdapter<Void> fut0;
-
-            synchronized (this) {
-                fut0 = topCompleteFut;
-
-                topCompleteFut = null;
+        private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+            AffinityTopologyVersion topVer,
+            Throwable err) {
+            CachePartialUpdateCheckedException err0 = this.err;
 
-                ver0 = futVer;
+            if (err0 == null)
+                err0 = this.err = new CachePartialUpdateCheckedException("Failed to update
keys (retry update if possible).");
 
-                futVer = null;
-            }
+            Collection<Object> keys = new ArrayList<>(failedKeys.size());
 
-            if (fut0 != null)
-                fut0.onDone();
+            for (KeyCacheObject key : failedKeys)
+                keys.add(key.value(cctx.cacheObjectContext(), false));
 
-            return ver0;
+            err0.add(keys, err, topVer);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a965d439/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 041a0f9..276f89c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -175,8 +175,11 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
                         for (int i = 0; i < keysCnt; i++) {
                             map.put(i, val);
 
-                            if (map.size() == 100 || i == keysCnt - 1)
+                            if (map.size() == 100 || i == keysCnt - 1) {
                                 cache.putAll(map);
+
+                                map.clear();
+                            }
                         }
 
                         for (int i = 0; i < keysCnt; i++)


Mime
View raw message