Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F034E185FD for ; Fri, 18 Sep 2015 02:29:10 +0000 (UTC) Received: (qmail 67294 invoked by uid 500); 18 Sep 2015 02:29:10 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 67222 invoked by uid 500); 18 Sep 2015 02:29:10 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 66325 invoked by uid 99); 18 Sep 2015 02:29:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Sep 2015 02:29:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5DC21E10C9; Fri, 18 Sep 2015 02:29:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Date: Fri, 18 Sep 2015 02:29:36 -0000 Message-Id: <127d972ff1344af9b0359382ac8ecb42@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [28/50] [abbrv] ignite git commit: IGNITE-1400 On node stop prevent exchange worker hang on topology lock IGNITE-1400 On node stop prevent exchange worker hang on topology lock Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a81cce72 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a81cce72 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a81cce72 Branch: refs/heads/ignite-843 Commit: a81cce7214c966de0281ef82da0b1fe042842911 Parents: 367d805 Author: sboikov Authored: Tue Sep 15 15:46:16 2015 +0300 Committer: sboikov Committed: Tue Sep 15 15:49:42 2015 +0300 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 104 +++++++++++-------- .../dht/GridDhtPartitionTopology.java | 4 +- .../dht/GridDhtPartitionTopologyImpl.java | 7 +- .../ignite/internal/util/IgniteUtils.java | 16 +++ 4 files changed, 83 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 21a7b3b..5e3cc0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -110,7 +111,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log = cctx.logger(getClass()); - beforeExchange(exchFut); + lock.writeLock().lock(); + + try { + beforeExchange0(cctx.localNode(), exchFut); + } + finally { + lock.writeLock().unlock(); + } } /** @@ -154,8 +162,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { GridDhtPartitionsExchangeFuture exchFut, long updSeq, boolean stopping - ) { - lock.writeLock().lock(); + ) throws IgniteInterruptedCheckedException { + U.writeLock(lock); try { assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + @@ -208,67 +216,75 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) { + @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { ClusterNode loc = cctx.localNode(); - lock.writeLock().lock(); + U.writeLock(lock); try { if (stopping) return; - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + beforeExchange0(loc, exchFut); + } + finally { + lock.writeLock().unlock(); + } + } - assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + - topVer + ", exchId=" + exchId + ']'; + /** + * @param loc Local node. + * @param exchFut Exchange future. + */ + private void beforeExchange0(ClusterNode loc, GridDhtPartitionsExchangeFuture exchFut) { + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - if (!exchId.isJoined()) - removeNode(exchId.nodeId()); + assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchId + ']'; - // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); + if (!exchId.isJoined()) + removeNode(exchId.nodeId()); - assert oldest != null; + // In case if node joins, get topology at the time of joining node. + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); - if (log.isDebugEnabled()) - log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); + assert oldest != null; - long updateSeq = this.updateSeq.incrementAndGet(); - - // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) { - if (node2part == null) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); + if (log.isDebugEnabled()) + log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); - if (log.isDebugEnabled()) - log.debug("Created brand new full topology map on oldest node [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); - } - else if (!node2part.valid()) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + long updateSeq = this.updateSeq.incrementAndGet(); - if (log.isDebugEnabled()) - log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + - node2part + ']'); - } - else if (!node2part.nodeId().equals(loc.id())) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + // If this is the oldest node. + if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) { + if (node2part == null) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); - if (log.isDebugEnabled()) - log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); - } + if (log.isDebugEnabled()) + log.debug("Created brand new full topology map on oldest node [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); } + else if (!node2part.valid()) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); - consistencyCheck(); + if (log.isDebugEnabled()) + log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + + node2part + ']'); + } + else if (!node2part.nodeId().equals(loc.id())) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); - if (log.isDebugEnabled()) - log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + - fullMapString() + ']'); - } - finally { - lock.writeLock().unlock(); + if (log.isDebugEnabled()) + log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } } + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + + fullMapString() + ']'); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d4ea3d6..d642314 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; @@ -50,13 +51,14 @@ public interface GridDhtPartitionTopology { * * @param exchId Exchange ID. * @param exchFut Exchange future. + * @throws IgniteInterruptedCheckedException If interrupted. */ public void updateTopologyVersion( GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut, long updateSeq, boolean stopping - ); + ) throws IgniteInterruptedCheckedException; /** * Topology version. http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index fcb012f..a0c9c88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -205,8 +206,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionsExchangeFuture exchFut, long updSeq, boolean stopping - ) { - lock.writeLock().lock(); + ) throws IgniteInterruptedCheckedException { + U.writeLock(lock); try { assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + @@ -267,7 +268,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { int num = cctx.affinity().partitions(); - lock.writeLock().lock(); + U.writeLock(lock); try { GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index ba918f6..e5090cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -119,6 +119,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; import java.util.jar.JarFile; import java.util.logging.ConsoleHandler; import java.util.logging.Handler; @@ -9286,4 +9287,19 @@ public abstract class IgniteUtils { return hasShmem; } + + /** + * @param lock Lock. + * @throws IgniteInterruptedCheckedException If interrupted. + */ + public static void writeLock(ReadWriteLock lock) throws IgniteInterruptedCheckedException { + try { + lock.writeLock().lockInterruptibly(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } } \ No newline at end of file