Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 234FF200CD7 for ; Tue, 1 Aug 2017 15:23:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 219CE1671A7; Tue, 1 Aug 2017 13:23:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 40E441671A6 for ; Tue, 1 Aug 2017 15:23:33 +0200 (CEST) Received: (qmail 93655 invoked by uid 500); 1 Aug 2017 13:23:32 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 93646 invoked by uid 99); 1 Aug 2017 13:23:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Aug 2017 13:23:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B7336E02F3; Tue, 1 Aug 2017 13:23:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-5578 Date: Tue, 1 Aug 2017 13:23:31 +0000 (UTC) archived-at: Tue, 01 Aug 2017 13:23:34 -0000 Repository: ignite Updated Branches: refs/heads/ignite-5578 2ae3abd7d -> b593db7f2 ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b593db7f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b593db7f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b593db7f Branch: refs/heads/ignite-5578 Commit: b593db7f2f6ee22424e06570b1b8cb6dc94ef4b0 Parents: 2ae3abd Author: sboikov Authored: Tue Aug 1 16:23:16 2017 +0300 Committer: sboikov Committed: Tue Aug 1 16:23:16 2017 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 84 ++++---------------- .../GridDhtPartitionsExchangeFuture.java | 4 +- 2 files changed, 16 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b593db7f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index ae214a0..7c3d1c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -38,7 +38,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -108,7 +107,6 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; -import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; @@ -1686,10 +1684,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { return; } - for (;;) { - if (updateAllAsyncInternal0(node, req, completionCb)) - break; - } + updateAllAsyncInternal0(node, req, completionCb); } else { forceFut.listen(new CI1>() { @@ -1706,10 +1701,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { return; } - for (;;) { - if (updateAllAsyncInternal0(node, req, completionCb)) - break; - } + updateAllAsyncInternal0(node, req, completionCb); } }); } @@ -1744,16 +1736,12 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param node Node. * @param req Update request. * @param completionCb Completion callback. - * @return {@code True} if update was executed, {@code false} if need retry update. */ - private boolean updateAllAsyncInternal0( + private void updateAllAsyncInternal0( ClusterNode node, GridNearAtomicAbstractUpdateRequest req, UpdateReplyClosure completionCb ) { - if (waitForTopologyFuture(node, req, completionCb)) - return true; - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), node.id(), req.futureId(), @@ -1789,17 +1777,21 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { completionCb.apply(req, res); - return true; + return; } - GridDhtTopologyFuture topFut = top.topologyVersionFuture(); - - if (!req.topologyLocked() && !topFut.isDone()) - return false; // Will wait at the beginning of next updateAllAsyncInternal0 call. + boolean remap = false; // Do not check topology version if topology was locked on near node by // external transaction or explicit lock. - if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.readyTopologyVersion())) { + if (!req.topologyLocked()) { + // Can not wait for topology future since it will break + // GridNearAtomicCheckUpdateRequest processing. + remap = !top.topologyVersionFuture().isDone() || + needRemap(req.topologyVersion(), top.readyTopologyVersion()); + } + + if (!remap) { DhtAtomicUpdateResult updRes = update(node, locked, req, res); dhtFut = updRes.dhtFuture(); @@ -1856,7 +1848,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { if (e instanceof Error) throw (Error)e; - return true; + return; } finally { ctx.shared().database().checkpointReadUnlock(); @@ -1876,54 +1868,6 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { req.cleanup(!node.isLocal()); sendTtlUpdateRequest(expiry); - - return true; - } - - /** - * @param node Sender node. - * @param req Request. - * @param completionCb Completion callback. - * @return {@code True} if update will be retried from future listener. - */ - private boolean waitForTopologyFuture(final ClusterNode node, - final GridNearAtomicAbstractUpdateRequest req, - final UpdateReplyClosure completionCb) { - if (req.topologyLocked()) - return false; - - GridDhtTopologyFuture topFut = ctx.group().topology().topologyVersionFuture(); - - if (!topFut.isDone()) { - Thread curThread = Thread.currentThread(); - - if (curThread instanceof IgniteThread) { - final IgniteThread thread = (IgniteThread)curThread; - - if (thread.cachePoolThread()) { - topFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture fut) { - ctx.closures().runLocalWithThreadPolicy(thread, new Runnable() { - @Override public void run() { - updateAllAsyncInternal(node, req, completionCb); - } - }); - } - }); - - return true; - } - } - - try { - topFut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Topology future failed: " + e, e); - } - } - - return false; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b593db7f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d04974a..de02238 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1409,8 +1409,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.cache().onExchangeDone(initialVersion(), exchActions, err); - cctx.exchange().onExchangeDone(res, initialVersion(), err); - if (exchActions != null && err == null) exchActions.completeRequestFutures(cctx); @@ -1445,6 +1443,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); + cctx.exchange().onExchangeDone(res, initialVersion(), err); + initFut.onDone(err == null); if (exchId.isLeft()) {