ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [17/17] ignite git commit: ignite-5932
Date Wed, 11 Oct 2017 10:43:15 GMT
ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5318153d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5318153d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5318153d

Branch: refs/heads/ignite-5932
Commit: 5318153d6d0648b8496b7f4b8d02d1b1b7181d4d
Parents: ca82daa
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Oct 11 11:37:34 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Oct 11 13:42:46 2017 +0300

----------------------------------------------------------------------
 .../GridDistributedTxRemoteAdapter.java         |   2 +-
 ...arOptimisticSerializableTxPrepareFuture.java | 127 ++++++++++++++++++-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +-
 3 files changed, 128 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5318153d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 77039cc..b21a7b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -474,7 +474,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                     cctx.database().checkpointReadLock();
 
                     try {
-                        assert !txState.mvccEnabled(cctx) || mvccInfo != null;
+                        assert !txState.mvccEnabled(cctx) || mvccInfo != null : "Mvcc is
not initialized: " + this;
 
                         Collection<IgniteTxEntry> entries = near() ? allEntries() :
writeEntries();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5318153d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 9d36bca..86669ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -38,6 +38,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -56,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.jetbrains.annotations.Nullable;
 
@@ -68,6 +73,10 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
  */
 public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter
{
     /** */
+    private static final AtomicIntegerFieldUpdater<MvccVersionFuture> LOCK_CNT_UPD
=
+        AtomicIntegerFieldUpdater.newUpdater(MvccVersionFuture.class, "lockCnt");
+
+    /** */
     @GridToStringExclude
     private KeyLockFuture keyLockFut;
 
@@ -76,6 +85,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     private ClientRemapFuture remapFut;
 
     /** */
+    @GridToStringExclude
+    private MvccVersionFuture mvccVerFut;
+
+    /** */
     private int miniId;
 
     /**
@@ -193,6 +206,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         if (keyLockFut != null)
             keyLockFut.onDone(e);
+
+        if (mvccVerFut != null)
+            mvccVerFut.onDone();
     }
 
     /** {@inheritDoc} */
@@ -345,11 +361,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         boolean hasNearCache = false;
 
+        MvccCoordinator mvccCrd = null;
+
         for (IgniteTxEntry write : writes) {
             map(write, topVer, mappings, txMapping, remap, topLocked);
 
-            if (write.context().isNear())
+            GridCacheContext cctx = write.context();
+
+            if (cctx.isNear())
                 hasNearCache = true;
+
+            if (cctx.mvccEnabled() && mvccCrd == null) {
+                mvccCrd = cctx.affinity().mvccCoordinator(topVer);
+
+                if (mvccCrd == null) {
+                    onDone(new IgniteCheckedException("Mvcc coordinator is not assigned:
" + topVer));
+
+                    return;
+                }
+            }
         }
 
         for (IgniteTxEntry read : reads)
@@ -365,6 +395,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             return;
         }
 
+        assert !tx.txState().mvccEnabled(cctx) || mvccCrd != null;
+
         tx.addEntryMapping(mappings.values());
 
         cctx.mvcc().recheckPendingLocks();
@@ -376,12 +408,16 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         MiniFuture locNearEntriesFut = null;
 
+        int lockCnt = keyLockFut != null ? 1 : 0;
+
         // Create futures in advance to have all futures when process {@link GridNearTxPrepareResponse#clientRemapVersion}.
         for (GridDistributedTxMapping m : mappings.values()) {
             assert !m.empty();
 
             MiniFuture fut = new MiniFuture(this, m, ++miniId);
 
+            lockCnt++;
+
             add((IgniteInternalFuture)fut);
 
             if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries())
{
@@ -390,7 +426,24 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                 locNearEntriesFut = fut;
 
                 add((IgniteInternalFuture)new MiniFuture(this, m, ++miniId));
+
+                lockCnt++;
+            }
+        }
+
+        if (mvccCrd != null) {
+            if (!remap) {
+                mvccVerFut = new MvccVersionFuture();
+
+                if (keyLockFut != null)
+                    keyLockFut.listen(mvccVerFut);
+
+                add(mvccVerFut);
             }
+            else
+                assert mvccVerFut != null;
+
+            mvccVerFut.init(mvccCrd, lockCnt);
         }
 
         Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -722,6 +775,75 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     }
 
     /**
+     *
+     */
+    private class MvccVersionFuture extends GridFutureAdapter implements MvccResponseListener,
+        IgniteInClosure<IgniteInternalFuture<Void>> {
+        /** */
+        MvccCoordinator crd;
+
+        /** */
+        volatile int lockCnt;
+
+        @Override public void apply(IgniteInternalFuture<Void> keyLockFut) {
+            try {
+                keyLockFut.get();
+
+                onLockReceived();
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("MvccVersionFuture ignores key lock future failure: " + e);
+            }
+        }
+
+        /**
+         * @param crd Mvcc coordinator.
+         * @param lockCnt Expected number of lock responses.
+         */
+        void init(MvccCoordinator crd, int lockCnt) {
+            assert crd != null;
+            assert lockCnt > 0;
+
+            this.crd = crd;
+            this.lockCnt = lockCnt;
+
+            assert !isDone();
+        }
+
+        /**
+         *
+         */
+        void onLockReceived() {
+            int remaining = LOCK_CNT_UPD.decrementAndGet(this);
+
+            assert remaining >= 0 : remaining;
+
+            if (remaining == 0) {
+                // TODO IGNTIE-3478: add method to do not create one more future in requestTxCounter.
+                if (cctx.localNodeId().equals(crd.nodeId()))
+                    onMvccResponse(crd.nodeId(), cctx.coordinators().requestTxCounterOnCoordinator(tx));
+                else
+                    cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion());
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMvccResponse(UUID crdId, MvccCoordinatorVersion res) {
+            tx.mvccInfo(new TxMvccInfo(crdId, res));
+
+            onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMvccError(IgniteCheckedException e) {
+            ERR_UPD.compareAndSet(GridNearOptimisticSerializableTxPrepareFuture.this, null,
e);
+
+            onDone();
+        }
+    }
+
+    /**
      * Client remap future.
      */
     private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse,
Boolean> {
@@ -963,6 +1085,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
                         // Finish this mini future (need result only on client node).
                         onDone(parent.cctx.kernalContext().clientNode() ? res : null);
+
+                        if (parent.mvccVerFut != null)
+                            parent.mvccVerFut.onLockReceived();
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5318153d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d8f911c..0ebf2f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -520,7 +520,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements
Ig
             try {
                 cctx.tm().txContext(this);
 
-                assert !txState.mvccEnabled(cctx) || mvccInfo != null;
+                assert !txState.mvccEnabled(cctx) || mvccInfo != null : "Mvcc is not initialized:
" + this;
 
                 AffinityTopologyVersion topVer = topologyVersion();
 


Mime
View raw message