ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [33/50] [abbrv] ignite git commit: ignite-4768
Date Mon, 13 Mar 2017 16:35:25 GMT
ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8d10806f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8d10806f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8d10806f

Branch: refs/heads/ignite-4768-1
Commit: 8d10806f40c24cd77ba72430811ff1464be7a01e
Parents: de2697d
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Mar 13 14:50:23 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Mar 13 14:50:23 2017 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   3 +-
 .../processors/cache/GridCacheIoManager.java    |   4 +-
 .../GridDistributedTxFinishRequest.java         |  13 ++
 .../distributed/dht/GridDhtTxFinishFuture.java  |  83 +++++----
 .../distributed/dht/GridDhtTxFinishRequest.java |   4 +
 .../distributed/dht/GridDhtTxPrepareFuture.java |  14 +-
 .../dht/GridDhtTxPrepareRequest.java            |  59 +------
 .../near/GridNearTxFinishFuture.java            |  12 +-
 .../near/GridNearTxFinishRequest.java           |   2 +
 .../cache/transactions/IgniteTxHandler.java     | 174 +++++++++++--------
 10 files changed, 193 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 5a26187..85e2cbe 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -170,7 +171,7 @@ public class MessageCodeGenerator {
 
 //        gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
 
-//        gen.generateAndWrite(GridMessageCollection.class);
+        gen.generateAndWrite(GridDhtTxPrepareRequest.class);
 //        gen.generateAndWrite(DataStreamerEntry.class);
 
 //        gen.generateAndWrite(GridDistributedLockRequest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 71f4e1c..e91bc9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -367,8 +367,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter
{
 
             unmarshall(nodeId, cacheMsg);
 
-//            if (!cacheMsg.partitionExchangeMessage())
-//                log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
+            if (!cacheMsg.partitionExchangeMessage())
+                log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
 
             if (cacheMsg.classError() != null)
                 processFailedMessage(nodeId, cacheMsg, c);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index ab9f0ff..c794f96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -63,6 +63,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage
i
     /** */
     protected static final int STORE_ENABLED_FLAG_MASK = 0x20;
 
+    /** */
+    private static final int DHT_REPLY_NEAR_FLAG_MASK = 0x40;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -138,6 +141,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage
i
         @NotNull AffinityTopologyVersion topVer,
         @Nullable GridCacheVersion commitVer,
         long threadId,
+        boolean dhtReplyNear,
         boolean commit,
         boolean invalidate,
         boolean sys,
@@ -171,6 +175,15 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage
i
         this.txSize = txSize;
 
         completedVersions(committedVers, rolledbackVers);
+
+        setFlag(dhtReplyNear, DHT_REPLY_NEAR_FLAG_MASK);
+    }
+
+    /**
+     * @return {@code True} if transaction works in mode when DHT nodes reply directly to near node.
+     */
+    public final boolean dhtReplyNear() {
+        return isFlag(DHT_REPLY_NEAR_FLAG_MASK);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 17e9047..72a9b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -236,7 +236,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 if (finishErr == null)
                     finishErr = this.tx.commitError();
 
-                if (this.tx.syncMode() != PRIMARY_SYNC)
+                if (this.tx.syncMode() != PRIMARY_SYNC && !this.tx.dhtReplyNear())
                     this.tx.sendFinishReply(finishErr);
 
                 // Don't forget to clean up.
@@ -322,7 +322,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.commitVersion(),
                 tx.threadId(),
                 tx.isolation(),
-                false,
+                /*dhtReplyNear*/false, // TODO IGNITE-4768.
+                /*commit*/false,
                 tx.isInvalidate(),
                 tx.system(),
                 tx.ioPolicy(),
@@ -387,6 +388,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
 
         boolean sync = tx.syncMode() == FULL_SYNC;
 
+        boolean dhtReplyNear = tx.dhtReplyNear() && tx.syncMode() == FULL_SYNC;
+
         if (tx.explicitLock())
             sync = true;
 
@@ -406,9 +409,13 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 // Nothing to send.
                 continue;
 
-            MiniFuture fut = new MiniFuture(++miniId, dhtMapping, nearMapping);
+            MiniFuture fut = null;
 
-            add(fut); // Append new future.
+            if (!dhtReplyNear) {
+                fut = new MiniFuture(++miniId, dhtMapping, nearMapping);
+
+                add(fut); // Append new future.
+            }
 
             Collection<Long> updCntrs = new ArrayList<>(dhtMapping.entries().size());
 
@@ -417,13 +424,14 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
 
             GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
                 tx.nearNodeId(),
-                futId,
-                fut.futureId(),
+                dhtReplyNear ? tx.nearFutureId() : futId,
+                dhtReplyNear ? 0 : fut.futureId(),
                 tx.topologyVersion(),
                 tx.xidVersion(),
                 tx.commitVersion(),
                 tx.threadId(),
                 tx.isolation(),
+                dhtReplyNear,
                 commit,
                 tx.isInvalidate(),
                 tx.system(),
@@ -455,22 +463,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
 
                 if (sync)
                     res = true;
-                else
+                else if (fut != null)
                     fut.onDone();
             }
             catch (IgniteCheckedException e) {
-                // Fail the whole thing.
-                if (e instanceof ClusterTopologyCheckedException)
-                    fut.onNodeLeft((ClusterTopologyCheckedException)e);
-                else {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() +
-                            ", dhtTxId=" + tx.xidVersion() +
-                            ", node=" + n.id() +
-                            ", err=" + e + ']');
-                    }
+                if (fut != null) {
+                    if (e instanceof ClusterTopologyCheckedException)
+                        fut.onNodeLeft((ClusterTopologyCheckedException)e);
+                    else {
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() +
+                                ", dhtTxId=" + tx.xidVersion() +
+                                ", node=" + n.id() +
+                                ", err=" + e + ']');
+                        }
 
-                    fut.onResult(e);
+                        // TODO IGNITE-4768: reply on near with error?
+                        fut.onResult(e);
+                    }
                 }
             }
         }
