ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: disco cache cleanup
Date Thu, 31 Aug 2017 12:11:54 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3478-1 b7b9089f0 -> 01b2ce155


disco cache cleanup


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01b2ce15
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01b2ce15
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01b2ce15

Branch: refs/heads/ignite-3478-1
Commit: 01b2ce1550697ef90d3ba44c1d95dd774ce9eaa9
Parents: b7b9089
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Aug 31 15:11:46 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Aug 31 15:11:46 2017 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/GridDhtTxLocal.java   |  48 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java | 440 +++++++++----------
 .../cache/distributed/near/GridNearTxLocal.java |  20 +-
 .../cache/transactions/IgniteTxHandler.java     |  15 +-
 4 files changed, 242 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/01b2ce15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 5b8a7b5..ab5631e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -34,10 +34,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -313,24 +313,10 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     /**
      * Prepares next batch of entries in dht transaction.
      *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param verMap Version map.
-     * @param msgId Message ID.
-     * @param nearMiniId Near mini future ID.
-     * @param txNodes Transaction nodes mapping.
-     * @param last {@code True} if this is last prepare request.
+     * @param req Prepare request.
      * @return Future that will be completed when locks are acquired.
      */
-    public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
-        @Nullable Collection<IgniteTxEntry> reads,
-        @Nullable Collection<IgniteTxEntry> writes,
-        Map<IgniteTxKey, GridCacheVersion> verMap,
-        long msgId,
-        int nearMiniId,
-        Map<UUID, Collection<UUID>> txNodes,
-        boolean last
-    ) {
+    public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(GridNearTxPrepareRequest req) {
         // In optimistic mode prepare still can be called explicitly from salvageTx.
         GridDhtTxPrepareFuture fut = prepFut;
 
@@ -344,14 +330,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 cctx,
                 this,
                 timeout,
-                nearMiniId,
-                verMap,
-                last,
+                req.miniId(),
+                req.dhtVersions(),
+                req.last(),
                 needReturnValue()))) {
                 GridDhtTxPrepareFuture f = prepFut;
 
-                assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
-                    "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
+                assert f.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " +
+                    "[futMiniId=" + f.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + f + ']';
 
                 if (timeout == -1)
                     f.onError(timeoutException());
@@ -360,8 +346,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             }
         }
         else {
-            assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
-                "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';
+            assert fut.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " +
+                "[futMiniId=" + fut.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + fut + ']';
 
             // Prepare was called explicitly.
             return chainOnePhasePrepare(fut);
@@ -389,14 +375,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         }
 
         try {
-            if (reads != null) {
-                for (IgniteTxEntry e : reads)
-                    addEntry(msgId, e);
+            if (req.reads() != null) {
+                for (IgniteTxEntry e : req.reads())
+                    addEntry(req.messageId(), e);
             }
 
-            if (writes != null) {
-                for (IgniteTxEntry e : writes)
-                    addEntry(msgId, e);
+            if (req.writes() != null) {
+                for (IgniteTxEntry e : req.writes())
+                    addEntry(req.messageId(), e);
             }
 
             userPrepare(null);
@@ -407,7 +393,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             if (isSystemInvalidate())
                 fut.complete();
             else
-                fut.prepare(reads, writes, txNodes);
+                fut.prepare(req);
         }
         catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) {
             fut.onError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/01b2ce15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 03d99fc..d886343 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -168,14 +169,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     @SuppressWarnings("UnusedDeclaration")
     private volatile int mapped;
 
-    /** Prepare reads. */
-    private Iterable<IgniteTxEntry> reads;
-
-    /** Prepare writes. */
-    private Iterable<IgniteTxEntry> writes;
-
-    /** Tx nodes. */
-    private Map<UUID, Collection<UUID>> txNodes;
+    /** Prepare request. */
+    private GridNearTxPrepareRequest req;
 
     /** Trackable flag. */
     private boolean trackable = true;
@@ -341,7 +336,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     private void onEntriesLocked() {
         ret = new GridCacheReturn(null, tx.localResult(), true, null, true);
 
-        for (IgniteTxEntry writeEntry : writes) {
+        for (IgniteTxEntry writeEntry : req.writes()) {
             IgniteTxEntry txEntry = tx.entry(writeEntry.txKey());
 
             assert txEntry != null : writeEntry;
@@ -597,10 +592,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
         if (log.isDebugEnabled())
             log.debug("Marking all local candidates as ready: " + this);
 
-        readyLocks(writes);
+        readyLocks(req.writes());
 
         if (tx.serializable() && tx.optimistic())
-            readyLocks(reads);
+            readyLocks(req.reads());
 
         locksReady = true;
     }
@@ -896,8 +891,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
      */
     private void addDhtValues(GridNearTxPrepareResponse res) {
         // Interceptor on near node needs old values to execute callbacks.
-        if (!F.isEmpty(writes)) {
-            for (IgniteTxEntry e : writes) {
+        if (!F.isEmpty(req.writes())) {
+            for (IgniteTxEntry e : req.writes()) {
                 IgniteTxEntry txEntry = tx.entry(e.txKey());
 
                 assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
@@ -1002,33 +997,30 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     /**
      * Initializes future.
      *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param txNodes Transaction nodes mapping.
+     * @param req Prepare request.
      */
     @SuppressWarnings("TypeMayBeWeakened")
-    public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
-        Map<UUID, Collection<UUID>> txNodes) {
+    public void prepare(GridNearTxPrepareRequest req) {
+        assert req != null;
+
         if (tx.empty()) {
             tx.setRollbackOnly();
 
             onDone((GridNearTxPrepareResponse)null);
         }
 
-        this.reads = reads;
-        this.writes = writes;
-        this.txNodes = txNodes;
+        this.req = req;
 
         boolean ser = tx.serializable() && tx.optimistic();
 
-        if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) {
+        if (!F.isEmpty(req.writes()) || (ser && !F.isEmpty(req.reads()))) {
             Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
 
-            for (IgniteTxEntry entry : writes)
+            for (IgniteTxEntry entry : req.writes())
                 forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
 
             if (ser) {
-                for (IgniteTxEntry entry : reads)
+                for (IgniteTxEntry entry : req.reads())
                     forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
             }
 
@@ -1196,10 +1188,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 IgniteCheckedException err0;
 
                 try {
-                    err0 = checkReadConflict(writes);
+                    err0 = checkReadConflict(req.writes());
 
                     if (err0 == null)
-                        err0 = checkReadConflict(reads);
+                        err0 = checkReadConflict(req.reads());
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to check entry version: " + e, e);
@@ -1231,258 +1223,262 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
             // We are holding transaction-level locks for entries here, so we can get next write version.
             tx.writeVersion(cctx.versions().next(tx.topologyVersion()));
 
-            {
-                // Assign keys to primary nodes.
-                if (!F.isEmpty(writes)) {
-                    for (IgniteTxEntry write : writes)
-                        map(tx.entry(write.txKey()));
-                }
+            // Assign keys to primary nodes.
+            if (!F.isEmpty(req.writes())) {
+                for (IgniteTxEntry write : req.writes())
+                    map(tx.entry(write.txKey()));
+            }
 
-                if (!F.isEmpty(reads)) {
-                    for (IgniteTxEntry read : reads)
-                        map(tx.entry(read.txKey()));
-                }
+            if (!F.isEmpty(req.reads())) {
+                for (IgniteTxEntry read : req.reads())
+                    map(tx.entry(read.txKey()));
             }
 
             if (isDone())
                 return;
 
-            if (last) {
-                if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
-                    for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
-                        if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
-                            tx.onePhaseCommit(false);
+            if (last)
+                sendPrepareRequests();
+        }
+        finally {
+            markInitialized();
+        }
+    }
 
-                            break;
-                        }
-                    }
-                }
+    /**
+     *
+     */
+    private void sendPrepareRequests() {
+        if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
+            for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+                if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+                    tx.onePhaseCommit(false);
 
-                int miniId = 0;
+                    break;
+                }
+            }
+        }
 
-                assert tx.transactionNodes() != null;
+        int miniId = 0;
 
-                final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
+        assert tx.transactionNodes() != null;
 
-                // Create mini futures.
-                for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
-                    assert !dhtMapping.empty();
+        final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
 
-                    ClusterNode n = dhtMapping.primary();
+        // Create mini futures.
+        for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
+            assert !dhtMapping.empty();
 
-                    assert !n.isLocal();
+            ClusterNode n = dhtMapping.primary();
 
-                    GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
+            assert !n.isLocal();
 
-                    Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
+            GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
 
-                    Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes();
+            Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
 
-                    if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
-                        continue;
+            Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes();
 
-                    if (tx.remainingTime() == -1)
-                        return;
+            if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
+                continue;
 
-                    MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+            if (tx.remainingTime() == -1)
+                return;
 
-                    add(fut); // Append new future.
+            MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+
+            add(fut); // Append new future.
+
+            assert req.transactionNodes() != null;
+
+            GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
+                futId,
+                fut.futureId(),
+                tx.topologyVersion(),
+                tx,
+                timeout,
+                dhtWrites,
+                nearWrites,
+                this.req.transactionNodes(),
+                tx.nearXidVersion(),
+                true,
+                tx.onePhaseCommit(),
+                tx.subjectId(),
+                tx.taskNameHash(),
+                tx.activeCachesDeploymentEnabled(),
+                tx.storeWriteThrough(),
+                retVal);
+
+            int idx = 0;
+
+            for (IgniteTxEntry entry : dhtWrites) {
+                try {
+                    GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
 
-                    assert txNodes != null;
+                    GridCacheContext<?, ?> cacheCtx = cached.context();
 
-                    GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
-                        futId,
-                        fut.futureId(),
-                        tx.topologyVersion(),
-                        tx,
-                        timeout,
-                        dhtWrites,
-                        nearWrites,
-                        txNodes,
-                        tx.nearXidVersion(),
-                        true,
-                        tx.onePhaseCommit(),
-                        tx.subjectId(),
-                        tx.taskNameHash(),
-                        tx.activeCachesDeploymentEnabled(),
-                        tx.storeWriteThrough(),
-                        retVal);
+                    // Do not invalidate near entry on originating transaction node.
+                    req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
+                        cached.readerId(n.id()) != null);
 
-                    int idx = 0;
+                    if (cached.isNewLocked()) {
+                        List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
+                            tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
 
-                    for (IgniteTxEntry entry : dhtWrites) {
-                        try {
-                            GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
+                        // Do not preload if local node is a partition owner.
+                        if (!owners.contains(cctx.localNode()))
+                            req.markKeyForPreload(idx);
+                    }
 
-                            GridCacheContext<?, ?> cacheCtx = cached.context();
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    assert false : "Got removed exception on entry with dht local candidate: " + entry;
+                }
 
-                            // Do not invalidate near entry on originating transaction node.
-                            req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
-                                cached.readerId(n.id()) != null);
+                idx++;
+            }
 
-                            if (cached.isNewLocked()) {
-                                List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
-                                    tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
+            if (!F.isEmpty(nearWrites)) {
+                for (IgniteTxEntry entry : nearWrites) {
+                    try {
+                        if (entry.explicitVersion() == null) {
+                            GridCacheMvccCandidate added = entry.cached().candidate(version());
 
-                                // Do not preload if local node is a partition owner.
-                                if (!owners.contains(cctx.localNode()))
-                                    req.markKeyForPreload(idx);
-                            }
+                            assert added != null : "Missing candidate for cache entry:" + entry;
+                            assert added.dhtLocal();
 
-                            break;
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            assert false : "Got removed exception on entry with dht local
candidate: " + entry;
+                            if (added.ownerVersion() != null)
+                                req.owned(entry.txKey(), added.ownerVersion());
                         }
 
-                        idx++;
+                        break;
                     }
-
-                    if (!F.isEmpty(nearWrites)) {
-                        for (IgniteTxEntry entry : nearWrites) {
-                            try {
-                                if (entry.explicitVersion() == null) {
-                                    GridCacheMvccCandidate added = entry.cached().candidate(version());
-
-                                    assert added != null : "Missing candidate for cache entry:"
+ entry;
-                                    assert added.dhtLocal();
-
-                                    if (added.ownerVersion() != null)
-                                        req.owned(entry.txKey(), added.ownerVersion());
-                                }
-
-                                break;
-                            }
-                            catch (GridCacheEntryRemovedException ignore) {
-                                assert false : "Got removed exception on entry with dht local
candidate: " + entry;
-                            }
-                        }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        assert false : "Got removed exception on entry with dht local candidate:
" + entry;
                     }
+                }
+            }
 
-                    assert req.transactionNodes() != null;
+            assert req.transactionNodes() != null;
 
-                    try {
-                        cctx.io().send(n, req, tx.ioPolicy());
+            try {
+                cctx.io().send(n, req, tx.ioPolicy());
 
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() +
-                                ", dhtTxId=" + tx.xidVersion() +
-                                ", node=" + n.id() + ']');
-                        }
-                    }
-                    catch (ClusterTopologyCheckedException ignored) {
-                        fut.onNodeLeft();
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() +
+                        ", dhtTxId=" + tx.xidVersion() +
+                        ", node=" + n.id() + ']');
+                }
+            }
+            catch (ClusterTopologyCheckedException ignored) {
+                fut.onNodeLeft();
+            }
+            catch (IgniteCheckedException e) {
+                if (!cctx.kernalContext().isStopping()) {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DHT prepare fut, failed to send request dht [txId="
+ tx.nearXidVersion() +
+                            ", dhtTxId=" + tx.xidVersion() +
+                            ", node=" + n.id() + ']');
                     }
-                    catch (IgniteCheckedException e) {
-                        if (!cctx.kernalContext().isStopping()) {
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("DHT prepare fut, failed to send request dht
[txId=" + tx.nearXidVersion() +
-                                    ", dhtTxId=" + tx.xidVersion() +
-                                    ", node=" + n.id() + ']');
-                            }
 
-                            fut.onResult(e);
-                        }
-                        else {
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("DHT prepare fut, failed to send request dht,
ignore [txId=" + tx.nearXidVersion() +
-                                    ", dhtTxId=" + tx.xidVersion() +
-                                    ", node=" + n.id() +
-                                    ", err=" + e + ']');
-                            }
-                        }
+                    fut.onResult(e);
+                }
+                else {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DHT prepare fut, failed to send request dht, ignore
[txId=" + tx.nearXidVersion() +
+                            ", dhtTxId=" + tx.xidVersion() +
+                            ", node=" + n.id() +
+                            ", err=" + e + ']');
                     }
                 }
+            }
+        }
 
