Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7EBAD200D27 for ; Tue, 19 Sep 2017 16:45:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7D7E71609DB; Tue, 19 Sep 2017 14:45:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BE741160BE3 for ; Tue, 19 Sep 2017 16:45:29 +0200 (CEST) Received: (qmail 52543 invoked by uid 500); 19 Sep 2017 14:45:29 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 52350 invoked by uid 99); 19 Sep 2017 14:45:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Sep 2017 14:45:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 86E82F59E9; Tue, 19 Sep 2017 14:45:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 19 Sep 2017 14:45:57 -0000 Message-Id: In-Reply-To: <3f3b6244698b4ca1a42538d800915588@git.apache.org> References: <3f3b6244698b4ca1a42538d800915588@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [31/50] [abbrv] ignite git commit: IGNITE-6181 wip. archived-at: Tue, 19 Sep 2017 14:45:33 -0000 IGNITE-6181 wip. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/467e0bad Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/467e0bad Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/467e0bad Branch: refs/heads/ignite-6181-1 Commit: 467e0bad7d25a9720738ce77f7afb5fd587727f5 Parents: ec4b16c Author: ascherbakoff Authored: Sun Sep 10 17:18:20 2017 +0300 Committer: ascherbakoff Committed: Sun Sep 10 17:18:20 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 9 +- .../cache/distributed/near/GridNearTxLocal.java | 107 +++++++++++-------- .../cache/transactions/IgniteTxManager.java | 99 +++++++++++------ .../cache/IgniteTxConfigCacheSelfTest.java | 2 + 4 files changed, 132 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/467e0bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 18fb2ff..8e075b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -139,7 +139,6 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; -import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import org.jsr166.LongAdder8; @@ -4032,11 +4031,11 @@ public abstract class GridCacheAdapter implements IgniteInternalCache)putAllAsync0(cacheCtx, - entryTopVer, - null, - map, - invokeArgs, - null, - true); + entryTopVer, + null, + map, + invokeArgs, + null, + true); } /** @@ -458,12 +458,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou }); return this.putAllAsync0(cacheCtx, - null, - map, - null, - null, - drMap, - false); + null, + map, + null, + null, + drMap, + false); } /** @@ -2486,8 +2486,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou processLoaded(map, keys, needVer, c); return null; - } - catch (Exception e) { + } catch (Exception e) { setRollbackOnly(); throw new GridClosureException(e); @@ -2845,7 +2844,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (log.isDebugEnabled()) log.debug("Added mappings to transaction [locId=" + cctx.localNodeId() + ", key=" + key + ", node=" + node + - ", tx=" + this + ']'); + ", tx=" + this + ']'); } /** @@ -3151,6 +3150,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou // Prepare was called explicitly. return fut; + removeTimeoutHandler(); + mapExplicitLocks(); fut.prepare(); @@ -3224,15 +3225,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou prepareFut.get(); fut0.finish(true); - } - catch (Error | RuntimeException e) { + } catch (Error | RuntimeException e) { COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); fut0.finish(false); throw e; - } - catch (IgniteCheckedException e) { + } catch (IgniteCheckedException e) { COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); if (!(e instanceof NodeStoppingException)) @@ -3263,6 +3262,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (log.isDebugEnabled()) log.debug("Rolling back near tx: " + this); + removeTimeoutHandler(); + if (fastFinish()) { state(PREPARING); state(PREPARED); @@ -3275,6 +3276,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou return new GridFinishedFuture<>((IgniteInternalTx)this); } + if (timedOut()) + cctx.tm().markTimedOut(this); + GridNearTxFinishFuture fut = rollbackFut; if (fut != null) @@ -3516,7 +3520,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } catch (IgniteCheckedException e) { log.debug("Failed to prepare transaction during rollback (will ignore) [tx=" + this + ", msg=" + - e.getMessage() + ']'); + e.getMessage() + ']'); } fut.finish(false); @@ -3690,17 +3694,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - if (timeout() > 0 && !implicit()) - cctx.time().removeTimeoutObject(this); - TransactionState state = state(); if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) rollback(); - if (!system()) - cctx.tm().resetUserTx(); - synchronized (this) { try { while (!done()) @@ -4004,6 +4002,42 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** + * @param threadId new owner of transaction. + */ + public void threadId(long threadId) { + this.threadId = threadId; + } + + /** + * Removes timeout handler used for eager rollbacks on timeouts. + */ + private void removeTimeoutHandler() { + if (timeout() > 0 && !implicit() && !timedOut()) + cctx.time().removeTimeoutObject(this); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return xid(); + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return startTime() + timeout(); + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (state(MARKED_ROLLBACK, true)) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + rollbackNearTxLocalAsync(); + } + }); + } + } + + /** * Post-lock closure. * * @param Return type. @@ -4048,27 +4082,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** {@inheritDoc} */ - @Override public IgniteUuid timeoutId() { - return xid(); - } - - /** {@inheritDoc} */ - @Override public long endTime() { - return startTime() + timeout(); - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - if (state(MARKED_ROLLBACK, true)) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - rollbackNearTxLocalAsync(); - } - }); - } - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearTxLocal.class, this, "thread", IgniteUtils.threadName(threadId), http://git-wip-us.apache.org/repos/asf/ignite/blob/467e0bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 0722275..41e10cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteSystemProperties; @@ -148,8 +150,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Topology version should be used when mapping internal tx. */ private final ThreadLocal txTop = new ThreadLocal<>(); - /** User transaction. */ - private final static ThreadLocal userTx = new ThreadLocal<>(); + /** Per-thread transaction map. */ + private final ConcurrentMap threadMap = newMap(); + + /** Thread ids associated with rolled back transactions. */ + private final ConcurrentSkipListSet rolledBackByTimeoutThreads = new ConcurrentSkipListSet<>(); /** Per-thread system transaction map. */ private final ConcurrentMap sysThreadMap = newMap(); @@ -282,15 +287,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param cacheId Cache ID. */ public void rollbackTransactionsForCache(int cacheId) { - rollbackTransactionsForCache(cacheId, activeTransactions()); + rollbackTransactionsForCache(cacheId, nearIdMap); + + rollbackTransactionsForCache(cacheId, threadMap); } /** * @param cacheId Cache ID. * @param txMap Transactions map. */ - private void rollbackTransactionsForCache(int cacheId, Collection txMap) { - for (IgniteInternalTx tx : txMap) { + private void rollbackTransactionsForCache(int cacheId, ConcurrentMap txMap) { + for (Map.Entry e : txMap.entrySet()) { + IgniteInternalTx tx = e.getValue(); + for (IgniteTxEntry entry : tx.allEntries()) { if (entry.cacheId() == cacheId) { rollbackTx(tx); @@ -305,8 +314,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void onDisconnected(IgniteFuture reconnectFut) { txFinishSync.onDisconnected(reconnectFut); - for (IgniteInternalTx tx : activeTransactions()) - rollbackTx(tx); + for (Map.Entry e : threadMap.entrySet()) + rollbackTx(e.getValue()); IgniteClientDisconnectedException err = new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."); @@ -365,7 +374,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void printMemoryStats() { X.println(">>> "); X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']'); - X.println(">>> activeSize [size=" + activeTransactions().size() + ']'); + X.println(">>> threadMapSize: " + threadMap.size()); + X.println(">>> idMap [size=" + idMap.size() + ']'); X.println(">>> completedVersSortedSize: " + completedVersSorted.size()); X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex()); } @@ -374,7 +384,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Thread map size. */ public int threadMapSize() { - return 0; + return threadMap.size(); } /** @@ -483,7 +493,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // and overwrite local transaction. if (tx.local() && !tx.dht()) { if (cacheCtx == null || !cacheCtx.systemTx()) - userTx.set(tx); + threadMap.put(tx.threadId(), tx); else sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx); } @@ -660,7 +670,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Not null topology version if current thread holds lock preventing topology change. */ @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) { - IgniteInternalTx tx = userTx.get(); + IgniteInternalTx tx = threadMap.get(threadId); if (tx != null) { AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); @@ -767,7 +777,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @SuppressWarnings({"unchecked"}) private T tx(GridCacheContext cctx, long threadId) { if (cctx == null || !cctx.systemTx()) - return (T) userTx.get(); + return (T)threadMap.get(threadId); TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId()); @@ -1416,19 +1426,23 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param tx Transaction to clear. */ private void clearThreadMap(IgniteInternalTx tx) { - if (tx.local() && !tx.dht() && tx.system()) { - Integer cacheId = tx.txState().firstCacheId(); - - if (cacheId != null) - sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx); + if (tx.local() && !tx.dht()) { + if (!tx.system()) + threadMap.remove(tx.threadId(), tx); else { - for (Iterator it = sysThreadMap.values().iterator(); it.hasNext(); ) { - IgniteInternalTx txx = it.next(); + Integer cacheId = tx.txState().firstCacheId(); - if (tx == txx) { - it.remove(); + if (cacheId != null) + sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx); + else { + for (Iterator it = sysThreadMap.values().iterator(); it.hasNext(); ) { + IgniteInternalTx txx = it.next(); - break; + if (tx == txx) { + it.remove(); + + break; + } } } } @@ -1698,6 +1712,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * Commit ended. */ public void resetContext() { + rolledBackByTimeoutThreads.remove(Thread.currentThread().getId()); + threadCtx.set(null); } @@ -2270,18 +2286,40 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert !transactionMap(tx).containsValue(tx) : tx; assert !haveSystemTxForThread(Thread.currentThread().getId()); - if(!tx.state(ACTIVE)) { + if (!tx.state(ACTIVE)) { throw new IgniteCheckedException("Trying to resume transaction with incorrect state " - + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']'); + + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']'); } - if (userTx.get() != null) - throw new IgniteCheckedException("Thread already has active transaction."); + long threadId = Thread.currentThread().getId(); - userTx.set(tx); + if (threadMap.putIfAbsent(threadId, tx) != null) + throw new IgniteCheckedException("Thread already has started a transaction."); if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null) - throw new IgniteCheckedException("Thread already has active transaction."); + throw new IgniteCheckedException("Thread already has started a transaction."); + + tx.threadId(threadId); + } + + /** + * Checks if thread belongs to timed out ids. + * + * @param threadId Thread id. + * @return {@code True} if current thread had a transaction rolled back by timeout. + */ + public boolean isTimedOutThread(long threadId) { + return rolledBackByTimeoutThreads.contains(threadId); + } + + /** + * Mark transaction thread as rolled back by timeout. + * Thread may not perform transactional ops until it will explicitly start a new transaction. + * + * @param tx Transaction. + */ + public void markTimedOut(GridNearTxLocal tx) { + rolledBackByTimeoutThreads.add(tx.threadId()); } /** @@ -2302,11 +2340,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return false; } - /** */ - public void resetUserTx() { - userTx.set(null); - } - /** * Timeout object for node failure handler. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/467e0bad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java index 8574f0c..f2e17e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java @@ -197,6 +197,8 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { assert e.getCause() instanceof TransactionTimeoutException; } + assertNull(ignite.transactions().tx()); + assert !cache.containsKey("key0"); assert !cache.containsKey("key");