ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5578
Date Tue, 01 Aug 2017 13:23:31 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 2ae3abd7d -> b593db7f2


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b593db7f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b593db7f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b593db7f

Branch: refs/heads/ignite-5578
Commit: b593db7f2f6ee22424e06570b1b8cb6dc94ef4b0
Parents: 2ae3abd
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Aug 1 16:23:16 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Aug 1 16:23:16 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 84 ++++----------------
 .../GridDhtPartitionsExchangeFuture.java        |  4 +-
 2 files changed, 16 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b593db7f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index ae214a0..7c3d1c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -38,7 +38,6 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -108,7 +107,6 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
-import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
@@ -1686,10 +1684,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 return;
             }
 
-            for (;;) {
-                if (updateAllAsyncInternal0(node, req, completionCb))
-                    break;
-            }
+            updateAllAsyncInternal0(node, req, completionCb);
         }
         else {
             forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@@ -1706,10 +1701,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         return;
                     }
 
-                    for (;;) {
-                        if (updateAllAsyncInternal0(node, req, completionCb))
-                            break;
-                    }
+                    updateAllAsyncInternal0(node, req, completionCb);
                 }
             });
         }
@@ -1744,16 +1736,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param node Node.
      * @param req Update request.
      * @param completionCb Completion callback.
-     * @return {@code True} if update was executed, {@code false} if need retry update.
      */
-    private boolean updateAllAsyncInternal0(
+    private void updateAllAsyncInternal0(
         ClusterNode node,
         GridNearAtomicAbstractUpdateRequest req,
         UpdateReplyClosure completionCb
     ) {
-        if (waitForTopologyFuture(node, req, completionCb))
-            return true;
-
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
             node.id(),
             req.futureId(),
@@ -1789,17 +1777,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         completionCb.apply(req, res);
 
-                        return true;
+                        return;
                     }
 
-                    GridDhtTopologyFuture topFut = top.topologyVersionFuture();
-
-                    if (!req.topologyLocked() && !topFut.isDone())
-                        return false; // Will wait at the beginning of next updateAllAsyncInternal0 call.
+                    boolean remap = false;
 
                     // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.readyTopologyVersion())) {
+                    if (!req.topologyLocked()) {
+                        // Can not wait for topology future since it will break
+                        // GridNearAtomicCheckUpdateRequest processing.
+                        remap = !top.topologyVersionFuture().isDone() ||
+                            needRemap(req.topologyVersion(), top.readyTopologyVersion());
+                    }
+
+                    if (!remap) {
                         DhtAtomicUpdateResult updRes = update(node, locked, req, res);
 
                         dhtFut = updRes.dhtFuture();
@@ -1856,7 +1848,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             if (e instanceof Error)
                 throw (Error)e;
 
-            return true;
+            return;
         }
         finally {
             ctx.shared().database().checkpointReadUnlock();
@@ -1876,54 +1868,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             req.cleanup(!node.isLocal());
 
         sendTtlUpdateRequest(expiry);
-
-        return true;
-    }
-
-    /**
-     * @param node Sender node.
-     * @param req Request.
-     * @param completionCb Completion callback.
-     * @return {@code True} if update will be retried from future listener.
-     */
-    private boolean waitForTopologyFuture(final ClusterNode node,
-        final GridNearAtomicAbstractUpdateRequest req,
-        final UpdateReplyClosure completionCb) {
-        if (req.topologyLocked())
-            return false;
-
-        GridDhtTopologyFuture topFut = ctx.group().topology().topologyVersionFuture();
-
-        if (!topFut.isDone()) {
-            Thread curThread = Thread.currentThread();
-
-            if (curThread instanceof IgniteThread) {
-                final IgniteThread thread = (IgniteThread)curThread;
-
-                if (thread.cachePoolThread()) {
-                    topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                            ctx.closures().runLocalWithThreadPolicy(thread, new Runnable() {
-                                @Override public void run() {
-                                    updateAllAsyncInternal(node, req, completionCb);
-                                }
-                            });
-                        }
-                    });
-
-                    return true;
-                }
-            }
-
-            try {
-                topFut.get();
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Topology future failed: " + e, e);
-            }
-        }
-
-        return false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b593db7f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d04974a..de02238 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1409,8 +1409,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
 
-        cctx.exchange().onExchangeDone(res, initialVersion(), err);
-
         if (exchActions != null && err == null)
             exchActions.completeRequestFutures(cctx);
 
@@ -1445,6 +1443,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
                     ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
 
+            cctx.exchange().onExchangeDone(res, initialVersion(), err);
+
             initFut.onDone(err == null);
 
             if (exchId.isLeft()) {


Mime
View raw message