ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [07/38] ignite git commit: ignite-4768
Date Tue, 14 Mar 2017 08:00:26 GMT
ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/43429ffd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/43429ffd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/43429ffd

Branch: refs/heads/ignite-4768
Commit: 43429ffd9bed1aa6c6d8bd7a28e888905a7fe193
Parents: 0f069f8
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Mar 7 17:41:02 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Mar 7 17:41:02 2017 +0300

----------------------------------------------------------------------
 .../GridDistributedTxPrepareRequest.java        |  3 ++
 .../GridNearPessimisticTxPrepareFuture.java     | 23 ++++++++++---
 .../cache/transactions/IgniteTxHandler.java     | 35 +++++++++++++++-----
 3 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/43429ffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index aaef258..1934a84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -74,6 +74,9 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
     /** */
     private static final int SYSTEM_TX_FLAG_MASK = 0x10;
 
+    /** */
+    private static final int KNOWN_MAPPING_FLAG_MASK = 0x20;
+
     /** Collection to message converter. */
     private static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG
= new C1<Collection<UUID>, UUIDCollectionMessage>() {
         @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/43429ffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index a4132f2..4154102 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -184,6 +185,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
      *
      */
     private void preparePessimistic() {
+        boolean mappingKnown = true;
+
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings
= new HashMap<>();
 
         AffinityTopologyVersion topVer = tx.topologyVersion();
@@ -195,9 +198,18 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
             GridCacheContext cacheCtx = txEntry.context();
 
-            List<ClusterNode> nodes = cacheCtx.isLocal() ?
-                cacheCtx.affinity().nodesByKey(txEntry.key(), topVer) :
-                cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
+            List<ClusterNode> nodes;
+
+            if (!cacheCtx.isLocal()) {
+                GridDhtPartitionTopology top = cacheCtx.topology();
+
+                if (mappingKnown && (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(),
topVer)))
+                    mappingKnown = false;
+
+                nodes = cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()),
topVer);
+            }
+            else
+                nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer);
 
             ClusterNode primary = F.first(nodes);
 
@@ -228,9 +240,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         long timeout = tx.remainingTime();
 
-        if (timeout == -1)
+        if (timeout == -1) {
             onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled
back: " + tx));
 
+            return;
+        }
+
         for (final GridDistributedTxMapping m : mappings.values()) {
             final ClusterNode node = m.node();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/43429ffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 21c2649..f3f67a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -113,7 +113,7 @@ public class IgniteTxHandler {
      * @param req Request.
      * @return Prepare future.
      */
-    public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId,
GridNearTxPrepareRequest req) {
+    private IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId,
GridNearTxPrepareRequest req) {
         if (txPrepareMsgLog.isDebugEnabled()) {
             txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version()
+
                 ", node=" + nearNodeId + ']');
@@ -286,6 +286,27 @@ public class IgniteTxHandler {
     }
 
     /**
+     * @param entries Entries.
+     * @return First entry.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgniteTxEntry unmarshal(@Nullable Collection<IgniteTxEntry> entries) throws
IgniteCheckedException {
+        if (entries == null)
+            return null;
+
+        IgniteTxEntry firstEntry = null;
+
+        for (IgniteTxEntry e : entries) {
+            e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+
+            if (firstEntry == null)
+                firstEntry = e;
+        }
+
+        return firstEntry;
+    }
+
+    /**
      * Prepares near transaction.
      *
      * @param nearNodeId Near node ID that initiated transaction.
@@ -308,15 +329,13 @@ public class IgniteTxHandler {
             return null;
         }
 
-        IgniteTxEntry firstEntry = null;
+        IgniteTxEntry firstEntry;
 
         try {
-            for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
-                e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+            IgniteTxEntry firstWrite = unmarshal(req.writes());
+            IgniteTxEntry firstRead = unmarshal(req.reads());
 
-                if (firstEntry == null)
-                    firstEntry = e;
-            }
+            firstEntry = firstWrite != null ? firstWrite : firstRead;
         }
         catch (IgniteCheckedException e) {
             return new GridFinishedFuture<>(e);
@@ -452,7 +471,7 @@ public class IgniteTxHandler {
                 tx.nearOnOriginatingNode(true);
 
             if (req.onePhaseCommit()) {
-                assert req.last();
+                assert req.last() : req;
 
                 tx.onePhaseCommit(true);
             }


Mime
View raw message