ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [31/50] [abbrv] ignite git commit: IGNITE-6181 wip.
Date Tue, 19 Sep 2017 14:45:57 GMT
IGNITE-6181 wip.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/467e0bad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/467e0bad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/467e0bad

Branch: refs/heads/ignite-6181-1
Commit: 467e0bad7d25a9720738ce77f7afb5fd587727f5
Parents: ec4b16c
Author: ascherbakoff <alexey.scherbakoff@gmail.com>
Authored: Sun Sep 10 17:18:20 2017 +0300
Committer: ascherbakoff <alexey.scherbakoff@gmail.com>
Committed: Sun Sep 10 17:18:20 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   9 +-
 .../cache/distributed/near/GridNearTxLocal.java | 107 +++++++++++--------
 .../cache/transactions/IgniteTxManager.java     |  99 +++++++++++------
 .../cache/IgniteTxConfigCacheSelfTest.java      |   2 +
 4 files changed, 132 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/467e0bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 18fb2ff..8e075b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -139,7 +139,6 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
-import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.LongAdder8;
 
@@ -4032,11 +4031,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         awaitLastFut();
 
-        GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
-
-        if (tx != null && tx.state() == TransactionState.ROLLED_BACK && tx.timedOut())
+        if (ctx.tm().isTimedOutThread(Thread.currentThread().getId()))
             throw new IgniteTxTimeoutCheckedException("Previous transaction was rolled back due to timeout. " +
-                "Please start new transaction and retry an operation.");
+                    "Please start new transaction and retry an operation.");
+
+        GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
 
         if (tx == null || tx.implicit()) {
             TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config());

http://git-wip-us.apache.org/repos/asf/ignite/blob/467e0bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 67782ee..3d93289 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -434,12 +434,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         Object... invokeArgs
     ) {
         return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
-            entryTopVer,
-            null,
-            map,
-            invokeArgs,
-            null,
-            true);
+                entryTopVer,
+                null,
+                map,
+                invokeArgs,
+                null,
+                true);
     }
 
     /**
@@ -458,12 +458,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         });
 
         return this.<Object, Object>putAllAsync0(cacheCtx,
-            null,
-            map,
-            null,
-            null,
-            drMap,
-            false);
+                null,
+                map,
+                null,
+                null,
+                drMap,
+                false);
     }
 
     /**
@@ -2486,8 +2486,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                         processLoaded(map, keys, needVer, c);
 
                         return null;
-                    }
-                    catch (Exception e) {
+                    } catch (Exception e) {
                         setRollbackOnly();
 
                         throw new GridClosureException(e);
@@ -2845,7 +2844,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
         if (log.isDebugEnabled())
             log.debug("Added mappings to transaction [locId=" + cctx.localNodeId() + ", key=" + key + ", node=" + node +
-                ", tx=" + this + ']');
+                    ", tx=" + this + ']');
     }
 
     /**
@@ -3151,6 +3150,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
             // Prepare was called explicitly.
             return fut;
 
+        removeTimeoutHandler();
+
         mapExplicitLocks();
 
         fut.prepare();
@@ -3224,15 +3225,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     prepareFut.get();
 
                     fut0.finish(true);
-                }
-                catch (Error | RuntimeException e) {
+                } catch (Error | RuntimeException e) {
                     COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
 
                     fut0.finish(false);
 
                     throw e;
-                }
-                catch (IgniteCheckedException e) {
+                } catch (IgniteCheckedException e) {
                     COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
 
                     if (!(e instanceof NodeStoppingException))
@@ -3263,6 +3262,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         if (log.isDebugEnabled())
             log.debug("Rolling back near tx: " + this);
 
+        removeTimeoutHandler();
+
         if (fastFinish()) {
             state(PREPARING);
             state(PREPARED);
@@ -3275,6 +3276,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
             return new GridFinishedFuture<>((IgniteInternalTx)this);
         }
 
+        if (timedOut())
+            cctx.tm().markTimedOut(this);
+
         GridNearTxFinishFuture fut = rollbackFut;
 
         if (fut != null)
@@ -3516,7 +3520,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     }
                     catch (IgniteCheckedException e) {
                         log.debug("Failed to prepare transaction during rollback (will ignore) [tx=" + this + ", msg=" +
-                            e.getMessage() + ']');
+                                e.getMessage() + ']');
                     }
 
                     fut.finish(false);
@@ -3690,17 +3694,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteCheckedException {
-        if (timeout() > 0 && !implicit())
-            cctx.time().removeTimeoutObject(this);
-
         TransactionState state = state();
 
         if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED)
             rollback();
 
-        if (!system())
-            cctx.tm().resetUserTx();
-
         synchronized (this) {
             try {
                 while (!done())
@@ -4004,6 +4002,42 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
+     * @param threadId new owner of transaction.
+     */
+    public void threadId(long threadId) {
+        this.threadId = threadId;
+    }
+
+    /**
+     * Removes timeout handler used for eager rollbacks on timeouts.
+     */
+    private void removeTimeoutHandler() {
+        if (timeout() > 0 && !implicit() && !timedOut())
+            cctx.time().removeTimeoutObject(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid timeoutId() {
+        return xid();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return startTime() + timeout();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        if (state(MARKED_ROLLBACK, true)) {
+            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    rollbackNearTxLocalAsync();
+                }
+            });
+        }
+    }
+
+    /**
      * Post-lock closure.
      *
      * @param <T> Return type.
@@ -4048,27 +4082,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteUuid timeoutId() {
-        return xid();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long endTime() {
-        return startTime() + timeout();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onTimeout() {
-        if (state(MARKED_ROLLBACK, true)) {
-            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                @Override public void run() {
-                    rollbackNearTxLocalAsync();
-                }
-            });
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNearTxLocal.class, this,
             "thread", IgniteUtils.threadName(threadId),

http://git-wip-us.apache.org/repos/asf/ignite/blob/467e0bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 0722275..41e10cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteSystemProperties;
@@ -148,8 +150,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /** Topology version should be used when mapping internal tx. */
     private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>();
 
