ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [10/10] ignite git commit: ignite-5932
Date Thu, 12 Oct 2017 11:24:24 GMT
ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b73792ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b73792ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b73792ae

Branch: refs/heads/ignite-5932
Commit: b73792aece2cd05a294d46d6befe0496f7ab1772
Parents: 031928e
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Oct 12 13:52:51 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Oct 12 14:24:00 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxFinishFuture.java  |  4 +-
 .../near/GridNearTxFinishAndAckFuture.java      |  4 +-
 .../near/GridNearTxFinishFuture.java            | 25 +++++++---
 .../cache/distributed/near/GridNearTxLocal.java |  2 -
 .../cache/mvcc/CacheCoordinatorsProcessor.java  | 50 ++++++++++++--------
 .../processors/cache/mvcc/MvccQueryTracker.java | 24 ++++++----
 .../processors/cache/mvcc/TxMvccInfo.java       | 12 +++--
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 34 ++++++++++++-
 8 files changed, 108 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index d624e2c..cb2eaa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -300,7 +300,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
 
             assert mvccInfo != null;
 
-            IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs);
+            IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(), waitTxs);
 
             add(fut);
 
@@ -412,7 +412,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
         if (tx.onePhaseCommit())
             return false;
 
-        assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != null;
+        assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != null || F.isEmpty(tx.writeEntries());
 
         boolean sync = tx.syncMode() == FULL_SYNC;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index 5d8b77c..36efe2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -61,9 +61,9 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
                     TxMvccInfo mvccInfo = tx.mvccInfo();
 
                     if (qryTracker != null)
-                        ackFut = qryTracker.onTxFinish(mvccInfo, fut.context());
+                        ackFut = qryTracker.onTxDone(mvccInfo, fut.context(), true);
                     else if (mvccInfo != null) {
-                        ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinator(),
+                        ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(),
                             mvccInfo.version(),
                             null);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index a9b60d7..1116c02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -403,6 +404,20 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             fut.getClass() == CheckRemoteTxMiniFuture.class;
     }
 
+    /**
+     *
+     */
+    private void ackMvccCoordinatorOnRollback() {
+        TxMvccInfo mvccInfo = tx.mvccInfo();
+
+        MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
+        if (qryTracker != null)
+            qryTracker.onTxDone(mvccInfo, cctx, false);
+        else if (mvccInfo != null)
+            cctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     public void finish(boolean commit, boolean clearThreadMap) {
@@ -421,11 +436,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             return;
         }
 
-        if (!commit && tx.mvccInfo() != null) {
-            TxMvccInfo mvccInfo = tx.mvccInfo();
-
-            cctx.coordinators().ackTxRollback(mvccInfo.coordinator(), mvccInfo.version());
-        }
+        if (!commit)
+            ackMvccCoordinatorOnRollback();
 
         try {
             if (tx.localFinish(commit, clearThreadMap) || (!commit && tx.state() == UNKNOWN)) {
@@ -436,7 +448,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
 
                     assert mvccInfo != null;
 
-                    IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs);
+                    IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(),
+                        waitTxs);
 
                     add(fut);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c774f93..51d842c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3373,8 +3373,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFinishFuture<>(cctx, this, false)))
             return chainFinishFuture(finishFut, false);
 