-                for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
-                    if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
-                        if (tx.remainingTime() == -1)
-                            return;
-
-                        MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId,
null, nearMapping);
-
-                        add(fut); // Append new future.
-
-                        GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
-                            futId,
-                            fut.futureId(),
-                            tx.topologyVersion(),
-                            tx,
-                            timeout,
-                            null,
-                            nearMapping.writes(),
-                            tx.transactionNodes(),
-                            tx.nearXidVersion(),
-                            true,
-                            tx.onePhaseCommit(),
-                            tx.subjectId(),
-                            tx.taskNameHash(),
-                            tx.activeCachesDeploymentEnabled(),
-                            tx.storeWriteThrough(),
-                            retVal);
-
-                        for (IgniteTxEntry entry : nearMapping.entries()) {
-                            if (CU.writes().apply(entry)) {
-                                try {
-                                    if (entry.explicitVersion() == null) {
-                                        GridCacheMvccCandidate added = entry.cached().candidate(version());
+        for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+            if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+                if (tx.remainingTime() == -1)
+                    return;
 
-                                        assert added != null : "Null candidate for non-group-lock
entry " +
-                                            "[added=" + added + ", entry=" + entry + ']';
-                                        assert added.dhtLocal() : "Got non-dht-local candidate
for prepare future" +
-                                            "[added=" + added + ", entry=" + entry + ']';
+                MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null,
nearMapping);
+
+                add(fut); // Append new future.
+
+                GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
+                    futId,
+                    fut.futureId(),
+                    tx.topologyVersion(),
+                    tx,
+                    timeout,
+                    null,
+                    nearMapping.writes(),
+                    tx.transactionNodes(),
+                    tx.nearXidVersion(),
+                    true,
+                    tx.onePhaseCommit(),
+                    tx.subjectId(),
+                    tx.taskNameHash(),
+                    tx.activeCachesDeploymentEnabled(),
+                    tx.storeWriteThrough(),
+                    retVal);
+
+                for (IgniteTxEntry entry : nearMapping.entries()) {
+                    if (CU.writes().apply(entry)) {
+                        try {
+                            if (entry.explicitVersion() == null) {
+                                GridCacheMvccCandidate added = entry.cached().candidate(version());
 
-                                        if (added != null && added.ownerVersion()
!= null)
-                                            req.owned(entry.txKey(), added.ownerVersion());
-                                    }
+                                assert added != null : "Null candidate for non-group-lock
entry " +
+                                    "[added=" + added + ", entry=" + entry + ']';
+                                assert added.dhtLocal() : "Got non-dht-local candidate for
prepare future" +
+                                    "[added=" + added + ", entry=" + entry + ']';
 
-                                    break;
-                                } catch (GridCacheEntryRemovedException ignore) {
-                                    assert false : "Got removed exception on entry with dht
local candidate: " + entry;
-                                }
+                                if (added != null && added.ownerVersion() != null)
+                                    req.owned(entry.txKey(), added.ownerVersion());
                             }
+
+                            break;
+                        } catch (GridCacheEntryRemovedException ignore) {
+                            assert false : "Got removed exception on entry with dht local
candidate: " + entry;
                         }
+                    }
+                }
 
