ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [02/23] incubator-ignite git commit: # ignite-157-1
Date Wed, 13 May 2015 08:24:27 GMT
# ignite-157-1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a238ce35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a238ce35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a238ce35

Branch: refs/heads/ignite-835
Commit: a238ce357fb0cb0c5378fbfc64341c3167843db5
Parents: 93876df
Author: sboikov <sboikov@gridgain.com>
Authored: Thu May 7 14:49:33 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu May 7 16:19:26 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMvccManager.java  |  4 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   | 32 ++++----
 .../distributed/dht/GridDhtTxLocalAdapter.java  | 27 +++++++
 .../distributed/dht/GridDhtTxPrepareFuture.java | 81 ++++++++++----------
 .../near/GridAbstractNearTxPrepareFuture.java   |  3 +
 .../near/GridNearOptimisticTxPrepareFuture.java | 13 +++-
 .../GridNearPessimisticTxPrepareFuture.java     | 15 +++-
 .../cache/distributed/near/GridNearTxLocal.java | 43 +++++------
 .../near/GridNearTxPrepareFuture.java           | 20 ++---
 .../cache/transactions/IgniteInternalTx.java    |  4 +-
 .../cache/transactions/IgniteTxAdapter.java     |  2 +-
 .../cache/transactions/IgniteTxHandler.java     | 65 +++++++---------
 .../cache/transactions/IgniteTxManager.java     | 12 +--
 .../cache/GridCacheAbstractFullApiSelfTest.java |  2 +-
 ...ePrimaryNodeFailureRecoveryAbstractTest.java |  4 +-
 .../IgniteCacheFailoverTestSuite.java           |  7 +-
 16 files changed, 178 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 0bb97a9..c05e4b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -510,7 +510,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @return Future.
      */
     @SuppressWarnings({"unchecked"})
-    @Nullable public <T> GridCacheFuture<T> future(GridCacheVersion ver, IgniteUuid futId) {
+    @Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) {
         Collection<? extends GridCacheFuture> futs = this.futs.get(ver);
 
         if (futs != null)
@@ -519,7 +519,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                     if (log.isDebugEnabled())
                         log.debug("Found future in futures map: " + fut);
 
-                    return (GridCacheFuture<T>)fut;
+                    return fut;
                 }
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 07ced0d..614f520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -284,7 +284,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
+    @Override public IgniteInternalFuture<?> prepareAsync() {
         if (optimistic()) {
             assert isSystemInvalidate();
 
@@ -296,7 +296,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 nearMiniId,
                 null,
                 true,
-                null,
                 null);
         }
 
@@ -305,14 +304,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
 
         if (fut == null) {
             // Future must be created before any exception can be thrown.
-            if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(
+            if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture(
                 cctx,
                 this,
                 nearMiniId,
                 Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
                 true,
                 needReturnValue(),
-                null,
                 null)))
                 return prepFut.get();
         }
@@ -371,7 +369,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
      * @param lastBackups IDs of backup nodes receiving last prepare request.
      * @return Future that will be completed when locks are acquired.
      */
-    public IgniteInternalFuture<IgniteInternalTx> prepareAsync(
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
         @Nullable Iterable<IgniteTxEntry> reads,
         @Nullable Iterable<IgniteTxEntry> writes,
         Map<IgniteTxKey, GridCacheVersion> verMap,
@@ -379,8 +377,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         IgniteUuid nearMiniId,
         Map<UUID, Collection<UUID>> txNodes,
         boolean last,
-        Collection<UUID> lastBackups,
-        IgniteInClosure<GridNearTxPrepareResponse> completeCb
+        Collection<UUID> lastBackups
     ) {
         // In optimistic mode prepare still can be called explicitly from salvageTx.
         GridDhtTxPrepareFuture fut = prepFut.get();
@@ -389,21 +386,20 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             init();
 
             // Future must be created before any exception can be thrown.
-            if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(
+            if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture(
                 cctx,
                 this,
                 nearMiniId,
                 verMap,
                 last,
                 needReturnValue(),
-                lastBackups,
-                completeCb))) {
+                lastBackups))) {
                 GridDhtTxPrepareFuture f = prepFut.get();
 
                 assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
                     "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
 
-                return f;
+                return chainOnePhasePrepare(f);
             }
         }
         else {
@@ -411,7 +407,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';
 
             // Prepare was called explicitly.
-            return fut;
+            return chainOnePhasePrepare(fut);
         }
 
         if (state() != PREPARING) {
@@ -475,7 +471,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             }
         }
 