@@ -481,19 +491,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                     // Nothing to send.
                     continue;
 
-                MiniFuture fut = new MiniFuture(++miniId, null, nearMapping);
+                MiniFuture fut = null;
 
-                add(fut); // Append new future.
+                if (!dhtReplyNear) {
+                    fut = new MiniFuture(++miniId, null, nearMapping);
+
+                    add(fut); // Append new future.
+                }
 
                 GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
                     tx.nearNodeId(),
                     futId,
-                    fut.futureId(),
+                    fut != null ? fut.futureId() : -1,
                     tx.topologyVersion(),
                     tx.xidVersion(),
                     tx.commitVersion(),
                     tx.threadId(),
                     tx.isolation(),
+                    dhtReplyNear,
                     commit,
                     tx.isInvalidate(),
                     tx.system(),
@@ -524,22 +539,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
 
                     if (sync)
                         res = true;
-                    else
+                    else if (fut != null)
                         fut.onDone();
                 }
                 catch (IgniteCheckedException e) {
-                    // Fail the whole thing.
-                    if (e instanceof ClusterTopologyCheckedException)
-                        fut.onNodeLeft((ClusterTopologyCheckedException)e);
-                    else {
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() +
-                                ", dhtTxId=" + tx.xidVersion() +
-                                ", node=" + nearMapping.primary().id() +
-                                ", err=" + e + ']');
+                    if (fut != null) {
+                        if (e instanceof ClusterTopologyCheckedException)
+                            fut.onNodeLeft((ClusterTopologyCheckedException)e);
+                        else {
+                            if (msgLog.isDebugEnabled()) {
+                                msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() +
+                                    ", dhtTxId=" + tx.xidVersion() +
+                                    ", node=" + nearMapping.primary().id() +
+                                    ", err=" + e + ']');
+                            }
+
+                            // TODO IGNITE-4768: reply on near with error?
+                            fut.onResult(e);
                         }
-
-                        fut.onResult(e);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index d9b3ae7..40f96c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -106,6 +106,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest
{
         GridCacheVersion commitVer,
         long threadId,
         TransactionIsolation isolation,
+        boolean dhtReplyNear,
         boolean commit,
         boolean invalidate,
         boolean sys,
@@ -129,6 +130,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest
{
             topVer,
             commitVer,
             threadId,
+            dhtReplyNear,
             commit,
             invalidate,
             sys,
@@ -190,6 +192,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest
{
         GridCacheVersion commitVer,
         long threadId,
         TransactionIsolation isolation,
+        boolean dhtReplyNear,
         boolean commit,
         boolean invalidate,
         boolean sys,
@@ -216,6 +219,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest
{
             commitVer,
             threadId,
             isolation,
+            dhtReplyNear,
             commit,
             invalidate,
             sys,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ca028f8..c8e06af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1276,7 +1276,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                     MiniFuture fut = null;
 
-                    if (!tx.dhtReplyNear()) {
+                    if (!dhtReplyNear) {
                         fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
 
                         add(fut); // Append new future.
@@ -1285,10 +1285,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     assert txNodes != null;
 
                     GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
-                        futId,
-                        fut != null ? fut.futureId() : 0,
-                        nearFutId,
-                        nearMiniId,
+                        dhtReplyNear ? nearFutId : futId,
+                        dhtReplyNear ? nearMiniId : fut.futureId(),
                         tx.topologyVersion(),
                         tx,
                         timeout,
@@ -1405,10 +1403,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         add(fut); // Append new future.
 
                         GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
-                            futId,
-                            fut.futureId(),
-                            nearFutId,
-                            nearMiniId,
+                            dhtReplyNear ? nearFutId : futId,
+                            dhtReplyNear ? nearMiniId : fut.futureId(),
                             tx.topologyVersion(),
                             tx,
                             timeout,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 85a65a8..04a296d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -61,12 +61,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest
{
     /** Mini future ID. */
     private int miniId;
 
-    /** Future ID. */
-    private IgniteUuid nearFutId;
-
-    /** Mini future ID. */
-    private int nearMiniId;
-
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -113,8 +107,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest
{
     /**
      * @param futId Future ID.
      * @param miniId Mini future ID.
-     * @param nearFutId Near node future ID.
-     * @param nearMiniId Near node mini future ID.
      * @param topVer Topology version.
      * @param tx Transaction.
      * @param timeout Transaction timeout.
@@ -129,8 +121,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest
{
     public GridDhtTxPrepareRequest(
         IgniteUuid futId,
         int miniId,
-        IgniteUuid nearFutId,
-        int nearMiniId,
         AffinityTopologyVersion topVer,
         GridDhtTxLocalAdapter tx,
         long timeout,
@@ -156,8 +146,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest
{
             onePhaseCommit,
             addDepInfo);
 
-        assert dhtNearReply || (futId != null && miniId != 0);
-        assert !dhtNearReply || (nearFutId != null && nearMiniId != 0);
+        assert futId != null;
+        assert miniId != 0;
 
         this.topVer = topVer;
         this.futId = futId;
@@ -166,8 +156,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest
{
         this.nearXidVer = nearXidVer;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
-        this.nearFutId = nearFutId;
-        this.nearMiniId = nearMiniId;
 
         needReturnValue(retVal);
 
@@ -264,21 +252,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest
{
     }
 
     /**
-     * @return Near future ID.
-     */
-    public IgniteUuid nearFutureId() {
-        return nearFutId;
-    }
-
-    /**
-     * @return Near mini future ID.
-     */
-    public int nearMiniId() {
-        return nearMiniId;
-    }
-
-
-    /**
      * @return Topology version.
      */
     @Override public AffinityTopologyVersion topologyVersion() {
@@ -395,18 +368,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest
{
 
                 writer.incrementState();
 
-            case 23:
-                if (!writer.writeIgniteUuid("nearFutId", nearFutId))
-                    return false;
-
-                writer.incrementState();
-
-            case 24:
-                if (!writer.writeInt("nearMiniId", nearMiniId))
-                    return false;
-
-                writer.incrementState();
-
             case 25:
                 if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
@@ -501,22 +462,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest
{
 
                 reader.incrementState();
 
-            case 23:
-                nearFutId = reader.readIgniteUuid("nearFutId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 24:
-                nearMiniId = reader.readInt("nearMiniId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
             case 25:
                 nearNodeId = reader.readUuid("nearNodeId");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index c55d515..44455ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -716,10 +716,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         // Version to be added in completed versions on primary node.
         GridCacheVersion completedVer = !commit && tx.timeout() > 0 ? tx.xidVersion() : null;
 
+        boolean dhtReplyNear = tx.dhtReplyNear() && syncMode == FULL_SYNC;
+
         GridNearTxFinishRequest req = new GridNearTxFinishRequest(
             futId,
             tx.xidVersion(),
             tx.threadId(),
+            dhtReplyNear,
             commit,
             tx.isInvalidate(),
             tx.system(),
@@ -737,8 +740,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             tx.activeCachesDeploymentEnabled()
         );
 
-        boolean dhtReplyNear = tx.dhtReplyNear() && syncMode == FULL_SYNC;
-
         // If this is the primary node for the keys.
         if (n.isLocal()) {
             req.miniId(miniId);
@@ -855,11 +856,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             tx.commitVersion(),
             tx.threadId(),
             tx.isolation(),
-            true,
-            false,
+            /*dhtReplyNear*/false,
+            /*commit*/true,
+            /*invalidate*/false,
             tx.system(),
             tx.ioPolicy(),
-            false,
+            /*sysInvalidate*/false,
             tx.syncMode(),
             null,
             null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 05c1f3e..58e75f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -73,6 +73,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest
{
         IgniteUuid futId,
         GridCacheVersion xidVer,
         long threadId,
+        boolean dhtReplyNear,
         boolean commit,
         boolean invalidate,
         boolean sys,
@@ -94,6 +95,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest
{
             topVer,
             null,
             threadId,
+            dhtReplyNear,
             commit,
             invalidate,
             sys,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 754979c..71b847a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -39,6 +39,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -967,8 +969,8 @@ public class IgniteTxHandler {
                 nearRes = new GridDhtTxPrepareResponse(
                     req.partition(),
                     req.nearXidVersion(),
-                    req.nearFutureId(),
-                    req.nearMiniId(),
+                    req.futureId(),
+                    req.miniId(),
                     req.deployInfo() != null);
 
                 nearRes.nearNodeResponse(true);
@@ -1036,6 +1038,7 @@ public class IgniteTxHandler {
             if (nearTx != null)
                 nearTx.rollback();
 
+            // TODO IGNITE-4768.
             res = new GridDhtTxPrepareResponse(
                 req.partition(),
                 req.version(),
@@ -1116,19 +1119,19 @@ public class IgniteTxHandler {
     @SuppressWarnings({"unchecked"})
     private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
         assert nodeId != null;
-        assert req != null;
+        assert req.nearNodeId() != null : req;
 
         if (req.checkCommitted()) {
             boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
 
             if (!committed || req.syncMode() != FULL_SYNC)
-                sendReply(nodeId, req, committed, null);
+                sendCheckCommittedReply(nodeId, req, committed);
             else {
                 IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
 
                 fut.listen(new CI1<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> fut) {
-                        sendReply(nodeId, req, true, null);
+                        sendCheckCommittedReply(nodeId, req, true);
                     }
                 });
             }
@@ -1181,16 +1184,16 @@ public class IgniteTxHandler {
 
             if (completeFut != null) {
                 completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                    @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
-                        sendReply(nodeId, req, true, nearTxId);
+                    @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                        sendFinishReply(nodeId, req, nearTxId);
                     }
                 });
             }
             else
-                sendReply(nodeId, req, true, nearTxId);
+                sendFinishReply(nodeId, req, nearTxId);
         }
         else
-            sendReply(nodeId, req, true, null);
+            sendFinishReply(nodeId, req, null);
 
         assert req.txState() != null || (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null) : req;
     }
@@ -1350,81 +1353,75 @@ public class IgniteTxHandler {
     }
 
     /**
-     * Sends tx finish response to remote node, if response is requested.
-     *
      * @param nodeId Node id that originated finish request.
      * @param req Request.
      * @param committed {@code True} if transaction committed on this node.
-     * @param nearTxId Near tx version.
      */
-    private void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
-        if (req.replyRequired() || req.checkCommitted()) {
-            GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
-                req.partition(),
-                req.version(),
-                req.futureId(),
-                req.miniId());
+    private void sendCheckCommittedReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
+        assert req.checkCommitted() : req;
 
-            if (req.checkCommitted()) {
-                res.checkCommitted(true);
-
-                if (committed) {
-                    if (req.needReturnValue()) {
-                        try {
-                            GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version());
-
-                            if (wrapper != null)
-                                res.returnValue(wrapper.fut().get());
-                            else
-                                assert !ctx.discovery().alive(nodeId) : nodeId;
-                        }
-                        catch (IgniteCheckedException ignored) {
-                            if (txFinishMsgLog.isDebugEnabled()) {
-                                txFinishMsgLog.debug("Failed to gain entry processor return value. [txId=" + nearTxId +
-                                    ", dhtTxId=" + req.version() +
-                                    ", node=" + nodeId + ']');
-                            }
-                        }
-                    }
-                }
-                else {
-                    ClusterTopologyCheckedException cause =
-                        new ClusterTopologyCheckedException("Primary node left grid.");
+        GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+            req.partition(),
+            req.version(),
+            req.futureId(),
+            req.miniId());
 
-                    res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
-                        "(transaction has been rolled back on backup node): " + req.version(), cause));
-                }
-            }
+        res.checkCommitted(true);
 
-            try {
-                ctx.io().send(nodeId, res, req.policy());
+        if (committed) {
+            if (req.needReturnValue()) {
+                try {
+                    GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version());
 
-                if (txFinishMsgLog.isDebugEnabled()) {
-                    txFinishMsgLog.debug("Sent dht tx finish response [txId=" + nearTxId +
-                        ", dhtTxId=" + req.version() +
-                        ", node=" + nodeId +
-                        ", checkCommitted=" + req.checkCommitted() + ']');
+                    if (wrapper != null)
+                        res.returnValue(wrapper.fut().get());
+                    else
+                        assert !ctx.discovery().alive(nodeId) : nodeId;
                 }
-            }
-            catch (Throwable e) {
-                // Double-check.
-                if (ctx.discovery().node(nodeId) == null) {
+                catch (IgniteCheckedException ignored) {
                     if (txFinishMsgLog.isDebugEnabled()) {
-                        txFinishMsgLog.debug("Node left while send dht tx finish response [txId=" + nearTxId +
+                        txFinishMsgLog.debug("Failed to get return value. [txId=null" +
                             ", dhtTxId=" + req.version() +
                             ", node=" + nodeId + ']');
                     }
                 }
-                else {
-                    U.error(log, "Failed to send finish response to node [txId=" + nearTxId +
-                        ", dhtTxId=" + req.version() +
-                        ", nodeId=" + nodeId +
-                        ", res=" + res + ']', e);
-                }
+            }
+        }
+        else {
+            ClusterTopologyCheckedException cause =
+                new ClusterTopologyCheckedException("Primary node left grid.");
 
-                if (e instanceof Error)
-                    throw (Error)e;
+            res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
+                "(transaction has been rolled back on backup node): " + req.version(), cause));
+        }
+
+        sendFinishResponse(nodeId, res, req, null);
+    }
+
+    /**
+     * Sends tx finish response to remote node, if response is requested.
+     *
+     * @param nodeId Node id that originated finish request.
+     * @param req Request.
+     * @param nearTxId Near tx version.
+     */
+    private void sendFinishReply(UUID nodeId, GridDhtTxFinishRequest req, GridCacheVersion nearTxId) {
+        assert !req.checkCommitted();
+
+        if (req.replyRequired()) {
+            GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+                req.partition(),
+                req.version(),
+                req.futureId(),
+                req.miniId());
+
+            if (req.dhtReplyNear()) {
+                nodeId = req.nearNodeId();
+
+                res.nearNodeResponse(true);
             }
+
+            sendFinishResponse(nodeId, res, req, nearTxId);
         }
         else {
             if (txFinishMsgLog.isDebugEnabled()) {
@@ -1437,6 +1434,47 @@ public class IgniteTxHandler {
 
     /**
      * @param nodeId Node ID.
+     * @param res Response.
+     * @param req Request (for debug info logging).
+     * @param nearTxId  (for debug info logging).
+     */
+    private void sendFinishResponse(UUID nodeId,
+        GridDistributedTxFinishResponse res,
+        GridDhtTxFinishRequest req,
+        @Nullable GridCacheVersion nearTxId) {
+        try {
+            ctx.io().send(nodeId, res, req.policy());
+
+            if (txFinishMsgLog.isDebugEnabled()) {
+                txFinishMsgLog.debug("Sent dht tx finish response [txId=" + nearTxId +
+                    ", dhtTxId=" + req.version() +
+                    ", node=" + nodeId +
+                    ", checkCommitted=" + req.checkCommitted() + ']');
+            }
+        }
+        catch (Throwable e) {
+            // Double-check.
+            if (ctx.discovery().node(nodeId) == null) {
+                if (txFinishMsgLog.isDebugEnabled()) {
+                    txFinishMsgLog.debug("Node left while send dht tx finish response [txId=" + nearTxId +
+                        ", dhtTxId=" + req.version() +
+                        ", node=" + nodeId + ']');
+                }
+            }
+            else {
+                U.error(log, "Failed to send finish response to node [txId=" + nearTxId +
+                    ", dhtTxId=" + req.version() +
+                    ", nodeId=" + nodeId +
+                    ", res=" + res + ']', e);
+            }
+
+            if (e instanceof Error)
+                throw (Error)e;
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
      * @param req Request.
      * @param res Response or {@code null} if should not reply to primary.
      * @return Remote transaction.


Mime
View raw message