-        cctx.mvcc().addFuture(fut0, fut0.futureId());
-
         IgniteInternalFuture<?> prepFut = this.prepFut;
 
         if (prepFut == null || prepFut.isDone()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index a9a5eba..a5a9b0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -454,47 +454,57 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
         ackFuts.put(fut.id, fut);
 
-        MvccCoordinatorMessage msg;
+        CoordinatorAckRequestTx msg = createTxAckMessage(fut.id, updateVer, readVer);
+
+        try {
+            ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
+        }
+        catch (IgniteCheckedException e) {
+            if (ackFuts.remove(fut.id) != null) {
+                if (e instanceof ClusterTopologyCheckedException)
+                    fut.onDone(); // No need to ack, finish without error.
+                else
+                    fut.onDone(e);
+            }
+        }
+
+        return fut;
+    }
+
+    private CoordinatorAckRequestTx createTxAckMessage(long futId,
+        MvccCoordinatorVersion updateVer,
+        @Nullable MvccCoordinatorVersion readVer)
+    {
+        CoordinatorAckRequestTx msg;
 
         if (readVer != null) {
             long trackCntr = queryTrackCounter(readVer);
 
             if (readVer.coordinatorVersion() == updateVer.coordinatorVersion()) {
-                msg = new CoordinatorAckRequestTxAndQuery(fut.id,
+                msg = new CoordinatorAckRequestTxAndQuery(futId,
                     updateVer.counter(),
                     trackCntr);
             }
             else {
-                msg = new CoordinatorAckRequestTxAndQueryEx(fut.id,
+                msg = new CoordinatorAckRequestTxAndQueryEx(futId,
                     updateVer.counter(),
                     readVer.coordinatorVersion(),
                     trackCntr);
             }
         }
         else
-            msg = new CoordinatorAckRequestTx(fut.id, updateVer.counter());
-
-        try {
-            ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
-        }
-        catch (IgniteCheckedException e) {
-            if (ackFuts.remove(fut.id) != null) {
-                if (e instanceof ClusterTopologyCheckedException)
-                    fut.onDone(); // No need to ack, finish without error.
-                else
-                    fut.onDone(e);
-            }
-        }
+            msg = new CoordinatorAckRequestTx(futId, updateVer.counter());
 
-        return fut;
+        return msg;
     }
 
     /**
      * @param crdId Coordinator node ID.
-     * @param mvccVer Transaction version.
+     * @param updateVer Transaction update version.
+     * @param readVer Transaction read version.
      */
-    public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
-        CoordinatorAckRequestTx msg = new CoordinatorAckRequestTx(0, mvccVer.counter());
+    public void ackTxRollback(UUID crdId, MvccCoordinatorVersion updateVer, @Nullable MvccCoordinatorVersion readVer) {
+        CoordinatorAckRequestTx msg = createTxAckMessage(0, updateVer, readVer);
 
         msg.skipResponse(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index e45b77c..0e3eb7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -116,7 +116,13 @@ public class MvccQueryTracker {
             cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
     }
 
-    public IgniteInternalFuture<Void> onTxFinish(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx) {
+    /**
+     * @param mvccInfo Mvcc update info.
+     * @param ctx Context.
+     * @param commit If {@code true} ack commit, otherwise rollback.
+     * @return Commit ack future.
+     */
+    public IgniteInternalFuture<Void> onTxDone(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx, boolean commit) {
         MvccCoordinator mvccCrd0 = null;
         MvccCoordinatorVersion mvccVer0 = null;
 
@@ -131,24 +137,22 @@ public class MvccQueryTracker {
             }
         }
 
-        if (mvccVer0 != null) {
+        assert mvccVer0 == null || mvccInfo == null || mvccInfo.coordinatorNodeId().equals(mvccCrd0.nodeId());
+
+        if (mvccVer0 != null || mvccInfo != null) {
             if (mvccInfo == null) {
                 cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
 
                 return null;
             }
-            else if (mvccInfo.coordinator().equals(mvccCrd0.nodeId()))
-                return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), mvccVer0);
             else {
-                cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
-
-                return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+                if (commit)
+                    return ctx.coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+                else
+                    ctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
             }
         }
 
-        if (mvccInfo != null)
-            return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
-
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
index 428d707..2306110 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
@@ -42,8 +42,8 @@ public class TxMvccInfo implements Message {
     }
 
     /**
-     * @param crd
-     * @param mvccVer
+     * @param crd Coordinator node ID.
+     * @param mvccVer Mvcc version.
      */
     public TxMvccInfo(UUID crd, MvccCoordinatorVersion mvccVer) {
         assert crd != null;
@@ -53,10 +53,16 @@ public class TxMvccInfo implements Message {
         this.mvccVer = mvccVer;
     }
 
-    public UUID coordinator() {
+    /**
+     * @return Coordinator node ID.
+     */
+    public UUID coordinatorNodeId() {
         return crd;
     }
 
+    /**
+     * @return Mvcc version.
+     */
     public MvccCoordinatorVersion version() {
         return mvccVer;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 8964cd4..70b910b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -336,6 +336,21 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testActiveQueriesCleanup() throws Exception {
+        activeQueriesCleanup(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testActiveQueriesCleanupTx() throws Exception {
+        activeQueriesCleanup(true);
+    }
+
+    /**
+     * @param tx If {@code true} tests reads inside transaction.
+     * @throws Exception If failed.
+     */
+    private void activeQueriesCleanup(final boolean tx) throws Exception {
         startGridsMultiThreaded(SRVS);
 
         client = true;
@@ -354,7 +369,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             @Override public void apply(Integer idx) {
                 ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                IgniteCache cache = ignite(idx % NODES).cache(DEFAULT_CACHE_NAME);
+                Ignite node = ignite(idx % NODES);
+
+                IgniteTransactions txs = node.transactions();
+
+                IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
 
                 while (System.currentTimeMillis() < stopTime) {
                     int keyCnt = rnd.nextInt(10) + 1;
@@ -364,7 +383,18 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                     for (int i = 0; i < keyCnt; i++)
                         keys.add(rnd.nextInt());
 
-                    cache.getAll(keys);
+                    if (tx) {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            cache.getAll(keys);
+
+                            if (rnd.nextBoolean())
+                                tx.commit();
+                            else
+                                tx.rollback();
+                        }
+                    }
+                    else
+                        cache.getAll(keys);
                 }
             }
         }, NODES * 2, "get-thread");


Mime
View raw message