-        return fut;
+        return chainOnePhasePrepare(fut);
     }
 
     /** {@inheritDoc} */
@@ -517,8 +513,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 }
             }
             else
-                prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                    @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+                prep.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
                         try {
                             f.get(); // Check for errors of a parent future.
 
@@ -605,8 +601,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         else {
             prepFut.complete();
 
-            prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+            prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
                     try {
                         f.get(); // Check for errors of a parent future.
                     }
@@ -686,7 +682,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
+    @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
         return prepFut.get();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 08fcaf6..d886989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
@@ -885,6 +886,32 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      */
     protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut);
 
+    /**
+     * @return {@code True} if transaction if finished on prepare step.
+     */
+    protected final boolean commitOnPrepare() {
+        return onePhaseCommit() && !near();
+    }
+
+    /**
+     * @param prepFut Prepare future.
+     * @return If transaction if finished on prepare step returns future which is completed after transaction finish.
+     */
+    protected final IgniteInternalFuture<GridNearTxPrepareResponse> chainOnePhasePrepare(
+        final GridDhtTxPrepareFuture prepFut) {
+        if (commitOnPrepare()) {
+            return finishFuture().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridNearTxPrepareResponse>() {
+                @Override public GridNearTxPrepareResponse applyx(IgniteInternalFuture<IgniteInternalTx> finishFut)
+                    throws IgniteCheckedException
+                {
+                    return prepFut.get();
+                }
+            });
+        }
+
+        return prepFut;
+    }
+
     /** {@inheritDoc} */
     @Override public void rollback() throws IgniteCheckedException {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 3a1a80a..0e64726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -50,19 +50,32 @@ import static org.apache.ignite.transactions.TransactionState.*;
  *
  */
 @SuppressWarnings("unchecked")
-public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
-    implements GridCacheMvccFuture<IgniteInternalTx> {
+public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
+    implements GridCacheMvccFuture<GridNearTxPrepareResponse> {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
+    /** */
+    private static final IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse> REDUCER =
+        new IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse>() {
+            @Override public boolean collect(IgniteInternalTx e) {
+                return true;
+            }
+
+            @Override public GridNearTxPrepareResponse reduce() {
+                // Nothing to aggregate.
+                return null;
+            }
+        };
+
     /** Logger. */
     private static IgniteLogger log;
 
     /** Context. */
-    private GridCacheSharedContext<K, V> cctx;
+    private GridCacheSharedContext<?, ?> cctx;
 
     /** Future ID. */
     private IgniteUuid futId;
@@ -128,15 +141,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     /** */
     private boolean invoke;
 
-    /** */
-    private IgniteInClosure<GridNearTxPrepareResponse> completeCb;
-
     /**
      * @param cctx Context.
      * @param tx Transaction.
      * @param nearMiniId Near mini future id.
      * @param dhtVerMap DHT versions map.
      * @param last {@code True} if this is last prepare operation for node.
+     * @param retVal Return value flag.
      * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
      */
     public GridDhtTxPrepareFuture(
@@ -146,19 +157,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
         Map<IgniteTxKey, GridCacheVersion> dhtVerMap,
         boolean last,
         boolean retVal,
-        Collection<UUID> lastBackups,
-        IgniteInClosure<GridNearTxPrepareResponse> completeCb
+        Collection<UUID> lastBackups
     ) {
-        super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
-            @Override public boolean collect(IgniteInternalTx e) {
-                return true;
-            }
-
-            @Override public IgniteInternalTx reduce() {
-                // Nothing to aggregate.
-                return tx;
-            }
-        });
+        super(REDUCER);
 
         this.cctx = cctx;
         this.tx = tx;
@@ -178,8 +179,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
         this.retVal = retVal;
 
-        this.completeCb = completeCb;
-
         assert dhtMap != null;
         assert nearMap != null;
     }
@@ -382,7 +381,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
      * @param t Error.
      */
     public void onError(Throwable t) {
-        onDone(tx, t);
+        onDone(null, t);
     }
 
     /**
@@ -479,7 +478,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
+    @Override public boolean onDone(GridNearTxPrepareResponse res0, Throwable err) {
         assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " +
             "pending mini futures: " + this;
 
@@ -495,16 +494,15 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
             // Must create prepare response before transaction is committed to grab correct return value.
             final GridNearTxPrepareResponse res = createPrepareResponse();
 
-            onComplete();
+            onComplete(res);
 
-            if (!tx.near()) {
+            if (tx.commitOnPrepare()) {
                 if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
                     IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ?
                         tx.commitAsync() : tx.rollbackAsync();
 
                     fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
-                        @Override
-                        public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
+                        @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
                             try {
                                 if (replied.compareAndSet(false, true))
                                     sendPrepareResponse(res);
@@ -530,15 +528,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
         }
         else {
             if (replied.compareAndSet(false, true)) {
+                GridNearTxPrepareResponse res = createPrepareResponse();
+
                 try {
-                    sendPrepareResponse(createPrepareResponse());
+                    sendPrepareResponse(res);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send prepare response for transaction: " + tx, e);
                 }
                 finally {
                     // Will call super.onDone().
-                    onComplete();
+                    onComplete(res);
                 }
 
                 return true;
@@ -562,16 +562,12 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
+     * @param res Response.
      * @throws IgniteCheckedException If failed to send response.
      */
     private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
         if (!tx.nearNodeId().equals(cctx.localNodeId()))
             cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
-        else {
-            assert completeCb != null;
-
-            completeCb.apply(res);
-        }
     }
 
     /**
@@ -616,10 +612,10 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
             for (IgniteTxEntry e : writes) {
                 IgniteTxEntry txEntry = tx.entry(e.txKey());
 
-                GridCacheContext cacheCtx = txEntry.context();
-
                 assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
 
+                GridCacheContext cacheCtx = txEntry.context();
+
                 while (true) {
                     try {
                         GridCacheEntryEx entry = txEntry.cached();
@@ -682,13 +678,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     /**
      * Completeness callback.
      *
+     * @param res Response.
      * @return {@code True} if {@code done} flag was changed as a result of this call.
      */
