From commits-return-119177-archive-asf-public=cust-asf.ponee.io@ignite.apache.org Tue Jul 17 23:45:55 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0D75E1807AC for ; Tue, 17 Jul 2018 23:45:53 +0200 (CEST) Received: (qmail 34774 invoked by uid 500); 17 Jul 2018 21:45:53 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 34610 invoked by uid 99); 17 Jul 2018 21:45:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jul 2018 21:45:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7CE79E10E1; Tue, 17 Jul 2018 21:45:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: av@apache.org To: commits@ignite.apache.org Date: Tue, 17 Jul 2018 21:46:04 -0000 Message-Id: <3ba7781966ed4d029d273a72a891a91e@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [13/28] ignite git commit: IGNITE-8820 Add ability to accept changing txTimeoutOnPartitionMapExchange while waiting for pending transactions. - Fixes #4217. IGNITE-8820 Add ability to accept changing txTimeoutOnPartitionMapExchange while waiting for pending transactions. - Fixes #4217. 
Signed-off-by: Ivan Rakov Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09ce06cc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09ce06cc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09ce06cc Branch: refs/heads/ignite-8783 Commit: 09ce06cc02ff93dd3c255a130253637b34aa48f1 Parents: ddc41e2 Author: Ivan Daschinskiy Authored: Mon Jul 16 17:18:21 2018 +0300 Committer: Ivan Rakov Committed: Mon Jul 16 18:02:01 2018 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 14 +- .../GridDhtPartitionsExchangeFuture.java | 37 +++-- .../optimized/OptimizedMarshallerTest.java | 6 - .../SetTxTimeoutOnPartitionMapExchangeTest.java | 146 +++++++++++++++++++ 4 files changed, 171 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/09ce06cc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index d3fddab..644c26e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -46,7 +46,6 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; import 
org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -2493,17 +2492,13 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana int dumpCnt = 0; - IgniteConfiguration cfg = cctx.gridConfig(); - - long rollbackTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange(); - final long dumpTimeout = 2 * cctx.gridConfig().getNetworkTimeout(); long nextDumpTime = 0; while (true) { try { - resVer = exchFut.get(rollbackTimeout > 0 ? rollbackTimeout : dumpTimeout); + resVer = exchFut.get(dumpTimeout); break; } @@ -2512,7 +2507,6 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana U.warn(diagnosticLog, "Failed to wait for partition map exchange [" + "topVer=" + exchFut.initialVersion() + ", node=" + cctx.localNodeId() + "]. " + - (rollbackTimeout == 0 ? "Consider changing TransactionConfiguration.txTimeoutOnPartitionMapSynchronization to non default value to avoid this message. " : "") + "Dumping pending objects that might be the cause: "); try { @@ -2524,12 +2518,6 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout); } - - if (rollbackTimeout > 0) { - rollbackTimeout = 0; // Try automatic rollback only once. 
- - cctx.tm().rollbackOnTopologyChange(exchFut.initialVersion()); - } } catch (Exception e) { if (exchFut.reconnectOnError(e)) http://git-wip-us.apache.org/repos/asf/ignite/blob/09ce06cc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d44856f..75cd491 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1205,10 +1205,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte distributed = false; // On first phase we wait for finishing all local tx updates, atomic updates and lock releases on all nodes. - waitPartitionRelease(distributed); + waitPartitionRelease(distributed, true); // Second phase is needed to wait for finishing all tx updates from primary to backup nodes remaining after first phase. - waitPartitionRelease(false); + if (distributed) + waitPartitionRelease(false, false); boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null; @@ -1319,10 +1320,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * {@link GridCacheSharedContext#partitionReleaseFuture(AffinityTopologyVersion)} javadoc. * * @param distributed If {@code true} then node should wait for partition release completion on all other nodes. 
+ * @param doRollback If {@code true} tries to rollback transactions which lock partitions. Avoids unnecessary calls + * of {@link org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager#rollbackOnTopologyChange} * * @throws IgniteCheckedException If failed. */ - private void waitPartitionRelease(boolean distributed) throws IgniteCheckedException { + private void waitPartitionRelease(boolean distributed, boolean doRollback) throws IgniteCheckedException { Latch releaseLatch = null; // Wait for other nodes only on first phase. @@ -1342,34 +1345,37 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte int dumpCnt = 0; - long waitStart = U.currentTimeMillis(); - long nextDumpTime = 0; IgniteConfiguration cfg = cctx.gridConfig(); - boolean rollbackEnabled = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange() > 0; + long waitStart = U.currentTimeMillis(); long waitTimeout = 2 * cfg.getNetworkTimeout(); + boolean txRolledBack = !doRollback; + while (true) { + // Read txTimeoutOnPME from configuration after every iteration. + long curTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange(); + try { - partReleaseFut.get(rollbackEnabled ? - cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange() : - waitTimeout, TimeUnit.MILLISECONDS); + // This avoids unnecessary waiting for rollback. + partReleaseFut.get(curTimeout > 0 && !txRolledBack ? + Math.min(curTimeout, waitTimeout) : waitTimeout, TimeUnit.MILLISECONDS); break; } catch (IgniteFutureTimeoutCheckedException ignored) { // Print pending transactions and locks that might have led to hang. 
if (nextDumpTime <= U.currentTimeMillis()) { - dumpPendingObjects(partReleaseFut); + dumpPendingObjects(partReleaseFut, curTimeout <= 0 && !txRolledBack); nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, waitTimeout); } - if (rollbackEnabled) { - rollbackEnabled = false; + if (!txRolledBack && curTimeout > 0 && U.currentTimeMillis() - waitStart >= curTimeout) { + txRolledBack = true; cctx.tm().rollbackOnTopologyChange(initialVersion()); } @@ -1478,12 +1484,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param partReleaseFut Partition release future. + * @param txTimeoutNotifyFlag If {@code true} print transaction rollback timeout on PME notification. */ - private void dumpPendingObjects(IgniteInternalFuture partReleaseFut) { + private void dumpPendingObjects(IgniteInternalFuture partReleaseFut, boolean txTimeoutNotifyFlag) { U.warn(cctx.kernalContext().cluster().diagnosticLog(), "Failed to wait for partition release future [topVer=" + initialVersion() + ", node=" + cctx.localNodeId() + "]"); + if (txTimeoutNotifyFlag) + U.warn(cctx.kernalContext().cluster().diagnosticLog(), "Consider changing TransactionConfiguration." 
+ + "txTimeoutOnPartitionMapExchange to non default value to avoid this message."); + U.warn(log, "Partition release future: " + partReleaseFut); U.warn(cctx.kernalContext().cluster().diagnosticLog(), http://git-wip-us.apache.org/repos/asf/ignite/blob/09ce06cc/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerTest.java index 79496ae..a7e29c4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerTest.java @@ -414,12 +414,6 @@ public class OptimizedMarshallerTest extends GridCommonAbstractTest { }); allocationOverflowCheck(() -> { - marshaller().marshal(new int[1<<30]); - marshaller().marshal(new int[1<<30]); - return null; - }); - - allocationOverflowCheck(() -> { marshaller().marshal(new float[1<<29]); marshaller().marshal(new float[1<<29]); return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/09ce06cc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java index 3152349..7033529 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java @@ -18,15 +18,25 @@ package org.apache.ignite.internal.processors.cache; import java.lang.management.ManagementFactory; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import javax.management.MBeanServer; import javax.management.MBeanServerInvocationHandler; import javax.management.ObjectName; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.TransactionsMXBeanImpl; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; @@ -36,6 +46,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionRollbackException; +import org.apache.ignite.transactions.TransactionTimeoutException; + +import static org.apache.ignite.internal.util.typedef.X.hasCause; +import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * @@ -120,6 +137,135 @@ public class SetTxTimeoutOnPartitionMapExchangeTest extends GridCommonAbstractTe } /** + * Tests applying new txTimeoutOnPartitionMapExchange while an exchange future runs. + * + * @throws Exception If fails. + */ + public void testSetTxTimeoutDuringPartitionMapExchange() throws Exception { + IgniteEx ig = (IgniteEx)startGrids(2); + + final long longTimeout = 600_000L; + final long shortTimeout = 5_000L; + + TransactionsMXBean mxBean = txMXBean(0); + + // Case 1: set very long txTimeoutOnPME, transaction should be rolled back. + mxBean.setTxTimeoutOnPartitionMapExchange(longTimeout); + assertTxTimeoutOnPartitionMapExchange(longTimeout); + + AtomicReference txEx = new AtomicReference<>(); + + IgniteInternalFuture fut = startDeadlock(ig, txEx, 0); + + startGridAsync(2); + + waitForExchangeStarted(ig); + + mxBean.setTxTimeoutOnPartitionMapExchange(shortTimeout); + + awaitPartitionMapExchange(); + + fut.get(); + + assertTrue("Transaction should be rolled back", hasCause(txEx.get(), TransactionRollbackException.class)); + + // Case 2: txTimeoutOnPME will be set to 0 after starting of PME, transaction should be cancelled on timeout. + mxBean.setTxTimeoutOnPartitionMapExchange(longTimeout); + assertTxTimeoutOnPartitionMapExchange(longTimeout); + + fut = startDeadlock(ig, txEx, 10000L); + + startGridAsync(3); + + waitForExchangeStarted(ig); + + mxBean.setTxTimeoutOnPartitionMapExchange(0); + + fut.get(); + + assertTrue("Transaction should be canceled on timeout", hasCause(txEx.get(), TransactionTimeoutException.class)); + } + + /** + * Start test deadlock + * + * @param ig Ig. + * @param txEx Atomic reference to transaction exception. + * @param timeout Transaction timeout. 
+ */ + private IgniteInternalFuture startDeadlock(Ignite ig, AtomicReference txEx, long timeout) { + IgniteCache cache = ig.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)); + + AtomicInteger thCnt = new AtomicInteger(); + + CyclicBarrier barrier = new CyclicBarrier(2); + + return GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Void call() { + int thNum = thCnt.incrementAndGet(); + + try (Transaction tx = ig.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 0)) { + cache.put(thNum, 1); + + barrier.await(); + + cache.put(thNum % 2 + 1, 1); + + tx.commit(); + } + catch (Exception e) { + txEx.set(e); + } + + return null; + } + }, 2, "tx-thread"); + } + + /** + * Starts grid asynchronously and returns just before grid starting. + * Avoids blocking on PME. + * + * @param idx Test grid index. + * @throws Exception If fails. + */ + private void startGridAsync(int idx) throws Exception { + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + startGrid(idx); + } + catch (Exception e) { + // no-op. + } + } + }); + } + + /** + * Waits for PME to start on grid. + * + * @param ig Ignite grid. + * @throws IgniteCheckedException If fails. + */ + private void waitForExchangeStarted(IgniteEx ig) throws IgniteCheckedException { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (GridDhtPartitionsExchangeFuture fut: ig.context().cache().context().exchange().exchangeFutures()) { + if (!fut.isDone()) + return true; + } + + return false; + } + }, WAIT_CONDITION_TIMEOUT); + + // Additional waiting to ensure that code really starts waiting for partition release. + U.sleep(5_000L); + } + + /** * */ private TransactionsMXBean txMXBean(int igniteInt) throws Exception {