ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4768
Date Thu, 09 Mar 2017 15:11:44 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4768-1 [created] 4b4409189


ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b440918
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b440918
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b440918

Branch: refs/heads/ignite-4768-1
Commit: 4b440918922bbc4c6b7f66b49a7352dd2e16633a
Parents: 35dad8f
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Mar 9 15:39:04 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Mar 9 18:11:35 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   6 +
 .../processors/cache/GridCacheIoManager.java    |   3 +
 .../GridDistributedTxPrepareRequest.java        |  13 +-
 .../GridDistributedTxPrepareResponse.java       |   2 -
 .../dht/GridDhtTransactionalCacheAdapter.java   |   1 +
 .../cache/distributed/dht/GridDhtTxMapping.java |   9 +-
 .../dht/GridDhtTxNearPrepareResponse.java       | 232 +++++++++++++++++++
 .../distributed/dht/GridDhtTxPrepareFuture.java | 167 ++++++++-----
 .../dht/GridDhtTxPrepareRequest.java            |  97 ++++++--
 .../cache/distributed/dht/GridDhtTxRemote.java  |   6 +
 ...arOptimisticSerializableTxPrepareFuture.java |   8 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   9 +-
 .../GridNearPessimisticTxPrepareFuture.java     | 193 +++++++++++----
 .../near/GridNearTxPrepareFutureAdapter.java    |  17 +-
 .../near/GridNearTxPrepareRequest.java          |   4 +-
 .../near/GridNearTxPrepareResponse.java         |   1 -
 .../GridNearTxPrimaryPrepareCheckRequest.java   |  84 +++++++
 .../cache/transactions/IgniteInternalTx.java    |   5 +
 .../cache/transactions/IgniteTxAdapter.java     |  22 +-
 .../cache/transactions/IgniteTxHandler.java     | 104 +++++++--
 20 files changed, 819 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f95400..df3d7e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRe
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
@@ -173,6 +174,11 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -50:
+                msg = new GridDhtTxNearPrepareResponse();
+
+                break;
+
             case -44:
                 msg = new TcpCommunicationSpi.HandshakeMessage2();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 50f58cc..7cac367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -367,6 +367,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             unmarshall(nodeId, cacheMsg);
 
+//            if (!cacheMsg.partitionExchangeMessage())
+//                log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
+
             if (cacheMsg.classError() != null)
                 processFailedMessage(nodeId, cacheMsg, c);
             else

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index e30c456..b0d1ba7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -75,7 +75,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
     private static final int SYSTEM_TX_FLAG_MASK = 0x10;
 
     /** */
-    private static final int MAPPING_KNOWN_FLAG_MASK = 0x20;
+    private static final int DHT_REPLY_NEAR_FLAG_MASK = 0x20;
 
     /** Collection to message converter. */
     private static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() {
@@ -177,7 +177,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         @Nullable Collection<IgniteTxEntry> reads,
         Collection<IgniteTxEntry> writes,
         Map<UUID, Collection<UUID>> txNodes,
-        boolean mappingKnown,
+        boolean dhtReplyNear,
         boolean retVal,
         boolean last,
         boolean onePhaseCommit,
@@ -202,11 +202,14 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         setFlag(tx.isInvalidate(), INVALIDATE_FLAG_MASK);
         setFlag(onePhaseCommit, ONE_PHASE_COMMIT_FLAG_MASK);
         setFlag(last, LAST_REQ_FLAG_MASK);
-        setFlag(mappingKnown, MAPPING_KNOWN_FLAG_MASK);
+        setFlag(dhtReplyNear, DHT_REPLY_NEAR_FLAG_MASK);
     }
 
-    public final boolean mappingKnown() {
-        return isFlag(MAPPING_KNOWN_FLAG_MASK);
+    /**
+     * @return {@code True} if the transaction works in the mode where DHT nodes reply directly to the near node.
+     */
+    public final boolean dhtReplyNear() {
+        return isFlag(DHT_REPLY_NEAR_FLAG_MASK);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 76a5e31..99f36c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -106,8 +106,6 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
         this.txState = txState;
     }
 
-    /** {@inheritDoc}
-     * @param ctx*/
     /** {@inheritDoc} */
     @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
         return ctx.txPrepareMessageLogger();

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index dea4072..97e40b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -230,6 +230,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                             if (tx == null) {
                                 tx = new GridDhtTxRemote(
                                     ctx.shared(),
+                                    false, // TODO IGNITE-4768.
                                     req.nodeId(),
                                     req.futureId(),
                                     nodeId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
index 9ec35b6..ce00352 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
@@ -56,8 +56,13 @@ public class GridDhtTxMapping {
                 txNodes.put(primary.id(), backups);
             }
 
-            for (int i = 1; i < size; i++)
-                backups.add(nodes.get(i).id());
+            for (int i = 1; i < size; i++) {
+                ClusterNode backup = nodes.get(i);
+
+                assert !primary.equals(backup) : primary;
+
+                backups.add(backup.id());
+            }
         }
         else
             txNodes.put(primary.id(), new GridLeanSet<UUID>());

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java
new file mode 100644
index 0000000..e582bd2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
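+ * Transaction prepare response message carrying the near transaction ID, future ID and mini future ID.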
+ */
+public class GridDhtTxNearPrepareResponse extends GridCacheMessage implements IgniteTxStateAware {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private int partId;
+
+    /** */
+    private GridCacheVersion nearTxId;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Mini future ID. */
+    private int miniId;
+
+    /** Transient TX state. */
+    @GridDirectTransient
+    private IgniteTxState txState;
+
+    /**
+     * Empty constructor, used by {@code GridIoMessageFactory} to instantiate the message.
+     */
+    public GridDhtTxNearPrepareResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param partId Partition ID.
+     * @param nearTxId Near transaction ID.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     */
+    public GridDhtTxNearPrepareResponse(int partId, GridCacheVersion nearTxId, IgniteUuid futId, int miniId) {
+        assert nearTxId != null;
+        assert futId != null;
+        assert miniId > 0;
+
+        this.partId = partId;
+        this.nearTxId = nearTxId;
+        this.futId = futId;
+        this.miniId = miniId;
+    }
+
+    /**
+     * @return Near transaction ID.
+     */
+    public GridCacheVersion nearTxId() {
+        return nearTxId;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future ID.
+     */
+    public int miniId() {
+        return miniId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteTxState txState() {
+        return txState;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void txState(IgniteTxState txState) {
+        this.txState = txState;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+        return ctx.txPrepareMessageLogger();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -50;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 7;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeInt("miniId", miniId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("nearTxId", nearTxId))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeInt("partId", partId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                miniId = reader.readInt("miniId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                nearTxId = reader.readMessage("nearTxId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                partId = reader.readInt("partId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtTxNearPrepareResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxNearPrepareResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 053c3b2..7f0cddd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -64,7 +64,6 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -778,6 +777,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             return true;
         }
         else {
+            if (tx.dhtReplyNear())
+                return onComplete(null);
+
             if (REPLIED_UPD.compareAndSet(this, 0, 1)) {
                 GridNearTxPrepareResponse res = createPrepareResponse(this.err);
 
@@ -1216,25 +1218,37 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             // We are holding transaction-level locks for entries here, so we can get next write version.
             tx.writeVersion(cctx.versions().next(tx.topologyVersion()));
 
-            {
-                // Assign keys to primary nodes.
-                if (!F.isEmpty(writes)) {
-                    for (IgniteTxEntry write : writes)
-                        map(tx.entry(write.txKey()));
-                }
+            assert tx.transactionNodes() != null;
 
-                if (!F.isEmpty(reads)) {
-                    for (IgniteTxEntry read : reads)
-                        map(tx.entry(read.txKey()));
-                }
+            final boolean dhtReplyNear = tx.dhtReplyNear();
+
+            Collection<UUID> backupNodes;
+            IgniteUuid nearFutId;
+
+            if (dhtReplyNear) {
+                backupNodes = tx.transactionNodes().get(cctx.localNodeId());
+                nearFutId = tx.colocated() ? tx.xid() : tx.nearFutureId();
+            }
+            else {
+                backupNodes = null;
+                nearFutId = null;
+            }
+
+            // Assign keys to primary nodes.
+            if (!F.isEmpty(writes)) {
+                for (IgniteTxEntry write : writes)
+                    map(tx.entry(write.txKey()), backupNodes);
+            }
+
+            if (!F.isEmpty(reads)) {
+                for (IgniteTxEntry read : reads)
+                    map(tx.entry(read.txKey()), backupNodes);
             }
 
             if (isDone())
                 return;
 
             if (last) {
-                assert tx.transactionNodes() != null;
-
                 final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
 
                 // Create mini futures.
@@ -1257,15 +1271,21 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     if (tx.remainingTime() == -1)
                         return;
 
-                    MiniFuture fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
+                    MiniFuture fut = null;
 
-                    add(fut); // Append new future.
+                    if (!tx.dhtReplyNear()) {
+                        fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
+
+                        add(fut); // Append new future.
+                    }
 
                     assert txNodes != null;
 
                     GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
                         futId,
-                        fut.futureId(),
+                        fut != null ? fut.futureId() : null,
+                        nearFutId,
+                        nearMiniId,
                         tx.topologyVersion(),
                         tx,
                         timeout,
@@ -1273,8 +1293,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         nearWrites,
                         txNodes,
                         tx.nearXidVersion(),
-                        false,
-                        true,
+                        tx.dhtReplyNear(),
+                        /*last*/true,
                         tx.onePhaseCommit(),
                         tx.subjectId(),
                         tx.taskNameHash(),
@@ -1344,7 +1364,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         }
                     }
                     catch (ClusterTopologyCheckedException ignored) {
-                        fut.onNodeLeft();
+                        if (fut != null)
+                            fut.onNodeLeft();
                     }
                     catch (IgniteCheckedException e) {
                         if (!cctx.kernalContext().isStopping()) {
@@ -1354,7 +1375,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                                     ", node=" + n.id() + ']');
                             }
 
-                            fut.onResult(e);
+                            // TODO IGNITE-4768: reply to the near node with an error.
+                            if (fut != null)
+                                fut.onResult(e);
                         }
                         else {
                             if (msgLog.isDebugEnabled()) {
@@ -1368,6 +1391,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 }
 
                 for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+                    assert !tx.dhtReplyNear();
+
                     if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
                         if (tx.remainingTime() == -1)
                             return;
@@ -1379,6 +1404,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
                             futId,
                             fut.futureId(),
+                            nearFutId,
+                            nearMiniId,
                             tx.topologyVersion(),
                             tx,
                             timeout,
@@ -1459,49 +1486,73 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /**
+     * @param entry Entry.
+     */
+    private void onPrepare(IgniteTxEntry entry) {
+        if (entry.op() == READ || entry.op() == NOOP) {
+            GridCacheContext cacheCtx = entry.context();
+
+            ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(entry);
+
+            if (expiry != null) {
+                entry.op(NOOP);
+
+                entry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
+            }
+        }
+    }
+
+    /**
      * @param entry Transaction entry.
      */
-    private void map(IgniteTxEntry entry) {
+    private void map(IgniteTxEntry entry, @Nullable Collection<UUID> backupNodes) {
         if (entry.cached().isLocal())
             return;
 
-        GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
+        onPrepare(entry);
 
         GridCacheContext cacheCtx = entry.context();
 
         GridDhtCacheAdapter<?, ?> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
 
-        ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(entry);
-
-        if (expiry != null && (entry.op() == READ || entry.op() == NOOP)) {
-            entry.op(NOOP);
-
-            entry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
-        }
+        GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
 
         while (true) {
             try {
                 List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
 
+                assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
+
                 if (log.isDebugEnabled())
                     log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
                         ", entry=" + entry + ']');
 
-                // Exclude local node.
-                map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap);
+                for (int i = 1; i < dhtNodes.size(); i++) {
+                    ClusterNode node = dhtNodes.get(i);
+
+                    if (backupNodes != null  && !backupNodes.contains(node.id()))
+                        continue;
+
+                    addMapping(entry, node, dhtMap);
+                }
 
                 Collection<UUID> readers = cached.readers();
 
                 if (!F.isEmpty(readers)) {
-                    Collection<ClusterNode> nearNodes =
-                        cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId())));
+                    for (UUID readerId : readers) {
+                        if (readerId.equals(tx.nearNodeId()))
+                            continue;
 
-                    if (log.isDebugEnabled())
-                        log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) +
-                            ", entry=" + entry + ']');
+                        ClusterNode readerNode = cctx.discovery().node(readerId);
+
+                        if (readerNode == null || dhtNodes.contains(readerNode))
+                            continue;
 
-                    // Exclude DHT nodes.
-                    map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap);
+                        if (log.isDebugEnabled())
+                            log.debug("Mapping entry to near node [node=" + readerNode + ", entry=" + entry + ']');
+
+                        addMapping(entry, readerNode, nearMap);
+                    }
                 }
                 else if (log.isDebugEnabled())
                     log.debug("Entry has no near readers: " + entry);
@@ -1518,39 +1569,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
     /**
      * @param entry Entry.
-     * @param nodes Nodes.
+     * @param n Node.
      * @param globalMap Map.
      */
-    private void map(
+    private void addMapping(
         IgniteTxEntry entry,
-        Iterable<ClusterNode> nodes,
+        ClusterNode n,
         Map<UUID, GridDistributedTxMapping> globalMap
     ) {
-        if (nodes != null) {
-            for (ClusterNode n : nodes) {
-                GridDistributedTxMapping global = globalMap.get(n.id());
+        GridDistributedTxMapping global = globalMap.get(n.id());
 
-                if (!F.isEmpty(entry.entryProcessors())) {
-                    GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
-                        entry.cached().partition());
+        if (!F.isEmpty(entry.entryProcessors())) {
+            GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+                entry.cached().partition());
 
-                    if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
-                        T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
+            if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+                T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
 
-                        assert procVal != null : entry;
+                assert procVal != null : entry;
 
-                        entry.op(procVal.get1());
-                        entry.value(procVal.get2(), true, false);
-                        entry.entryProcessors(null);
-                    }
-                }
-
-                if (global == null)
-                    globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
-
-                global.add(entry);
+                entry.op(procVal.get1());
+                entry.value(procVal.get2(), true, false);
+                entry.entryProcessors(null);
             }
         }
+
+        if (global == null)
+            globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
+
+        global.add(entry);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index e55d189..fe2d293 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -61,6 +61,12 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /** Mini future ID. */
     private IgniteUuid miniId;
 
+    /** Near node future ID. */
+    private IgniteUuid nearFutId;
+
+    /** Near node mini future ID. */
+    private int nearMiniId;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -107,6 +113,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /**
      * @param futId Future ID.
      * @param miniId Mini future ID.
+     * @param nearFutId Near node future ID.
+     * @param nearMiniId Near node mini future ID.
      * @param topVer Topology version.
      * @param tx Transaction.
      * @param timeout Transaction timeout.
@@ -121,6 +129,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     public GridDhtTxPrepareRequest(
         IgniteUuid futId,
         IgniteUuid miniId,
+        IgniteUuid nearFutId,
+        int nearMiniId,
         AffinityTopologyVersion topVer,
         GridDhtTxLocalAdapter tx,
         long timeout,
@@ -128,7 +138,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         Collection<IgniteTxEntry> nearWrites,
         Map<UUID, Collection<UUID>> txNodes,
         GridCacheVersion nearXidVer,
-        boolean mappingKnown,
+        boolean dhtNearReply,
         boolean last,
         boolean onePhaseCommit,
         UUID subjId,
@@ -140,14 +150,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
             null,
             dhtWrites,
             txNodes,
-            mappingKnown,
+            dhtNearReply,
             retVal,
             last,
             onePhaseCommit,
             addDepInfo);
 
-        assert futId != null;
-        assert miniId != null;
+        assert dhtNearReply || (futId != null && miniId != null);
+        assert !dhtNearReply || (nearFutId != null && nearMiniId != 0);
 
         this.topVer = topVer;
         this.futId = futId;
@@ -156,6 +166,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         this.nearXidVer = nearXidVer;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+        this.nearFutId = nearFutId;
+        this.nearMiniId = nearMiniId;
 
         needReturnValue(retVal);
 
@@ -252,6 +264,21 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     }
 
     /**
+     * @return Near future ID.
+     */
+    public IgniteUuid nearFutureId() {
+        return nearFutId;
+    }
+
+    /**
+     * @return Near mini future ID.
+     */
+    public int nearMiniId() {
+        return nearMiniId;
+    }
+
+
+    /**
      * @return Topology version.
      */
     @Override public AffinityTopologyVersion topologyVersion() {
@@ -369,54 +396,66 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeUuid("nearNodeId", nearNodeId))
+                if (!writer.writeIgniteUuid("nearFutId", nearFutId))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
+                if (!writer.writeInt("nearMiniId", nearMiniId))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeMessage("nearXidVer", nearXidVer))
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearXidVer", nearXidVer))
                     return false;
 
                 writer.incrementState();
 
             case 28:
-                if (!writer.writeBitSet("preloadKeys", preloadKeys))
+                if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 29:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 30:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeBitSet("preloadKeys", preloadKeys))
                     return false;
 
                 writer.incrementState();
 
             case 31:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 32:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 33:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -463,7 +502,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 23:
-                nearNodeId = reader.readUuid("nearNodeId");
+                nearFutId = reader.readIgniteUuid("nearFutId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -471,7 +510,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 24:
-                nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
+                nearMiniId = reader.readInt("nearMiniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -479,7 +518,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 25:
-                nearXidVer = reader.readMessage("nearXidVer");
+                nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -487,7 +526,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 26:
-                ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
+                nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -495,7 +534,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 27:
-                ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
+                nearXidVer = reader.readMessage("nearXidVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -503,7 +542,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 28:
-                preloadKeys = reader.readBitSet("preloadKeys");
+                ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -511,7 +550,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 29:
-                subjId = reader.readUuid("subjId");
+                ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -519,7 +558,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 30:
-                taskNameHash = reader.readInt("taskNameHash");
+                preloadKeys = reader.readBitSet("preloadKeys");
 
                 if (!reader.isLastRead())
                     return false;
@@ -527,6 +566,22 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 31:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 32:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 33:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -546,6 +601,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 32;
+        return 34;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index f3c4963..97725bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -92,6 +92,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
      */
     public GridDhtTxRemote(
         GridCacheSharedContext ctx,
+        boolean dhtReplyNear,
         UUID nearNodeId,
         IgniteUuid rmtFutId,
         UUID nodeId,
@@ -143,6 +144,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         assert topVer != null && topVer.topologyVersion() > 0 : topVer;
 
         topologyVersion(topVer);
+
+        dhtReplyNear(dhtReplyNear);
     }
 
     /**
@@ -165,6 +168,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
      */
     public GridDhtTxRemote(
         GridCacheSharedContext ctx,
+        boolean dhtReplyNear,
         UUID nearNodeId,
         IgniteUuid rmtFutId,
         UUID nodeId,
@@ -212,6 +216,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         assert topVer != null && topVer.topologyVersion() > 0 : topVer;
 
         topologyVersion(topVer);
+
+        dhtReplyNear(dhtReplyNear);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index bb1609d..fcd714b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -201,6 +202,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     }
 
     /** {@inheritDoc} */
+    @Override public void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+        assert false; // TODO IGNITE-4768.
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
         if (isDone())
             return false;
@@ -878,7 +884,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                             onDone(res);
                     }
                     else {
-                        parent.onPrepareResponse(m, res);
+                        parent.onPrimaryPrepareResponse(m, res);
 
                         // Finish this mini future (need result only on client node).
                         onDone(parent.cctx.kernalContext().clientNode() ? res : null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 8bb79c1..9a7f500 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -58,7 +59,6 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
-import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.Nullable;
@@ -202,6 +202,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+        assert false; // TODO IGNITE-4768.
+    }
+
     /**
      * @return Keys for which MiniFuture isn't completed.
      */
@@ -913,7 +918,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                             remap();
                     }
                     else {
-                        parent.onPrepareResponse(m, res);
+                        parent.onPrimaryPrepareResponse(m, res);
 
                         // Proceed prepare before finishing mini future.
                         if (mappings != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index fb2c2fd..ed3f2f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -29,11 +31,13 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -72,8 +76,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
-        boolean found = false;
-
         for (IgniteInternalFuture<?> fut : futures()) {
             MiniFuture f = (MiniFuture)fut;
 
@@ -83,13 +85,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
                 e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
 
-                f.onNodeLeft(e);
-
-                found = true;
+                f.onPrimaryLeft(e);
             }
+            else
+                f.checkDhtFailed(nodeId);
         }
 
-        return found;
+        return false;
     }
 
     /** {@inheritDoc} */
@@ -102,7 +104,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             if (f != null) {
                 assert f.primary().id().equals(nodeId);
 
-                f.onResult(res);
+                f.onPrimaryResult(res);
             }
             else {
                 if (msgLog.isDebugEnabled()) {
@@ -123,6 +125,22 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+        MiniFuture f = miniFuture(res.miniId());
+
+        if (f != null)
+            f.onDhtResponse(nodeId, res);
+        else {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Near pessimistic prepare, failed to find mini future [txId=" + tx.nearXidVersion() +
+                    ", node=" + nodeId +
+                    ", res=" + res +
+                    ", fut=" + this + ']');
+            }
+        }
+    }
+
     /**
      * Finds pending mini future by the given mini ID.
      *
@@ -184,7 +202,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
      *
      */
     private void preparePessimistic() {
-        boolean mappingKnown = true;
+        // TODO IGNITE-4768: need to detect on lock step?
+
+        boolean dhtReplyNear = true;
 
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
 
@@ -202,10 +222,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             if (!cacheCtx.isLocal()) {
                 GridDhtPartitionTopology top = cacheCtx.topology();
 
-                if (mappingKnown && (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer)))
-                    mappingKnown = false;
+                if (dhtReplyNear && (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer)))
+                    dhtReplyNear = false;
 
                 nodes = cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
+
+                if (nodes.size() == 1)
+                    dhtReplyNear = false;
             }
             else
                 nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer);
@@ -237,8 +260,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         checkOnePhase(txMapping);
 
-        if (mappingKnown && tx.onePhaseCommit())
-            mappingKnown = false;
+        // TODO IGNITE-4768.
+        if (dhtReplyNear && tx.onePhaseCommit())
+            dhtReplyNear = false;
+
+        tx.dhtReplyNear(dhtReplyNear);
 
         long timeout = tx.remainingTime();
 
@@ -262,7 +288,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 m.writes(),
                 m.near(),
                 txMapping.transactionNodes(),
-                mappingKnown,
+                dhtReplyNear,
                 true,
                 tx.onePhaseCommit(),
                 tx.needReturnValue() && tx.implicit(),
@@ -278,7 +304,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                     req.addDhtVersion(txEntry.txKey(), null);
             }
 
-            final MiniFuture fut = new MiniFuture(m, ++miniId);
+            final MiniFuture fut = new MiniFuture(m, ++miniId, req);
 
             req.miniId(fut.futureId());
 
@@ -292,7 +318,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
                     @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
                         try {
-                            fut.onResult(prepFut.get());
+                            fut.onPrimaryResult(prepFut.get());
                         }
                         catch (IgniteCheckedException e) {
                             fut.onError(e);
@@ -301,33 +327,48 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 });
             }
             else {
-                try {
-                    cctx.io().send(primary, req, tx.ioPolicy());
-
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Near pessimistic prepare, sent request [txId=" + tx.nearXidVersion() +
-                            ", node=" + primary.id() + ']');
-                    }
-                }
-                catch (ClusterTopologyCheckedException e) {
-                    e.retryReadyFuture(cctx.nextAffinityReadyFuture(topVer));
+                if (!sendPrimaryRequest(primary, fut, req))
+                    break;
+            }
+        }
 
-                    fut.onNodeLeft(e);
-                }
-                catch (IgniteCheckedException e) {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Near pessimistic prepare, failed send request [txId=" + tx.nearXidVersion() +
-                            ", node=" + primary.id() + ", err=" + e + ']');
-                    }
+        markInitialized();
+    }
 
-                    fut.onError(e);
+    /**
+     * @param primary Primary node.
+     * @param fut Future.
+     * @param req Request.
+     * @return {@code False} if failed to send request.
+     */
+    private boolean sendPrimaryRequest(ClusterNode primary,  MiniFuture fut, GridCacheMessage req) {
+        try {
+            cctx.io().send(primary, req, tx.ioPolicy());
 
-                    break;
-                }
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Near pessimistic prepare, sent request [txId=" + tx.nearXidVersion() +
+                    ", node=" + primary.id() + ']');
             }
+
+            return true;
         }
+        catch (ClusterTopologyCheckedException e) {
+            e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
 
-        markInitialized();
+            fut.onPrimaryLeft(e);
+
+            return false;
+        }
+        catch (IgniteCheckedException e) {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Near pessimistic prepare, failed send request [txId=" + tx.nearXidVersion() +
+                    ", node=" + primary.id() + ", err=" + e + ']');
+            }
+
+            fut.onError(e);
+
+            return false;
+        }
     }
 
     /** {@inheritDoc} */
@@ -372,7 +413,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     /**
      *
      */
-    private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+    private class MiniFuture extends GridFutureAdapter {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -382,13 +423,28 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         /** */
         private GridDistributedTxMapping m;
 
+        /** */
+        private final Set<UUID> dhtNodes;
+
+        /** */
+        private boolean primaryProcessed;
+
         /**
          * @param m Mapping.
          * @param futId Mini future ID.
+         * @param req Request.
          */
-        MiniFuture(GridDistributedTxMapping m, int futId) {
+        MiniFuture(GridDistributedTxMapping m, int futId, GridNearTxPrepareRequest req) {
             this.m = m;
             this.futId = futId;
+
+            if (req.dhtReplyNear()) {
+                dhtNodes = new HashSet<>(req.transactionNodes().get(m.primary().id()));
+
+                assert !F.isEmpty(dhtNodes) : dhtNodes;
+            }
+            else
+                dhtNodes = null;
         }
 
         /**
@@ -408,20 +464,71 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         /**
          * @param res Response.
          */
-        void onResult(GridNearTxPrepareResponse res) {
+        void onPrimaryResult(GridNearTxPrepareResponse res) {
             if (res.error() != null)
                 onError(res.error());
             else {
-                onPrepareResponse(m, res);
+                assert dhtNodes == null;
+
+                onPrimaryPrepareResponse(m, res);
 
-                onDone(res);
+                onDone();
             }
         }
 
         /**
+         * @param failedNodeId Failed node ID.
+         */
+        void checkDhtFailed(UUID failedNodeId) {
+            if (dhtNodes == null)
+                return;
+
+            boolean done = false;
+            GridNearTxPrimaryPrepareCheckRequest checkReq = null;
+
+            synchronized (dhtNodes) {
+                if (dhtNodes.remove(failedNodeId) && dhtNodes.isEmpty()) {
+                    if (primaryProcessed)
+                        done = true;
+                    else
+                        checkReq = new GridNearTxPrimaryPrepareCheckRequest();
+                }
+            }
+
+            if (checkReq != null) {
+                if (cctx.localNodeId().equals(primary().id())) {
+                    // TODO IGNITE-4768.
+                }
+                else
+                    sendPrimaryRequest(primary(), this, checkReq);
+            }
+            else if (done)
+                onDone();
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @param res Response.
+         */
+        void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+            assert dhtNodes != null;
+
+            boolean done;
+
+            synchronized (dhtNodes) {
+                primaryProcessed = true;
+
+                done = dhtNodes.remove(nodeId) && dhtNodes.isEmpty();
+            }
+
+            if (done)
+                onDone();
+        }
+
+        /**
          * @param e Error.
          */
-        void onNodeLeft(ClusterTopologyCheckedException e) {
+        void onPrimaryLeft(ClusterTopologyCheckedException e) {
             if (msgLog.isDebugEnabled()) {
                 msgLog.debug("Near pessimistic prepare, mini future node left [txId=" + tx.nearXidVersion() +
                     ", nodeId=" + m.primary().id() + ']');
@@ -431,7 +538,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 tx.markForBackupCheck();
 
                 // Do not fail future for one-phase transaction right away.
-                onDone((GridNearTxPrepareResponse)null);
+                onDone();
             }
 
             onError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index a0f28c5..7f94e9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -155,6 +156,12 @@ public abstract class GridNearTxPrepareFutureAdapter extends
     public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res);
 
     /**
+     * @param nodeId Sender node ID.
+     * @param res Prepare response of type {@link GridDhtTxNearPrepareResponse} received from the sender.
+     */
+    public abstract void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res);
+
+    /**
      * Checks if mapped transaction can be committed on one phase.
      * One-phase commit can be done if transaction maps to one primary node and not more than one backup.
      *
@@ -182,8 +189,16 @@ public abstract class GridNearTxPrepareFutureAdapter extends
      * @param m Mapping.
      * @param res Response.
      */
+    final void onDhtPrepareResponse(GridDistributedTxMapping m, GridDhtTxNearPrepareResponse res) {
+        // NOTE(review): empty body - both the mapping and the response are ignored here; confirm this is a stub.
+    }
+
+    /**
+     * @param m Mapping.
+     * @param res Response.
+     */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+    final void onPrimaryPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
         if (res == null)
             return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 3eff9e5..eb124bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -107,7 +107,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         Collection<IgniteTxEntry> writes,
         boolean near,
         Map<UUID, Collection<UUID>> txNodes,
-        boolean mappingKnown,
+        boolean dhtReplyNear,
         boolean last,
         boolean onePhaseCommit,
         boolean retVal,
@@ -123,7 +123,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
             reads,
             writes,
             txNodes,
-            mappingKnown,
+            dhtReplyNear,
             retVal,
             last,
             onePhaseCommit,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 9dad722..08b071d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -517,5 +517,4 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     @Override public String toString() {
         return S.toString(GridNearTxPrepareResponse.class, this, "super", super.toString());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrimaryPrepareCheckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrimaryPrepareCheckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrimaryPrepareCheckRequest.java
new file mode 100644
index 0000000..9c366fd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrimaryPrepareCheckRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Near tx primary prepare check request (purpose inferred from name, confirm); own fields are not marshalled.
+ */
+public class GridNearTxPrimaryPrepareCheckRequest extends GridCacheMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Partition ID returned by {@link #partition()}; never assigned in this class. */
+    private int partId;
+
+    /** Future ID. NOTE(review): never read or assigned in this class. */
+    private IgniteUuid futId;
+
+    /** Mini future ID. NOTE(review): never read or assigned in this class. */
+    private int miniId;
+
+    /**
+     * Default constructor; leaves all fields at their default values.
+     */
+    public GridNearTxPrimaryPrepareCheckRequest() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 0; // NOTE(review): 0 looks like a placeholder type ID - confirm against GridIoMessageFactory.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 0; // NOTE(review): reports no fields although three are declared above - confirm.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        return super.writeTo(buf, writer); // Only superclass state is written; partId, futId, miniId are skipped.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        return super.readFrom(buf, reader); // Only superclass state is read; partId, futId, miniId are skipped.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearTxPrimaryPrepareCheckRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index dd900fe..be26159 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -683,6 +683,11 @@ public interface IgniteInternalTx extends AutoCloseable {
     public boolean onePhaseCommit();
 
     /**
+     * @return {@code True} if transaction works in mode when DHT nodes reply directly to near node.
+     */
+    public boolean dhtReplyNear();
+
+    /**
      * @return {@code True} if transaction has transform entries. This flag will be only set for local
      *      transactions.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b07a117..9ded2a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -193,6 +193,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     protected boolean onePhaseCommit;
 
     /** */
+    protected boolean dhtReplyNear;
+
+    /** */
     protected CacheWriteSynchronizationMode syncMode;
 
     /** If this transaction contains transform entries. */
@@ -665,7 +668,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
 
     /** {@inheritDoc} */
     @Override public Map<Integer, Set<Integer>> invalidPartitions() {
-        return invalidParts == null ? Collections.<Integer, Set<Integer>>emptyMap() : invalidParts;
+        return invalidParts;
     }
 
     /** {@inheritDoc} */
@@ -979,6 +982,18 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         return onePhaseCommit;
     }
 
+    /**
+     * @param dhtReplyNear {@code True} if transaction works in mode when DHT nodes reply directly to near node.
+     */
+    public void dhtReplyNear(boolean dhtReplyNear) {
+        this.dhtReplyNear = dhtReplyNear;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean dhtReplyNear() {
+        return dhtReplyNear;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean optimistic() {
         return concurrency == OPTIMISTIC;
@@ -2393,6 +2408,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
+        @Override public boolean dhtReplyNear() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean hasTransforms() {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b440918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index f3f67a2..07777e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
@@ -151,6 +152,12 @@ public class IgniteTxHandler {
             }
         });
 
+        ctx.io().addHandler(0, GridDhtTxNearPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
+                processDhtTxNearPrepareResponse(nodeId, (GridDhtTxNearPrepareResponse)msg);
+            }
+        });
+
         ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg);
@@ -467,6 +474,8 @@ public class IgniteTxHandler {
 
             tx.transactionNodes(req.transactionNodes());
 
+            tx.dhtReplyNear(req.dhtReplyNear());
+
             if (req.near())
                 tx.nearOnOriginatingNode(true);
 
@@ -556,6 +565,34 @@ public class IgniteTxHandler {
      * @param nodeId Node ID.
      * @param res Response.
      */
+    private void processDhtTxNearPrepareResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+        if (txPrepareMsgLog.isDebugEnabled())
+            txPrepareMsgLog.debug("Received dht near prepare response [txId=" + res.nearTxId() + ", node=" + nodeId + ']');
+
+        GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc()
+            .<IgniteInternalTx>mvccFuture(res.nearTxId(), res.futureId()); // Lookup by near tx ID and future ID.
+
+        if (fut == null) { // No matching future found: warn and drop the response.
+            U.warn(log, "Failed to find future for dht near prepare response [txId=" + res.nearTxId() +
+                ", node=" + nodeId +
+                ", res=" + res + ']');
+
+            return;
+        }
+
+        IgniteInternalTx tx = fut.tx();
+
+        assert tx != null;
+
+        res.txState(tx.txState()); // Pass the tx state of the future's transaction to the response.
+
+        fut.onDhtResponse(nodeId, res); // Further processing is delegated to the prepare future.
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
     private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) {
         if (txPrepareMsgLog.isDebugEnabled())
             txPrepareMsgLog.debug("Received near prepare response [txId=" + res.version() + ", node=" + nodeId + ']');
@@ -937,7 +974,7 @@ public class IgniteTxHandler {
      * @param nodeId Sender node ID.
      * @param req Request.
      */
-    protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
+    private void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
         if (txPrepareMsgLog.isDebugEnabled()) {
             txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() +
                 ", dhtTxId=" + req.version() +
@@ -952,17 +989,30 @@ public class IgniteTxHandler {
         GridDhtTxRemote dhtTx = null;
         GridNearTxRemote nearTx = null;
 
-        GridDhtTxPrepareResponse res;
+        GridDhtTxPrepareResponse res = null;
+        GridDhtTxNearPrepareResponse nearRes = null;
 
         try {
-            res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), req.deployInfo() != null);
+            if (req.dhtReplyNear()) {
+                nearRes = new GridDhtTxNearPrepareResponse(
+                    req.partition(),
+                    req.nearXidVersion(),
+                    req.nearFutureId(),
+                    req.nearMiniId());
+            }
+            else {
+                res = new GridDhtTxPrepareResponse(req.version(),
+                    req.futureId(),
+                    req.miniId(),
+                    req.deployInfo() != null);
+            }
 
             // Start near transaction first.
             nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
             dhtTx = startRemoteTx(nodeId, req, res);
 
             // Set evicted keys from near transaction.
-            if (nearTx != null)
+            if (nearTx != null) // TODO IGNITE-4768 support near cache.
                 res.nearEvicted(nearTx.evicted());
 
             if (dhtTx != null)
@@ -970,8 +1020,12 @@ public class IgniteTxHandler {
             else if (nearTx != null)
                 req.txState(nearTx.txState());
 
-            if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
-                res.invalidPartitionsByCacheId(dhtTx.invalidPartitions());
+            if (dhtTx != null) {
+                if (res != null && dhtTx.invalidPartitions() != null)
+                    res.invalidPartitionsByCacheId(dhtTx.invalidPartitions());
+                else
+                    assert F.isEmpty(dhtTx.invalidPartitions()) : dhtTx.invalidPartitions();
+            }
 
             if (req.onePhaseCommit()) {
                 assert req.last();
@@ -1040,15 +1094,20 @@ public class IgniteTxHandler {
 
                 completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
                     @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
-                        sendReply(nodeId, req, res0, dhtTx0, nearTx0);
+                        sendPrepareReply(nodeId, req, res0, dhtTx0, nearTx0);
                     }
                 });
             }
             else
-                sendReply(nodeId, req, res, dhtTx, nearTx);
+                sendPrepareReply(nodeId, req, res, dhtTx, nearTx);
+        }
+        else {
+            if (res != null)
+                sendPrepareReply(nodeId, req, res, dhtTx, nearTx);
+
+            if (nearRes != null)
+                sendPrepareReply(req.nearNodeId(), req, nearRes, dhtTx, nearTx);
         }
-        else
-            sendReply(nodeId, req, res, dhtTx, nearTx);
 
         assert req.txState() != null || res.error() != null ||
             (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
@@ -1058,7 +1117,7 @@ public class IgniteTxHandler {
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
+    private void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
         final GridDhtTxOnePhaseCommitAckRequest req) {
         assert nodeId != null;
         assert req != null;
@@ -1075,7 +1134,7 @@ public class IgniteTxHandler {
      * @param req Request.
      */
     @SuppressWarnings({"unchecked"})
-    protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
+    private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
         assert nodeId != null;
         assert req != null;
 
@@ -1271,9 +1330,9 @@ public class IgniteTxHandler {
      * @param dhtTx Dht tx.
      * @param nearTx Near tx.
      */
-    protected void sendReply(UUID nodeId,
+    private void sendPrepareReply(UUID nodeId,
         GridDhtTxPrepareRequest req,
-        GridDhtTxPrepareResponse res,
+        GridCacheMessage res,
         GridDhtTxRemote dhtTx,
         GridNearTxRemote nearTx) {
         try {
@@ -1318,7 +1377,7 @@ public class IgniteTxHandler {
      * @param committed {@code True} if transaction committed on this node.
      * @param nearTxId Near tx version.
      */
-    protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
+    private void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
         if (req.replyRequired() || req.checkCommitted()) {
             GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
 
@@ -1399,10 +1458,10 @@ public class IgniteTxHandler {
      * @return Remote transaction.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable GridDhtTxRemote startRemoteTx(
+    @Nullable private GridDhtTxRemote startRemoteTx(
         UUID nodeId,
         GridDhtTxPrepareRequest req,
-        GridDhtTxPrepareResponse res
+        @Nullable GridDhtTxPrepareResponse res
     ) throws IgniteCheckedException {
         if (!F.isEmpty(req.writes())) {
             GridDhtTxRemote tx = ctx.tm().tx(req.version());
@@ -1412,6 +1471,7 @@ public class IgniteTxHandler {
 
                 tx = new GridDhtTxRemote(
                     ctx,
+                    req.dhtReplyNear(),
                     req.nearNodeId(),
                     req.futureId(),
                     nodeId,
@@ -1469,7 +1529,7 @@ public class IgniteTxHandler {
                         invalidateNearEntry(cacheCtx, entry.key(), req.version());
 
                     try {
-                        if (req.needPreloadKey(idx)) {
+                        if (res != null && req.needPreloadKey(idx)) {
                             GridCacheEntryEx cached = entry.cached();
 
                             if (cached == null)
@@ -1547,8 +1607,6 @@ public class IgniteTxHandler {
                 tx.state(PREPARED);
             }
 
-            res.invalidPartitionsByCacheId(tx.invalidPartitions());
-
             if (tx.empty() && req.last()) {
                 tx.rollback();
 
@@ -1586,7 +1644,7 @@ public class IgniteTxHandler {
      * @return Remote transaction.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public GridNearTxRemote startNearRemoteTx(ClassLoader ldr, UUID nodeId,
+    @Nullable private GridNearTxRemote startNearRemoteTx(ClassLoader ldr, UUID nodeId,
         GridDhtTxPrepareRequest req) throws IgniteCheckedException {
 
         if (!F.isEmpty(req.nearWrites())) {
@@ -1644,7 +1702,7 @@ public class IgniteTxHandler {
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected void processCheckPreparedTxRequest(final UUID nodeId,
+    private void processCheckPreparedTxRequest(final UUID nodeId,
         final GridCacheTxRecoveryRequest req) {
         if (txRecoveryMsgLog.isDebugEnabled()) {
             txRecoveryMsgLog.debug("Received tx recovery request [txId=" + req.nearXidVersion() +
@@ -1729,7 +1787,7 @@ public class IgniteTxHandler {
      * @param nodeId Node ID.
      * @param res Response.
      */
-    protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheTxRecoveryResponse res) {
+    private void processCheckPreparedTxResponse(UUID nodeId, GridCacheTxRecoveryResponse res) {
         if (txRecoveryMsgLog.isDebugEnabled()) {
             txRecoveryMsgLog.debug("Received tx recovery response [txId=" + res.version() +
                 ", node=" + nodeId +


Mime
View raw message