-    private boolean onComplete() {
+    private boolean onComplete(@Nullable GridNearTxPrepareResponse res) {
         if (last || tx.isSystemInvalidate())
             tx.state(PREPARED);
 
-        if (super.onDone(tx, err.get())) {
+        if (super.onDone(res, err.get())) {
             // Don't forget to clean up.
             cctx.mvcc().removeFuture(this);
 
@@ -702,7 +699,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
      * Completes this future.
      */
     public void complete() {
-        onComplete();
+        onComplete(null);
     }
 
     /**
@@ -717,7 +714,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
         if (tx.empty()) {
             tx.setRollbackOnly();
 
-            onDone(tx);
+            onDone((GridNearTxPrepareResponse)null);
         }
 
         this.reads = reads;
@@ -821,7 +818,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                         try {
                             GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
 
-                            GridCacheContext<K, V> cacheCtx = cached.context();
+                            GridCacheContext<?, ?> cacheCtx = cached.context();
 
                             if (entry.explicitVersion() == null) {
                                 GridCacheMvccCandidate added = cached.candidate(version());
@@ -977,7 +974,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
         GridCacheContext cacheCtx = entry.context();
 
-        GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
+        GridDhtCacheAdapter<?, ?> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
 
         ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(entry);
 
@@ -1234,7 +1231,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                 boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED);
 
                 for (GridCacheEntryInfo info : res.preloadEntries()) {
-                    GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId());
+                    GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(info.cacheId());
 
                     while (true) {
                         GridCacheEntryEx entry = cacheCtx.cache().entryEx(info.key());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
index 905f018..6f94f21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
@@ -153,6 +153,9 @@ public abstract class GridAbstractNearTxPrepareFuture extends GridCompoundIdenti
      * @param res Response.
      */
     protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+        if (res == null)
+            return;
+
         assert res.error() == null : res;
         assert F.isEmpty(res.invalidPartitions()) : res;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2fbca7b..110cca4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -535,9 +535,16 @@ public class GridNearOptimisticTxPrepareFuture extends GridAbstractNearTxPrepare
             // At this point, if any new node joined, then it is
             // waiting for this transaction to complete, so
             // partition reassignments are not possible here.
-            cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
-                @Override public void apply(GridNearTxPrepareResponse res) {
-                    fut.onResult(n.id(), res);
+            IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+
+            prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+                @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+                    try {
+                        fut.onResult(n.id(), prepFut.get());
+                    }
+                    catch (IgniteCheckedException e) {
+                        fut.onResult(e);
+                    }
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 84a4ab8..e3f24f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -203,9 +203,18 @@ public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepar
             add(fut);
 
             if (node.isLocal()) {
-                cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
-                    @Override public void apply(GridNearTxPrepareResponse res) {
-                        fut.onResult(res);
+                IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(node.id(),
+                    tx,
+                    req);
+
+                prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+                    @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+                        try {
+                            fut.onResult(prepFut.get());
+                        }
+                        catch (IgniteCheckedException e) {
+                            fut.onError(e);
+                        }
                     }
                 });
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index f7a43bb..a003d19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -61,8 +61,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** Future. */
     @GridToStringExclude
-    private final AtomicReference<IgniteInternalFuture<IgniteInternalTx>> prepFut =
-        new AtomicReference<>();
+    private final AtomicReference<IgniteInternalFuture<?>> prepFut = new AtomicReference<>();
 
     /** */
     @GridToStringExclude
@@ -682,7 +681,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
+    @Override public IgniteInternalFuture<?> prepareAsync() {
         GridAbstractNearTxPrepareFuture fut = (GridAbstractNearTxPrepareFuture)prepFut.get();
 
         if (fut == null) {
@@ -719,10 +718,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
         cctx.mvcc().addFuture(fut);
 
-        IgniteInternalFuture<IgniteInternalTx> prepareFut = prepFut.get();
+        IgniteInternalFuture<?> prepareFut = prepFut.get();
 
-        prepareFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-            @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+        prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> f) {
                 GridNearTxFinishFuture fut0 = commitFut.get();
 
                 try {
@@ -766,7 +765,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
         cctx.mvcc().addFuture(fut);
 
-        IgniteInternalFuture<IgniteInternalTx> prepFut = this.prepFut.get();
+        IgniteInternalFuture<?> prepFut = this.prepFut.get();
 
         if (prepFut == null || prepFut.isDone()) {
             try {
@@ -790,8 +789,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             }
         }
         else {
-            prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+            prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
                     try {
                         // Check for errors in prepare future.
                         f.get();
@@ -834,12 +833,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
      * @return Future that will be completed when locks are acquired.
      */
     @SuppressWarnings("TypeMayBeWeakened")
-    public IgniteInternalFuture<IgniteInternalTx> prepareAsyncLocal(
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
         @Nullable Collection<IgniteTxEntry> reads,
         @Nullable Collection<IgniteTxEntry> writes,
         Map<UUID, Collection<UUID>> txNodes, boolean last,
-        Collection<UUID> lastBackups,
-        IgniteInClosure<GridNearTxPrepareResponse> completeCb
+        Collection<UUID> lastBackups
     ) {
         if (state() != PREPARING) {
             if (timedOut())
@@ -854,15 +852,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
         init();
 
-        GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture<>(
+        GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture(
             cctx,
             this,
             IgniteUuid.randomUuid(),
             Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
             last,
             needReturnValue() && implicit(),
-            lastBackups,
-            completeCb);
+            lastBackups);
 
         try {
             // At this point all the entries passed in must be enlisted in transaction because this is an
@@ -901,7 +898,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             }
         }
 
-        return fut;
+        return chainOnePhasePrepare(fut);
     }
 
     /**
@@ -917,7 +914,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         if (pessimistic())
             prepareAsync();
 
-        IgniteInternalFuture<IgniteInternalTx> prep = prepFut.get();
+        IgniteInternalFuture<?> prep = prepFut.get();
 
         // Do not create finish future if there are no remote nodes.
         if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) {
@@ -953,8 +950,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             }
         }
         else
-            prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+            prep.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
                     try {
                         f.get(); // Check for errors of a parent future.
 
@@ -990,7 +987,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
         cctx.mvcc().addFuture(fut);
 
-        IgniteInternalFuture<IgniteInternalTx> prep = prepFut.get();
+        IgniteInternalFuture<?> prep = prepFut.get();
 
         if (prep == null || prep.isDone()) {
             try {
@@ -1006,8 +1003,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             fut.finish();
         }
         else
-            prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+            prep.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
                     try {
                         f.get(); // Check for errors of a parent future.
                     }
@@ -1200,7 +1197,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
+    @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
         return prepFut.get();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 962d973..9cf4aca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -648,11 +648,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
             add(fut);
 
             if (node.isLocal()) {
-                cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
-                    @Override public void apply(GridNearTxPrepareResponse res) {
-                        fut.onResult(node.id(), res);
-                    }
-                });
+//                cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
+//                    @Override public void apply(GridNearTxPrepareResponse res) {
+//                        fut.onResult(node.id(), res);
+//                    }
+//                });
             }
             else {
                 try {
@@ -755,11 +755,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
             // At this point, if any new node joined, then it is
             // waiting for this transaction to complete, so
             // partition reassignments are not possible here.
-            cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
-                @Override public void apply(GridNearTxPrepareResponse res) {
-                    fut.onResult(n.id(), res);
-                }
-            });
+//            cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
+//                @Override public void apply(GridNearTxPrepareResponse res) {
+//                    fut.onResult(n.id(), res);
+//                }
+//            });
         }
         else {
             assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 8dc07cc..2bed843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -551,7 +551,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
      *
      * @return Future for prepare step.
      */
-    public IgniteInternalFuture<IgniteInternalTx> prepareAsync();
+    public IgniteInternalFuture<?> prepareAsync();
 
     /**
      * @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
@@ -580,7 +580,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
     /**
      * @return Future for transaction prepare if prepare is in progress.
      */
-    @Nullable public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture();
+    @Nullable public IgniteInternalFuture<?> currentPrepareFuture();
 
     /**
      * @param state Transaction state.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 82d68b3..64cc77f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1008,7 +1008,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
+    @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index d98b4ff..a403f28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -58,9 +58,9 @@ public class IgniteTxHandler {
      * @param req Request.
      * @return Prepare future.
      */
-    public IgniteInternalFuture<IgniteInternalTx> processNearTxPrepareRequest(final UUID nearNodeId,
+    public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId,
         final GridNearTxPrepareRequest req) {
-        return prepareTx(nearNodeId, null, req, null);
+        return prepareTx(nearNodeId, null, req);
     }
 
     /**
@@ -138,32 +138,28 @@ public class IgniteTxHandler {
      * @param nearNodeId Near node ID that initiated transaction.
      * @param locTx Optional local transaction.
      * @param req Near prepare request.
-     * @param completeCb Completion callback.
      * @return Future for transaction.
      */
-    public IgniteInternalFuture<IgniteInternalTx> prepareTx(
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareTx(
         UUID nearNodeId,
         @Nullable GridNearTxLocal locTx,
-        GridNearTxPrepareRequest req,
-        @Nullable IgniteInClosure<GridNearTxPrepareResponse> completeCb
+        GridNearTxPrepareRequest req
     ) {
         assert nearNodeId != null;
         assert req != null;
 
         if (locTx != null) {
-            assert completeCb != null;
-
             if (req.near()) {
                 // Make sure not to provide Near entries to DHT cache.
                 req.cloneEntries();
 
-                return prepareNearTx(nearNodeId, req, completeCb);
+                return prepareNearTx(nearNodeId, req);
             }
             else
-                return prepareColocatedTx(locTx, req, completeCb);
+                return prepareColocatedTx(locTx, req);
         }
         else
-            return prepareNearTx(nearNodeId, req, null);
+            return prepareNearTx(nearNodeId, req);
     }
 
     /**
@@ -171,30 +167,27 @@ public class IgniteTxHandler {
      *
      * @param locTx Local transaction.
      * @param req Near prepare request.
-     * @param completeCb Completion callback.
      * @return Prepare future.
      */
-    private IgniteInternalFuture<IgniteInternalTx> prepareColocatedTx(
+    private IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
         final GridNearTxLocal locTx,
-        final GridNearTxPrepareRequest req,
-        final IgniteInClosure<GridNearTxPrepareResponse> completeCb
+        final GridNearTxPrepareRequest req
     ) {
         IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys.
 
         return new GridEmbeddedFuture<>(
             fut,
-            new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() {
-                @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception ex) {
+            new C2<Object, Exception, IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+                @Override public IgniteInternalFuture<GridNearTxPrepareResponse> apply(Object o, Exception ex) {
                     if (ex != null)
                         throw new GridClosureException(ex);
 
-                    IgniteInternalFuture<IgniteInternalTx> fut = locTx.prepareAsyncLocal(
+                    IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
                         req.reads(),
                         req.writes(),
                         req.transactionNodes(),
                         req.last(),
-                        req.lastBackups(),
-                        completeCb);
+                        req.lastBackups());
 
                     if (locTx.isRollbackOnly())
                         locTx.rollbackAsync();
@@ -202,18 +195,16 @@ public class IgniteTxHandler {
                     return fut;
                 }
             },
-            new C2<IgniteInternalTx, Exception, IgniteInternalTx>() {
-                @Nullable @Override public IgniteInternalTx apply(IgniteInternalTx tx, Exception e) {
+            new C2<GridNearTxPrepareResponse, Exception, GridNearTxPrepareResponse>() {
+                @Nullable @Override public GridNearTxPrepareResponse apply(GridNearTxPrepareResponse res, Exception e) {
                     if (e != null) {
-                        // tx can be null of exception occurred.
-                        if (tx != null)
-                            tx.setRollbackOnly(); // Just in case.
+                        locTx.setRollbackOnly(); // Just in case.
 
                         if (!(e instanceof IgniteTxOptimisticCheckedException))
-                            U.error(log, "Failed to prepare DHT transaction: " + tx, e);
+                            U.error(log, "Failed to prepare transaction: " + locTx, e);
                     }
 
-                    return tx;
+                    return res;
                 }
             }
         );
@@ -224,13 +215,11 @@ public class IgniteTxHandler {
      *
      * @param nearNodeId Near node ID that initiated transaction.
      * @param req Near prepare request.
-     * @param completeCb Completion callback.
      * @return Prepare future.
      */
-    private IgniteInternalFuture<IgniteInternalTx> prepareNearTx(
+    private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
         final UUID nearNodeId,
-        final GridNearTxPrepareRequest req,
-        IgniteInClosure<GridNearTxPrepareResponse> completeCb
+        final GridNearTxPrepareRequest req
     ) {
         ClusterNode nearNode = ctx.node(nearNodeId);
 
@@ -315,7 +304,7 @@ public class IgniteTxHandler {
             if (req.returnValue())
                 tx.needReturnValue(true);
 
-            IgniteInternalFuture<IgniteInternalTx> fut = tx.prepareAsync(
+            IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
                 req.reads(),
                 req.writes(),
                 req.dhtVersions(),
@@ -323,8 +312,7 @@ public class IgniteTxHandler {
                 req.miniId(),
                 req.transactionNodes(),
                 req.last(),
-                req.lastBackups(),
-                completeCb);
+                req.lastBackups());
 
             if (tx.isRollbackOnly()) {
                 try {
@@ -337,8 +325,8 @@ public class IgniteTxHandler {
 
             final GridDhtTxLocal tx0 = tx;
 
-            fut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                @Override public void apply(IgniteInternalFuture<IgniteInternalTx> txFut) {
+            fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> txFut) {
                     try {
                         txFut.get();
                     }
@@ -354,7 +342,7 @@ public class IgniteTxHandler {
             return fut;
         }
         else
-            return new GridFinishedFuture<>((IgniteInternalTx)null);
+            return new GridFinishedFuture<>((GridNearTxPrepareResponse)null);
     }
 
     /**
@@ -399,8 +387,7 @@ public class IgniteTxHandler {
      * @param res Response.
      */
     private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
-        GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().
-            <IgniteInternalTx>future(res.version(), res.futureId());
+        GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().future(res.version(), res.futureId());
 
         if (fut == null) {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 8a1d490..2122602 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1816,7 +1816,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             if (nearVer.equals(tx.nearXidVersion())) {
                 TransactionState state = tx.state();
 
-                IgniteInternalFuture<IgniteInternalTx> prepFut = tx.currentPrepareFuture();
+                IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
 
                 if (prepFut != null && !prepFut.isDone()) {
                     if (log.isDebugEnabled())
@@ -1828,8 +1828,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
                     final Collection<GridCacheVersion> processedVers0 = processedVers;
 
-                    prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                        @Override public void apply(IgniteInternalFuture<IgniteInternalTx> prepFut) {
+                    prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> prepFut) {
                             if (log.isDebugEnabled())
                                 log.debug("Transaction prepare future finished: " + tx);
 
@@ -2029,11 +2029,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                             if (tx.state() == PREPARED)
                                 commitIfPrepared(tx);
                             else {
-                                IgniteInternalFuture<IgniteInternalTx> prepFut = tx.currentPrepareFuture();
+                                IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
 
                                 if (prepFut != null) {
-                                    prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                                        @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                                    prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                        @Override public void apply(IgniteInternalFuture<?> fut) {
                                             if (tx.state() == PREPARED)
                                                 commitIfPrepared(tx);
                                             else if (tx.setRollbackOnly())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 4dc371c..a346b65 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4343,7 +4343,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
             assertEquals(val1, cacheSkipStore.invoke(key, new SetValueProcessor(val2)));
             assertEquals(i, map.get(key));
-            assertEquals(val2, cacheSkipStore.get(key));
+            assertEquals("For key " + key, val2, cacheSkipStore.get(key));
         }
 
         for (String key : keys) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index ee2f16b..f996877 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -199,7 +199,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
             commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
 
-            IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+            IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
 
             waitPrepared(ignite(1));
 
@@ -360,7 +360,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
         commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
 
-        IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+        IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
 
         waitPrepared(ignite(1));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 529bd23..2acd6a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -54,10 +54,9 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderFailoverSelfTest.class);
         suite.addTestSuite(GridCacheAtomicReplicatedFailoverSelfTest.class);
 
-        // TODO IGNITE-157.
-        // suite.addTestSuite(GridCachePartitionedFailoverSelfTest.class);
-        // suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
-        // suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
+        suite.addTestSuite(GridCachePartitionedFailoverSelfTest.class);
+        suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
+        suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
 
         suite.addTestSuite(IgniteCacheAtomicNodeJoinTest.class);
         suite.addTestSuite(IgniteCacheTxNodeJoinTest.class);


Mime
View raw message