Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8806818825 for ; Fri, 20 Nov 2015 16:30:06 +0000 (UTC) Received: (qmail 7980 invoked by uid 500); 20 Nov 2015 16:30:06 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 7897 invoked by uid 500); 20 Nov 2015 16:30:06 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 7881 invoked by uid 99); 20 Nov 2015 16:30:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Nov 2015 16:30:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2E013E0440; Fri, 20 Nov 2015 16:30:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dmagda@apache.org To: commits@ignite.apache.org Date: Fri, 20 Nov 2015 16:30:07 -0000 Message-Id: In-Reply-To: <7b5bbbe36f464aaf9187429f815b817f@git.apache.org> References: <7b5bbbe36f464aaf9187429f815b817f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] ignite git commit: ignite-801 and ignite-1911: resurrecting data structure and atomics failover tests + stopping the node if ring message worker fails ignite-801 and ignite-1911: resurrecting data structure and atomics failover tests + stopping the node if ring message worker fails Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c711484c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c711484c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c711484c Branch: refs/heads/ignite-1.5 Commit: c711484c30315c06ce0b31a8775bfc41b7ee1483 Parents: 8e7e330 Author: Denis Magda Authored: Fri Nov 20 19:11:07 2015 +0300 Committer: Denis Magda Committed: Fri Nov 20 19:11:07 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheUtils.java | 39 +- .../CacheDataStructuresManager.java | 2 +- .../GridFutureRemapTimeoutObject.java | 72 -- .../dht/GridPartitionedGetFuture.java | 28 +- .../distributed/near/GridNearGetFuture.java | 28 +- .../IgniteTxImplicitSingleStateImpl.java | 29 +- .../IgniteTxRemoteSingleStateImpl.java | 19 +- .../datastructures/DataStructuresProcessor.java | 47 +- .../GridAtomicCacheQueueImpl.java | 126 +-- .../GridCacheAtomicReferenceImpl.java | 10 +- .../GridCacheCountDownLatchImpl.java | 15 +- .../datastructures/GridCacheQueueAdapter.java | 32 +- .../GridTransactionalCacheQueueImpl.java | 193 ++-- .../ignite/spi/discovery/DiscoverySpi.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 36 + ...eAbstractDataStructuresFailoverSelfTest.java | 924 +++++++++---------- ...rtitionedDataStructuresFailoverSelfTest.java | 7 +- ...edOffheapDataStructuresFailoverSelfTest.java | 12 +- ...eplicatedDataStructuresFailoverSelfTest.java | 5 - ...gniteAtomicLongChangingTopologySelfTest.java | 2 + ...GridTcpCommunicationSpiRecoverySelfTest.java | 4 +- ...lientDiscoverySpiFailureTimeoutSelfTest.java | 4 +- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 90 +- .../testframework/junits/GridAbstractTest.java | 6 +- 24 files changed, 796 insertions(+), 936 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index f7d115f..89779d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -93,6 +94,7 @@ import org.apache.ignite.plugin.CachePluginConfiguration; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionRollbackException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -1780,28 +1782,41 @@ public class GridCacheUtils { public static Callable retryTopologySafe(final Callable c ) { return new Callable() { @Override public S call() throws Exception { - int retries = GridCacheAdapter.MAX_RETRIES; - IgniteCheckedException err = null; - for (int i = 0; i < retries; i++) { + for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) { try { return c.call(); } + catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) { + throw e; + } + catch (TransactionRollbackException e) { + if (i + 1 == GridCacheAdapter.MAX_RETRIES) + throw e; + + U.sleep(1); + } catch (IgniteCheckedException e) { - if (X.hasCause(e, ClusterTopologyCheckedException.class) || - X.hasCause(e, IgniteTxRollbackCheckedException.class) || - X.hasCause(e, CachePartialUpdateCheckedException.class)) { - if (i < retries - 1) { - err = e; + if (i + 1 == GridCacheAdapter.MAX_RETRIES) + throw e; - U.sleep(1); + if (X.hasCause(e, ClusterTopologyCheckedException.class)) { + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); - continue; - } + if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof + ClusterTopologyServerNotFoundException) + throw e; - throw e; + // IGNITE-1948: remove this check when the issue is fixed + if (topErr.retryReadyFuture() != null) + topErr.retryReadyFuture().get(); + else + U.sleep(1); } + else if (X.hasCause(e, IgniteTxRollbackCheckedException.class, + CachePartialUpdateCheckedException.class)) + U.sleep(1); else throw e; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 1ff4575..930921b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -770,4 +770,4 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { return "RemoveSetCallable [setId=" + setId + ']'; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java deleted file mode 100644 index 72fdd4b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; -import org.apache.ignite.internal.util.future.GridFutureAdapter; - -/** - * Future remap timeout object. - */ -public class GridFutureRemapTimeoutObject extends GridTimeoutObjectAdapter { - /** */ - private final GridFutureAdapter fut; - - /** Finished flag. */ - private final AtomicBoolean finished = new AtomicBoolean(); - - /** Topology version to wait. */ - private final AffinityTopologyVersion topVer; - - /** Exception cause. */ - private final IgniteCheckedException e; - - /** - * @param fut Future. - * @param timeout Timeout. - * @param topVer Topology version timeout was created on. - * @param e Exception cause. - */ - public GridFutureRemapTimeoutObject( - GridFutureAdapter fut, - long timeout, - AffinityTopologyVersion topVer, - IgniteCheckedException e) { - super(timeout); - - this.fut = fut; - this.topVer = topVer; - this.e = e; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - if (finish()) // Fail the whole get future, else remap happened concurrently. - fut.onDone(new IgniteCheckedException("Failed to wait for topology version to change: " + topVer, e)); - } - - /** - * @return Guard against concurrent completion. - */ - public boolean finish() { - return finished.compareAndSet(false, true); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index c3d9836..e3fae22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -644,34 +643,23 @@ public class GridPartitionedGetFuture extends CacheDistributedGetFutureAda final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); - final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, - cctx.kernalContext().config().getNetworkTimeout(), - updTopVer, - e); - cctx.affinity().affinityReadyFuture(updTopVer).listen( new CI1>() { @Override public void apply(IgniteInternalFuture fut) { - if (timeout.finish()) { - cctx.kernalContext().timeout().removeTimeoutObject(timeout); - - try { - fut.get(); + try { + fut.get(); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + // Remap. + map(keys.keySet(), F.t(node, keys), updTopVer); - onDone(Collections.emptyMap()); - } - catch (IgniteCheckedException e) { - GridPartitionedGetFuture.this.onDone(e); - } + onDone(Collections.emptyMap()); + } + catch (IgniteCheckedException e) { + GridPartitionedGetFuture.this.onDone(e); } } } ); - - cctx.kernalContext().timeout().addTimeoutObject(timeout); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index dfaa44e..f1bff61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -40,7 +40,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject; import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; @@ -851,34 +850,23 @@ public final class GridNearGetFuture extends CacheDistributedGetFutureAdap final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); - final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, - cctx.kernalContext().config().getNetworkTimeout(), - updTopVer, - e); - cctx.affinity().affinityReadyFuture(updTopVer).listen( new CI1>() { @Override public void apply(IgniteInternalFuture fut) { - if (timeout.finish()) { - cctx.kernalContext().timeout().removeTimeoutObject(timeout); - - try { - fut.get(); + try { + fut.get(); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + // Remap. + map(keys.keySet(), F.t(node, keys), updTopVer); - onDone(Collections.emptyMap()); - } - catch (IgniteCheckedException e) { - GridNearGetFuture.this.onDone(e); - } + onDone(Collections.emptyMap()); + } + catch (IgniteCheckedException e) { + GridNearGetFuture.this.onDone(e); } } } ); - - cctx.kernalContext().timeout().addTimeoutObject(timeout); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java index c75a8f38..3e0231e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.ignite.IgniteCheckedException; @@ -28,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -160,8 +163,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { CacheStoreManager store = cacheCtx.store(); - if (store.configured()) - return Collections.singleton(store); + if (store.configured()) { + HashSet set = new HashSet<>(3, 0.75f); + + set.add(store); + + return set; + } return null; } @@ -192,12 +200,20 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { /** {@inheritDoc} */ @Override public Set writeSet() { - return entry != null ? Collections.singleton(entry.txKey()) : Collections.emptySet(); + if (entry != null) { + HashSet set = new HashSet<>(3, 0.75f); + + set.add(entry.txKey()); + + return set; + } + else + return Collections.emptySet(); } /** {@inheritDoc} */ @Override public Collection writeEntries() { - return entry != null ? Collections.singletonList(entry) : Collections.emptyList(); + return entry != null ? Arrays.asList(entry) : Collections.emptyList(); } /** {@inheritDoc} */ @@ -207,8 +223,7 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { /** {@inheritDoc} */ @Override public Map writeMap() { - return entry != null ? Collections.singletonMap(entry.txKey(), entry) : - Collections.emptyMap(); + return entry != null ? F.asMap(entry.txKey(), entry) : Collections.emptyMap(); } /** {@inheritDoc} */ @@ -223,7 +238,7 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { /** {@inheritDoc} */ @Override public Collection allEntries() { - return entry != null ? Collections.singletonList(entry) : Collections.emptyList(); + return entry != null ? Arrays.asList(entry) : Collections.emptyList(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java index 22f04a8..90af517 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java @@ -17,10 +17,13 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -62,12 +65,20 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter { /** {@inheritDoc} */ @Override public Set writeSet() { - return entry != null ? Collections.singleton(entry.txKey()) : Collections.emptySet(); + if (entry != null) { + HashSet set = new HashSet<>(3, 0.75f); + + set.add(entry.txKey()); + + return set; + } + else + return Collections.emptySet(); } /** {@inheritDoc} */ @Override public Collection writeEntries() { - return entry != null ? Collections.singletonList(entry) : Collections.emptyList(); + return entry != null ? Arrays.asList(entry) : Collections.emptyList(); } /** {@inheritDoc} */ @@ -77,7 +88,7 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter { /** {@inheritDoc} */ @Override public Map writeMap() { - return entry != null ? Collections.singletonMap(entry.txKey(), entry) : + return entry != null ? F.asMap(entry.txKey(), entry) : Collections.emptyMap(); } @@ -93,7 +104,7 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter { /** {@inheritDoc} */ @Override public Collection allEntries() { - return entry != null ? Collections.singletonList(entry) : Collections.emptyList(); + return entry != null ? Arrays.asList(entry) : Collections.emptyList(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index b532d7f..23d64cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -56,14 +56,13 @@ import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.CacheType; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheInternal; +import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; @@ -532,21 +531,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (dataStructure != null) return dataStructure; - if (!create) - return c.applyx(); - while (true) { - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); + try { + if (!create) + return c.applyx(); - if (err != null) - throw err; + try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); - dataStructure = c.applyx(); + if (err != null) + throw err; - tx.commit(); + dataStructure = c.applyx(); + + tx.commit(); - return dataStructure; + return dataStructure; + } } catch (IgniteTxRollbackCheckedException ignore) { // Safe to retry right away. @@ -1605,27 +1606,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { */ public static R retry(IgniteLogger log, Callable call) throws IgniteCheckedException { try { - int cnt = 0; - - while (true) { - try { - return call.call(); - } - catch (ClusterGroupEmptyCheckedException e) { - throw new IgniteCheckedException(e); - } - catch (IgniteTxRollbackCheckedException | - CachePartialUpdateCheckedException | - ClusterTopologyCheckedException e) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to execute data structure operation, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } - } + return GridCacheUtils.retryTopologySafe(call).call(); } catch (IgniteCheckedException e) { throw e; http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java index 28f8631..b433887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java @@ -23,7 +23,6 @@ import java.util.Map; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; @@ -56,26 +55,9 @@ public class GridAtomicCacheQueueImpl extends GridCacheQueueAdapter { checkRemoved(idx); - int cnt = 0; - GridCacheQueueItemKey key = itemKey(idx); - while (true) { - try { - cache.getAndPut(key, item); - - break; - } - catch (CachePartialUpdateCheckedException e) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to put queue item, will retry [err=" + e + ", idx=" + idx + ']'); - - U.sleep(RETRY_DELAY); - } - } - } + cache.getAndPut(key, item); return true; } @@ -98,38 +80,18 @@ public class GridAtomicCacheQueueImpl extends GridCacheQueueAdapter { GridCacheQueueItemKey key = itemKey(idx); - int cnt = 0; - - long stop = 0; + T data = (T)cache.getAndRemove(key); - while (true) { - try { - T data = (T)cache.getAndRemove(key); + if (data != null) + return data; - if (data != null) - return data; + long stop = U.currentTimeMillis() + RETRY_TIMEOUT; - if (stop == 0) - stop = U.currentTimeMillis() + RETRY_TIMEOUT; + while (U.currentTimeMillis() < stop) { + data = (T)cache.getAndRemove(key); - while (U.currentTimeMillis() < stop ) { - data = (T)cache.getAndRemove(key); - - if (data != null) - return data; - } - - break; - } - catch (CachePartialUpdateCheckedException e) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to remove queue item, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } + if (data != null) + return data; } U.warn(log, "Failed to get item, will retry poll [queue=" + queueName + ", idx=" + idx + ']'); @@ -161,24 +123,7 @@ public class GridAtomicCacheQueueImpl extends GridCacheQueueAdapter { idx++; } - int cnt = 0; - - while (true) { - try { - cache.putAll(putMap); - - break; - } - catch (CachePartialUpdateCheckedException e) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add items, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } - } + cache.putAll(putMap); return true; } @@ -197,34 +142,14 @@ public class GridAtomicCacheQueueImpl extends GridCacheQueueAdapter { GridCacheQueueItemKey key = itemKey(idx); - int cnt = 0; + if (cache.remove(key)) + return; - long stop = 0; - - while (true) { - try { - if (cache.remove(key)) - return; + long stop = U.currentTimeMillis() + RETRY_TIMEOUT; - if (stop == 0) - stop = U.currentTimeMillis() + RETRY_TIMEOUT; - - while (U.currentTimeMillis() < stop ) { - if (cache.remove(key)) - return; - } - - break; - } - catch (CachePartialUpdateCheckedException e) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add items, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } + while (U.currentTimeMillis() < stop) { + if (cache.remove(key)) + return; } U.warn(log, "Failed to remove item, [queue=" + queueName + ", idx=" + idx + ']'); @@ -239,21 +164,6 @@ public class GridAtomicCacheQueueImpl extends GridCacheQueueAdapter { @SuppressWarnings("unchecked") @Nullable private Long transformHeader(EntryProcessor c) throws IgniteCheckedException { - int cnt = 0; - - while (true) { - try { - return (Long)cache.invoke(queueKey, c).get(); - } - catch (CachePartialUpdateCheckedException e) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to update queue header, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } - } + return (Long)cache.invoke(queueKey, c).get(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index c0c38b2..37cdaea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -40,6 +40,7 @@ import org.apache.ignite.lang.IgnitePredicate; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; /** * Cache atomic reference implementation. @@ -230,7 +231,7 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef * @return Callable for execution in async and sync mode. */ private Callable internalSet(final T val) { - return new Callable() { + return retryTopologySafe(new Callable() { @Override public Boolean call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue ref = atomicView.get(key); @@ -252,7 +253,7 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef throw e; } } - }; + }); } /** @@ -265,7 +266,8 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef */ private Callable internalCompareAndSet(final IgnitePredicate expValPred, final IgniteClosure newValClos) { - return new Callable() { + + return retryTopologySafe(new Callable() { @Override public Boolean call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue ref = atomicView.get(key); @@ -295,7 +297,7 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef throw e; } } - }; + }); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 2667938..c984ab3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -342,20 +342,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc private class GetCountCallable implements Callable { /** {@inheritDoc} */ @Override public Integer call() throws Exception { - Integer val; + GridCacheCountDownLatchValue latchVal = latchView.get(key); - try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheCountDownLatchValue latchVal = latchView.get(key); - - if (latchVal == null) - return 0; - - val = latchVal.get(); - - tx.rollback(); - } - - return val; + return latchVal == null ? 0 : latchVal.get(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index 0e4aebc..df1bd88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -58,9 +58,6 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE; /** */ - protected static final int MAX_UPDATE_RETRIES = 100; - - /** */ protected static final long RETRY_DELAY = 1; /** */ @@ -169,14 +166,22 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp @SuppressWarnings("unchecked") @Nullable @Override public T peek() throws IgniteException { try { - GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); + while (true) { + GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); - checkRemoved(hdr); + checkRemoved(hdr); - if (hdr.empty()) - return null; + if (hdr.empty()) + return null; - return (T)cache.get(itemKey(hdr.head())); + T val = (T)cache.get(itemKey(hdr.head())); + + if (val == null) + // Header might have been polled. Retry. + continue; + + return val; + } } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -416,8 +421,7 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp long startIdx, long endIdx, int batchSize) - throws IgniteCheckedException - { + throws IgniteCheckedException { Set keys = new HashSet<>(batchSize > 0 ? batchSize : 10); for (long idx = startIdx; idx < endIdx; idx++) { @@ -435,8 +439,7 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp } /** - * Checks result of closure modifying queue header, throws {@link IllegalStateException} - * if queue was removed. + * Checks result of closure modifying queue header, throws {@link IllegalStateException} if queue was removed. * * @param idx Result of closure execution. */ @@ -529,7 +532,6 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp */ protected abstract void removeItem(long rmvIdx) throws IgniteCheckedException; - /** * @param idx Item index. * @return Item key. @@ -1036,7 +1038,7 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp if (o == null || getClass() != o.getClass()) return false; - GridCacheQueueAdapter that = (GridCacheQueueAdapter) o; + GridCacheQueueAdapter that = (GridCacheQueueAdapter)o; return id.equals(that.id); @@ -1051,4 +1053,4 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp @Override public String toString() { return S.toString(GridCacheQueueAdapter.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java index c7750a6..4880324 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java @@ -20,19 +20,17 @@ package org.apache.ignite.internal.processors.datastructures; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteQueue; -import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.transactions.TransactionRollbackException; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -55,12 +53,10 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter A.notNull(item, "item"); try { - boolean retVal; + return retryTopologySafe(new Callable() { + @Override public Boolean call() throws Exception { + boolean retVal; - int cnt = 0; - - while (true) { - try { try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get(); @@ -76,75 +72,59 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter tx.commit(); - break; + return retVal; } } - catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) { - if (e instanceof ClusterGroupEmptyCheckedException) - throw e; - - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } - } - - return retVal; + }).call(); } catch (IgniteCheckedException e) { throw U.convertException(e); } + catch (RuntimeException e) { + throw e; + } + catch (Exception e) { + throw new IgniteException(e.getMessage(), e); + } } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public T poll() throws IgniteException { try { - int cnt = 0; - - T retVal; - - while (true) { - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get(); + return retryTopologySafe(new Callable() { + @Override public T call() throws Exception { + T retVal; - if (idx != null) { - checkRemoved(idx); - - retVal = (T)cache.getAndRemove(itemKey(idx)); + try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get(); - assert retVal != null : idx; - } - else - retVal = null; + if (idx != null) { + checkRemoved(idx); - tx.commit(); + retVal = (T)cache.getAndRemove(itemKey(idx)); - break; - } - catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) { - if (e instanceof ClusterGroupEmptyCheckedException) - throw e; + assert retVal != null : idx; + } + else + retVal = null; - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to poll item, will retry [err=" + e + ']'); + tx.commit(); - U.sleep(RETRY_DELAY); + return retVal; } } - } - - return retVal; + }).call(); } catch (IgniteCheckedException e) { throw U.convertException(e); } + catch (RuntimeException e) { + throw e; + } + catch (Exception e) { + throw new IgniteException(e.getMessage(), e); + } } /** {@inheritDoc} */ @@ -153,95 +133,78 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter A.notNull(items, "items"); try { - boolean retVal; - - int cnt = 0; + return retryTopologySafe(new Callable() { + @Override public Boolean call() throws Exception { + boolean retVal; - while (true) { - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get(); - - if (idx != null) { - checkRemoved(idx); - - Map putMap = new HashMap<>(); + try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get(); - for (T item : items) { - putMap.put(itemKey(idx), item); + if (idx != null) { + checkRemoved(idx); - idx++; - } + Map putMap = new HashMap<>(); - cache.putAll(putMap); + for (T item : items) { + putMap.put(itemKey(idx), item); - retVal = true; - } - else - retVal = false; + idx++; + } - tx.commit(); + cache.putAll(putMap); - break; - } - catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) { - if (e instanceof ClusterGroupEmptyCheckedException) - throw e; + retVal = true; + } + else + retVal = false; - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); + tx.commit(); - U.sleep(RETRY_DELAY); + return retVal; } } - } - - return retVal; + }).call(); } catch (IgniteCheckedException e) { throw U.convertException(e); } + catch (RuntimeException e) { + throw e; + } + catch (Exception e) { + throw new IgniteException(e.getMessage(), e); + } } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected void removeItem(final long rmvIdx) throws IgniteCheckedException { try { - int cnt = 0; + retryTopologySafe(new Callable() { + @Override public Object call() throws Exception { + try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get(); - while (true) { - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get(); + if (idx != null) { + checkRemoved(idx); - if (idx != null) { - checkRemoved(idx); + boolean rmv = cache.remove(itemKey(idx)); - boolean rmv = cache.remove(itemKey(idx)); + assert rmv : idx; + } - assert rmv : idx; + tx.commit(); } - tx.commit(); - - break; + return null; } - catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) { - if (e instanceof ClusterGroupEmptyCheckedException) - throw e; - - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } - } + }).call(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); + catch (RuntimeException e) { + throw e; + } + catch (Exception e) { + throw new IgniteCheckedException(e); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 612c1f1..1ea5014 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -164,4 +164,4 @@ public interface DiscoverySpi extends IgniteSpi { * @throws IllegalStateException If discovery SPI has not started. */ public boolean isClientMode() throws IllegalStateException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index ae23d0e..ae3c8cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -56,6 +56,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLException; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -2152,6 +2153,41 @@ class ServerImpl extends TcpDiscoveryImpl { initConnectionCheckFrequency(); } + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + try { + super.body(); + } + catch (Throwable e) { + if (!spi.isNodeStopping0()) { + final Ignite ignite = spi.ignite(); + + if (ignite != null) { + U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. " + + "Stopping the grid in order to prevent cluster wide instability.", e); + + new Thread(new Runnable() { + @Override public void run() { + try { + spi.ignite().close(); + + U.log(log, "Stopped the grid successfully in response to TcpDiscoverySpi's " + + "message worker thread abnormal termination."); + } + catch (Throwable e) { + U.error(log, "Failed to stop the grid in response to TcpDiscoverySpi's " + + "message worker thread abnormal termination.", e); + } + } + }).start(); + } + } + + // Must be processed by IgniteSpiThread as well. + throw e; + } + } + /** * Initializes connection check frequency. Used only when failure detection timeout is enabled. */