ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [34/40] ignite git commit: Changed tx mini future ids from IgniteUuid to int, removed some legacy code from tx processing.
Date Tue, 14 Mar 2017 15:00:28 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 90a68ad..56a7fa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -76,13 +76,11 @@ import org.apache.ignite.lang.IgniteFutureCancelledException;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
-import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.FINISH_NEAR_ONE_PHASE_SINCE;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -114,7 +112,7 @@ public class IgniteTxHandler {
      * @param req Request.
      * @return Prepare future.
      */
-    public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
+    private IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
         if (txPrepareMsgLog.isDebugEnabled()) {
             txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
                 ", node=" + nearNodeId + ']');
@@ -272,6 +270,7 @@ public class IgniteTxHandler {
                         U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
 
                     return new GridNearTxPrepareResponse(
+                        req.partition(),
                         req.version(),
                         req.futureId(),
                         req.miniId(),
@@ -287,6 +286,27 @@ public class IgniteTxHandler {
     }
 
     /**
+     * @param entries Entries.
+     * @return First entry.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgniteTxEntry unmarshal(@Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException {
+        if (entries == null)
+            return null;
+
+        IgniteTxEntry firstEntry = null;
+
+        for (IgniteTxEntry e : entries) {
+            e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+
+            if (firstEntry == null)
+                firstEntry = e;
+        }
+
+        return firstEntry;
+    }
+
+    /**
      * Prepares near transaction.
      *
      * @param nearNodeId Near node ID that initiated transaction.
@@ -309,15 +329,13 @@ public class IgniteTxHandler {
             return null;
         }
 
-        IgniteTxEntry firstEntry = null;
+        IgniteTxEntry firstEntry;
 
         try {
-            for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
-                e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+            IgniteTxEntry firstWrite = unmarshal(req.writes());
+            IgniteTxEntry firstRead = unmarshal(req.reads());
 
-                if (firstEntry == null)
-                    firstEntry = e;
-            }
+            firstEntry = firstWrite != null ? firstWrite : firstRead;
         }
         catch (IgniteCheckedException e) {
             return new GridFinishedFuture<>(e);
@@ -364,6 +382,7 @@ public class IgniteTxHandler {
                     }
 
                     GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+                        req.partition(),
                         req.version(),
                         req.futureId(),
                         req.miniId(),
@@ -449,17 +468,16 @@ public class IgniteTxHandler {
 
             tx.transactionNodes(req.transactionNodes());
 
-            // Set near on originating node flag only if the sender node has new version.
-            if (req.near() && FINISH_NEAR_ONE_PHASE_SINCE.compareTo(nearNode.version()) <= 0)
+            if (req.near())
                 tx.nearOnOriginatingNode(true);
 
             if (req.onePhaseCommit()) {
-                assert req.last();
+                assert req.last() : req;
 
                 tx.onePhaseCommit(true);
             }
 
-            if (req.returnValue())
+            if (req.needReturnValue())
                 tx.needReturnValue(true);
 
             IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
@@ -778,8 +796,13 @@ public class IgniteTxHandler {
                 ", commit=" + req.commit() + ']');
 
             // Always send finish response.
-            GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), req.futureId(),
-                req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
+            GridCacheMessage res = new GridNearTxFinishResponse(
+                req.partition(),
+                req.version(),
+                req.threadId(),
+                req.futureId(),
+                req.miniId(),
+                new IgniteCheckedException("Transaction has been already completed."));
 
             try {
                 ctx.io().send(nodeId, res, req.policy());
@@ -819,14 +842,9 @@ public class IgniteTxHandler {
         try {
             assert tx != null : "Transaction is null for near finish request [nodeId=" +
                 nodeId + ", req=" + req + "]";
+            assert req.syncMode() != null : req;
 
-            if (req.syncMode() == null) {
-                boolean sync = req.commit() ? req.syncCommit() : req.syncRollback();
-
-                tx.syncMode(sync ? FULL_SYNC : FULL_ASYNC);
-            }
-            else
-                tx.syncMode(req.syncMode());
+            tx.syncMode(req.syncMode());
 
             if (req.commit()) {
                 tx.storeEnabled(req.storeEnabled());
@@ -920,7 +938,7 @@ public class IgniteTxHandler {
      * @param nodeId Sender node ID.
      * @param req Request.
      */
-    protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
+    private void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
         if (txPrepareMsgLog.isDebugEnabled()) {
             txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() +
                 ", dhtTxId=" + req.version() +
@@ -938,7 +956,12 @@ public class IgniteTxHandler {
         GridDhtTxPrepareResponse res;
 
         try {
-            res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), req.deployInfo() != null);
+            res = new GridDhtTxPrepareResponse(
+                req.partition(),
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                req.deployInfo() != null);
 
             // Start near transaction first.
             nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
@@ -990,7 +1013,12 @@ public class IgniteTxHandler {
             if (nearTx != null)
                 nearTx.rollback();
 
-            res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), e,
+            res = new GridDhtTxPrepareResponse(
+                req.partition(),
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                e,
                 req.deployInfo() != null);
         }
 
@@ -1041,7 +1069,7 @@ public class IgniteTxHandler {
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
+    private void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
         final GridDhtTxOnePhaseCommitAckRequest req) {
         assert nodeId != null;
         assert req != null;
@@ -1058,14 +1086,14 @@ public class IgniteTxHandler {
      * @param req Request.
      */
     @SuppressWarnings({"unchecked"})
-    protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
+    private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
         assert nodeId != null;
         assert req != null;
 
         if (req.checkCommitted()) {
             boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
 
-            if (!committed || !req.syncCommit())
+            if (!committed || req.syncMode() != FULL_SYNC)
                 sendReply(nodeId, req, committed, null);
             else {
                 IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
@@ -1301,9 +1329,13 @@ public class IgniteTxHandler {
      * @param committed {@code True} if transaction committed on this node.
      * @param nearTxId Near tx version.
      */
-    protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
+    protected final void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
         if (req.replyRequired() || req.checkCommitted()) {
-            GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
+            GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+                req.partition(),
+                req.version(),
+                req.futureId(),
+                req.miniId());
 
             if (req.checkCommitted()) {
                 res.checkCommitted(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index bd806aa..b1a4003 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -37,6 +37,7 @@ import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -3277,7 +3278,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                             return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success());
                         }
                         catch (IgniteCheckedException | RuntimeException e) {
-                            rollbackAsync();
+                            if (!(e instanceof NodeStoppingException))
+                                rollbackAsync();
 
                             throw e;
                         }


Mime
View raw message