-    /** User transaction. */
-    private final static ThreadLocal<IgniteInternalTx> userTx = new ThreadLocal<>();
+    /** Per-thread transaction map. */
+    private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap();
+
+    /** Thread ids associated with rolled back transactions. */
+    private final ConcurrentSkipListSet<Long> rolledBackByTimeoutThreads = new ConcurrentSkipListSet<>();
 
     /** Per-thread system transaction map. */
     private final ConcurrentMap<TxThreadKey, IgniteInternalTx> sysThreadMap = newMap();
@@ -282,15 +287,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @param cacheId Cache ID.
      */
     public void rollbackTransactionsForCache(int cacheId) {
-        rollbackTransactionsForCache(cacheId, activeTransactions());
+        rollbackTransactionsForCache(cacheId, nearIdMap);
+
+        rollbackTransactionsForCache(cacheId, threadMap);
     }
 
     /**
      * @param cacheId Cache ID.
      * @param txMap Transactions map.
      */
-    private void rollbackTransactionsForCache(int cacheId, Collection<IgniteInternalTx> txMap) {
-        for (IgniteInternalTx tx : txMap) {
+    private void rollbackTransactionsForCache(int cacheId, ConcurrentMap<?, IgniteInternalTx> txMap) {
+        for (Map.Entry<?, IgniteInternalTx> e : txMap.entrySet()) {
+            IgniteInternalTx tx = e.getValue();
+
             for (IgniteTxEntry entry : tx.allEntries()) {
                 if (entry.cacheId() == cacheId) {
                     rollbackTx(tx);
@@ -305,8 +314,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     @Override public void onDisconnected(IgniteFuture reconnectFut) {
         txFinishSync.onDisconnected(reconnectFut);
 
-        for (IgniteInternalTx tx : activeTransactions())
-            rollbackTx(tx);
+        for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet())
+            rollbackTx(e.getValue());
 
         IgniteClientDisconnectedException err =
             new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.");
@@ -365,7 +374,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     @Override public void printMemoryStats() {
         X.println(">>> ");
         X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']');
-        X.println(">>>   activeSize [size=" + activeTransactions().size() + ']');
+        X.println(">>>   threadMapSize: " + threadMap.size());
+        X.println(">>>   idMap [size=" + idMap.size() + ']');
         X.println(">>>   completedVersSortedSize: " + completedVersSorted.size());
         X.println(">>>   completedVersHashMapSize: " + completedVersHashMap.sizex());
     }
@@ -374,7 +384,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @return Thread map size.
      */
     public int threadMapSize() {
-        return 0;
+        return threadMap.size();
     }
 
     /**
@@ -483,7 +493,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             // and overwrite local transaction.
             if (tx.local() && !tx.dht()) {
                 if (cacheCtx == null || !cacheCtx.systemTx())
-                    userTx.set(tx);
+                    threadMap.put(tx.threadId(), tx);
                 else
                     sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx);
             }
@@ -660,7 +670,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @return Not null topology version if current thread holds lock preventing topology change.
      */
     @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) {
-        IgniteInternalTx tx = userTx.get();
+        IgniteInternalTx tx = threadMap.get(threadId);
 
         if (tx != null) {
             AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
@@ -767,7 +777,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     @SuppressWarnings({"unchecked"})
     private <T> T tx(GridCacheContext cctx, long threadId) {
         if (cctx == null || !cctx.systemTx())
-            return (T) userTx.get();
+            return (T)threadMap.get(threadId);
 
         TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId());
 
@@ -1416,19 +1426,23 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @param tx Transaction to clear.
      */
     private void clearThreadMap(IgniteInternalTx tx) {
-        if (tx.local() && !tx.dht() && tx.system()) {
-            Integer cacheId = tx.txState().firstCacheId();
-
-            if (cacheId != null)
-                sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx);
+        if (tx.local() && !tx.dht()) {
+            if (!tx.system())
+                threadMap.remove(tx.threadId(), tx);
             else {
-                for (Iterator<IgniteInternalTx> it = sysThreadMap.values().iterator(); it.hasNext(); ) {
-                    IgniteInternalTx txx = it.next();
+                Integer cacheId = tx.txState().firstCacheId();
 
-                    if (tx == txx) {
-                        it.remove();
+                if (cacheId != null)
+                    sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx);
+                else {
+                    for (Iterator<IgniteInternalTx> it = sysThreadMap.values().iterator(); it.hasNext(); ) {
+                        IgniteInternalTx txx = it.next();
 
-                        break;
+                        if (tx == txx) {
+                            it.remove();
+
+                            break;
+                        }
                     }
                 }
             }
@@ -1698,6 +1712,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * Commit ended.
      */
     public void resetContext() {
+        rolledBackByTimeoutThreads.remove(Thread.currentThread().getId());
+
         threadCtx.set(null);
     }
 
@@ -2270,18 +2286,40 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         assert !transactionMap(tx).containsValue(tx) : tx;
         assert !haveSystemTxForThread(Thread.currentThread().getId());
 
-        if(!tx.state(ACTIVE)) {
+        if (!tx.state(ACTIVE)) {
             throw new IgniteCheckedException("Trying to resume transaction with incorrect state "
-                + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']');
+                    + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']');
         }
 
-        if (userTx.get() != null)
-            throw new IgniteCheckedException("Thread already has active transaction.");
+        long threadId = Thread.currentThread().getId();
 
-        userTx.set(tx);
+        if (threadMap.putIfAbsent(threadId, tx) != null)
+            throw new IgniteCheckedException("Thread already has started a transaction.");
 
         if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null)
-            throw new IgniteCheckedException("Thread already has active transaction.");
+            throw new IgniteCheckedException("Thread already has started a transaction.");
+
+        tx.threadId(threadId);
+    }
+
+    /**
+     * Checks if thread belongs to timed out ids.
+     *
+     * @param threadId Thread id.
+     * @return {@code True} if current thread had a transaction rolled back by timeout.
+     */
+    public boolean isTimedOutThread(long threadId) {
+        return rolledBackByTimeoutThreads.contains(threadId);
+    }
+
+    /**
+     * Marks the transaction's thread as rolled back by timeout.
+     * The thread may not perform transactional ops until it explicitly starts a new transaction.
+     *
+     * @param tx Transaction.
+     */
+    public void markTimedOut(GridNearTxLocal tx) {
+        rolledBackByTimeoutThreads.add(tx.threadId());
     }
 
     /**
@@ -2302,11 +2340,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         return false;
     }
 
-    /** */
-    public void resetUserTx() {
-        userTx.set(null);
-    }
-
     /**
      * Timeout object for node failure handler.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/467e0bad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
index 8574f0c..f2e17e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
@@ -197,6 +197,8 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
             assert e.getCause() instanceof TransactionTimeoutException;
         }
 
+        assertNull(ignite.transactions().tx());
+
         assert !cache.containsKey("key0");
         assert !cache.containsKey("key");
 


Mime
View raw message