-                        assert req.transactionNodes() != null;
+                assert req.transactionNodes() != null;
 
-                        try {
-                            cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
+                try {
+                    cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
 
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("DHT prepare fut, sent request near [txId="
+ tx.nearXidVersion() +
-                                    ", dhtTxId=" + tx.xidVersion() +
-                                    ", node=" + nearMapping.primary().id() + ']');
-                            }
-                        }
-                        catch (ClusterTopologyCheckedException ignored) {
-                            fut.onNodeLeft();
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion()
+
+                            ", dhtTxId=" + tx.xidVersion() +
+                            ", node=" + nearMapping.primary().id() + ']');
+                    }
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    fut.onNodeLeft();
+                }
+                catch (IgniteCheckedException e) {
+                    if (!cctx.kernalContext().isStopping()) {
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("DHT prepare fut, failed to send request near [txId="
+ tx.nearXidVersion() +
+                                ", dhtTxId=" + tx.xidVersion() +
+                                ", node=" + nearMapping.primary().id() + ']');
                         }
-                        catch (IgniteCheckedException e) {
-                            if (!cctx.kernalContext().isStopping()) {
-                                if (msgLog.isDebugEnabled()) {
-                                    msgLog.debug("DHT prepare fut, failed to send request
near [txId=" + tx.nearXidVersion() +
-                                        ", dhtTxId=" + tx.xidVersion() +
-                                        ", node=" + nearMapping.primary().id() + ']');
-                                }
 
-                                fut.onResult(e);
-                            }
-                            else {
-                                if (msgLog.isDebugEnabled()) {
-                                    msgLog.debug("DHT prepare fut, failed to send request
near, ignore [txId=" + tx.nearXidVersion() +
-                                        ", dhtTxId=" + tx.xidVersion() +
-                                        ", node=" + nearMapping.primary().id() +
-                                        ", err=" + e + ']');
-                                }
-                            }
+                        fut.onResult(e);
+                    }
+                    else {
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("DHT prepare fut, failed to send request near, ignore
[txId=" + tx.nearXidVersion() +
+                                ", dhtTxId=" + tx.xidVersion() +
+                                ", node=" + nearMapping.primary().id() +
+                                ", err=" + e + ']');
                         }
                     }
                 }
             }
         }
-        finally {
-            markInitialized();
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/01b2ce15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 55d6bdd..8ecf21f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -105,14 +105,12 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
-import static org.apache.ignite.transactions.TransactionState.ACTIVE;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
 import static org.apache.ignite.transactions.TransactionState.PREPARING;
 import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
 import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
-import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
 import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
 
 /**
@@ -3338,19 +3336,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
     /**
      * Prepares next batch of entries in dht transaction.
      *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param txNodes Transaction nodes mapping.
-     * @param last {@code True} if this is last prepare request.
+     * @param req Prepare request.
      * @return Future that will be completed when locks are acquired.
      */
     @SuppressWarnings("TypeMayBeWeakened")
-    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
-        @Nullable Collection<IgniteTxEntry> reads,
-        @Nullable Collection<IgniteTxEntry> writes,
-        Map<UUID, Collection<UUID>> txNodes,
-        boolean last
-    ) {
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(GridNearTxPrepareRequest req) {
         long timeout = remainingTime();
 
         if (state() != PREPARING) {
@@ -3375,11 +3365,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
             timeout,
             0,
             Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
-            last,
+            req.last(),
             needReturnValue() && implicit());
 
         try {
-            userPrepare((serializable() && optimistic()) ? F.concat(false, writes, reads) : writes);
+            userPrepare((serializable() && optimistic()) ? F.concat(false, req.writes(), req.reads()) : req.writes());
 
             // Make sure to add future before calling prepare on it.
             cctx.mvcc().addFuture(fut);
@@ -3387,7 +3377,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
             if (isSystemInvalidate())
                 fut.complete();
             else
-                fut.prepare(reads, writes, txNodes);
+                fut.prepare(req);
         }
         catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) {
             fut.onError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/01b2ce15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index beeb184..362eaac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -254,11 +254,7 @@ public class IgniteTxHandler {
     ) {
         req.txState(locTx.txState());
 
-        IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
-            req.reads(),
-            req.writes(),
-            req.transactionNodes(),
-            req.last());
+        IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(req);
 
         if (locTx.isRollbackOnly())
             locTx.rollbackNearTxLocalAsync();
@@ -520,14 +516,7 @@ public class IgniteTxHandler {
             if (req.needReturnValue())
                 tx.needReturnValue(true);
 
-            IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
-                req.reads(),
-                req.writes(),
-                req.dhtVersions(),
-                req.messageId(),
-                req.miniId(),
-                req.transactionNodes(),
-                req.last());
+            IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(req);
 
             if (tx.isRollbackOnly() && !tx.commitOnPrepare()) {
                 if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)


Mime
View raw message