ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1607 WIP
Date Mon, 05 Oct 2015 13:10:16 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 820c0ecd3 -> 920d7472d


ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/920d7472
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/920d7472
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/920d7472

Branch: refs/heads/ignite-1607
Commit: 920d7472d52c45765990269717f0b11563b2575e
Parents: 820c0ec
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Oct 5 12:29:45 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 5 16:10:02 2015 +0300

----------------------------------------------------------------------
 ...arOptimisticSerializableTxPrepareFuture.java | 183 ++++-
 .../near/GridNearOptimisticTxPrepareFuture.java |  18 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   8 +-
 .../cache/distributed/near/GridNearTxLocal.java |   7 +
 .../near/GridNearTxPrepareFutureAdapter.java    |  12 +-
 .../CacheSerializableTransactionsTest.java      | 697 +++++++++++++++++++
 .../cache/CacheSerializableTxTest.java          | 687 ------------------
 ...niteCacheClientNodeChangingTopologyTest.java | 147 ++++
 .../testsuites/IgniteCacheTestSuite4.java       |   3 +
 9 files changed, 1019 insertions(+), 743 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 62fc40f..4a7efb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -46,6 +47,8 @@ import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedExceptio
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -58,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -74,6 +78,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
     @GridToStringInclude
     private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
 
+    /** */
+    private final AtomicReference<ClientRemapFuture> remapFutRef;
+
     /**
      * @param cctx Context.
      * @param tx Transaction.
@@ -85,6 +92,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
 
         // Should wait for all mini futures completion before finishing tx.
         ignoreChildFailures(IgniteCheckedException.class);
+
+        remapFutRef = cctx.kernalContext().clientNode() ? new AtomicReference<ClientRemapFuture>() : null;
     }
 
     /** {@inheritDoc} */
@@ -158,8 +167,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         if (e instanceof IgniteTxOptimisticCheckedException && nodeId != null)
             tx.onOptimisticException(nodeId);
 
-        if (err.compareAndSet(null, e))
-            tx.setRollbackOnly();
+        err.compareAndSet(null, e);
     }
 
     /**
@@ -183,7 +191,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
     /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
         if (!isDone()) {
-            for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+            for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
                 if (isMini(fut)) {
                     MiniFuture f = (MiniFuture)fut;
 
@@ -230,6 +238,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
             tx.state(PREPARED);
 
         if (super.onDone(tx, err0)) {
+            if (err0 != null)
+                tx.setRollbackOnly();
+
             // Don't forget to clean up.
             cctx.mvcc().removeFuture(this);
 
@@ -331,18 +342,18 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         }
         else {
             topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override
-                public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
                     cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
-                        @Override
-                        public void run() {
+                        @Override public void run() {
                             try {
                                 fut.get();
 
                                 prepareOnTopology(remap, c);
-                            } catch (IgniteCheckedException e) {
+                            }
+                            catch (IgniteCheckedException e) {
                                 onDone(e);
-                            } finally {
+                            }
+                            finally {
                                 cctx.txContextReset();
                             }
                         }
@@ -435,7 +446,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
                 return;
             }
 
-            prepare(tx.readEntries(), tx.writeEntries());
+            prepare(tx.readEntries(), tx.writeEntries(), remap);
 
             markInitialized();
         }
@@ -447,11 +458,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
     /**
      * @param reads Read entries.
      * @param writes Write entries.
+     * @param remap Remap flag.
      * @throws IgniteCheckedException If failed.
      */
+    @SuppressWarnings("unchecked")
     private void prepare(
         Iterable<IgniteTxEntry> reads,
-        Iterable<IgniteTxEntry> writes
+        Iterable<IgniteTxEntry> writes,
+        boolean remap
     ) throws IgniteCheckedException {
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
@@ -496,19 +510,37 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         checkOnePhase();
 
         for (GridDistributedTxMapping m : mappings.values()) {
-            if (!prepare(m))
+            assert !m.empty();
+
+            MiniFuture fut = new MiniFuture(m);
+
+            add(fut);
+        }
+
+        Collection<MiniFuture> futs = (Collection)futures();
+
+        for (MiniFuture fut : futs) {
+            if (remap && fut.rcvRes.get())
+                continue;
+
+            IgniteCheckedException err = prepare(fut);
+
+            if (err != null) {
+                onDone(err);
+
                 break;
+            }
         }
 
         markInitialized();
     }
 
     /**
-     * @param m Mapping.
+     * @param fut Mini future.
      * @return Error that should stop mapping, or {@code null} if prepare for this mini future was started without error.
      */
-    private boolean prepare(GridDistributedTxMapping m) {
-        assert !m.empty();
+    private IgniteCheckedException prepare(final MiniFuture fut) {
+        GridDistributedTxMapping m = fut.mapping();
 
         final ClusterNode n = m.node();
 
@@ -545,16 +577,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
             catch (IgniteCheckedException e) {
                 onError(m.node().id(), e);
 
-                return false;
+                return e;
             }
         }
 
-        final MiniFuture fut = new MiniFuture(m);
-
         req.miniId(fut.futureId());
 
-        add(fut); // Append new future.
-
         // If this is the primary node for the keys.
         if (n.isLocal()) {
             IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
@@ -579,16 +607,16 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
 
                 fut.onResult(e);
 
-                return false;
+                return e;
             }
             catch (IgniteCheckedException e) {
                 fut.onResult(e);
 
-                return false;
+                return e;
             }
         }
 
-        return true;
+        return null;
     }
 
     /**
@@ -701,7 +729,37 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
     /**
      *
      */
-    private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+    private class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
+        /** */
+        private boolean remap = true;
+
+        /**
+         *
+         */
+        public ClientRemapFuture() {
+            super();
+
+            reducer(new IgniteReducer<GridNearTxPrepareResponse, Boolean>() {
+                @Override public boolean collect(GridNearTxPrepareResponse res) {
+                    assert res != null;
+
+                    if (res.clientRemapVersion() == null)
+                        remap = false;
+
+                    return true;
+                }
+
+                @Override public Boolean reduce() {
+                    return remap;
+                }
+            });
+        }
+    }
+
+    /**
+     *
+     */
+    private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -782,7 +840,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
          * @param nodeId Failed node ID.
          * @param res Result callback.
          */
-        void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+        void onResult(UUID nodeId, final GridNearTxPrepareResponse res) {
             if (isDone())
                 return;
 
@@ -798,42 +856,93 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
                         assert cctx.kernalContext().clientNode();
                         assert m.clientFirst();
 
-                        IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+                        tx.onClientRemap(m.node().id());
 
-                        if (affFut != null && !affFut.isDone()) {
-                            affFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                                @Override public void apply(IgniteInternalFuture<?> fut) {
-                                    try {
-                                        fut.get();
+                        ClientRemapFuture remapFut = new ClientRemapFuture();
+
+                        if (remapFutRef.compareAndSet(null, remapFut)) {
+                            Collection<MiniFuture> futs = (Collection)futures();
 
-                                        remap();
+                            for (MiniFuture fut : futs) {
+                                if (fut != this)
+                                    remapFut.add(fut);
+                            }
+
+                            remapFut.markInitialized();
+
+                            remapFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                                @Override public void apply(IgniteInternalFuture<Boolean> remapFut) {
+                                    try {
+                                        IgniteInternalFuture<?> affFut =
+                                            cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+
+                                        if (affFut == null)
+                                            affFut = new GridFinishedFuture<Object>();
+
+                                        if (remapFut.get()) {
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("Will remap client tx [" +
+                                                    "fut=" + GridNearOptimisticSerializableTxPrepareFuture.this +
+                                                    ", topVer=" + res.topologyVersion() + ']');
+                                            }
+
+                                            boolean set = remapFutRef.compareAndSet((ClientRemapFuture)remapFut, null);
+
+                                            assert set;
+
+                                            affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                                @Override public void apply(IgniteInternalFuture<?> affFut) {
+                                                    try {
+                                                        affFut.get();
+
+                                                        remap(res);
+                                                    }
+                                                    catch (IgniteCheckedException e) {
+                                                        onDone(e);
+                                                    }
+                                                }
+                                            });
+                                        }
+                                        else {
+                                            ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+                                                "Cluster topology changed while client transaction is preparing.");
+
+                                            err.retryReadyFuture(affFut);
+
+                                            onDone(err);
+                                        }
                                     }
                                     catch (IgniteCheckedException e) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("Prepare failed, will not remap tx: " +
+                                                GridNearOptimisticSerializableTxPrepareFuture.this);
+                                        }
+
                                         onDone(e);
                                     }
                                 }
                             });
                         }
                         else
-                            remap();
+                            onDone(res);
                     }
                     else {
                         onPrepareResponse(m, res);
 
                         // Finish this mini future.
-                        onDone(tx);
+                        onDone(res);
                     }
                 }
             }
         }
 
         /**
-         *
+         * @param res Response.
          */
-        private void remap() {
+        private void remap(final GridNearTxPrepareResponse res) {
             prepareOnTopology(true, new Runnable() {
                 @Override public void run() {
-                    onDone(tx);
+                    onDone(res);
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 4a0caa9..0646aac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -199,7 +199,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
     /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
         if (!isDone()) {
-            for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+            for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
                 if (isMini(fut)) {
                     MiniFuture f = (MiniFuture)fut;
 
@@ -740,7 +740,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
     /**
      *
      */
-    private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+    private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -827,7 +827,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
          * @param nodeId Failed node ID.
          * @param res Result callback.
          */
-        void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+        void onResult(UUID nodeId, final GridNearTxPrepareResponse res) {
             if (isDone())
                 return;
 
@@ -849,7 +849,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                                     try {
                                         fut.get();
 
-                                        remap();
+                                        remap(res);
                                     }
                                     catch (IgniteCheckedException e) {
                                         onDone(e);
@@ -858,7 +858,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                             });
                         }
                         else
-                            remap();
+                            remap(res);
                     }
                     else {
                         onPrepareResponse(m, res);
@@ -868,19 +868,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                             proceedPrepare(mappings);
 
                         // Finish this mini future.
-                        onDone(tx);
+                        onDone(res);
                     }
                 }
             }
         }
 
         /**
-         *
+         * @param res Response.
          */
-        private void remap() {
+        private void remap(final GridNearTxPrepareResponse res) {
             prepareOnTopology(true, new Runnable() {
                 @Override public void run() {
-                    onDone(tx);
+                    onDone(res);
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index b8d2250..3da4340 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -103,7 +103,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         if (!isDone()) {
             assert res.clientRemapVersion() == null : res;
 
-            for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+            for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
                 MiniFuture f = (MiniFuture)fut;
 
                 if (f.futureId().equals(res.miniId())) {
@@ -291,7 +291,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     /**
      *
      */
-    private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+    private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -331,7 +331,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             else {
                 onPrepareResponse(m, res);
 
-                onDone(tx);
+                onDone(res);
             }
         }
 
@@ -343,7 +343,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 tx.markForBackupCheck();
 
                 // Do not fail future for one-phase transaction right away.
-                onDone(tx);
+                onDone((GridNearTxPrepareResponse)null);
             }
 
             onError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 721de47..bebd9d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -595,6 +595,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /**
+     * @param nodeId Primary node id.
+     */
+    void onClientRemap(UUID nodeId) {
+        mappings.remove(nodeId);
+    }
+
+    /**
      * @param nodeId Node ID to mark with explicit lock.
      * @return {@code True} if mapping was found.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index fac7a12..c9ea42a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -48,15 +48,15 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO
 /**
  * Common code for tx prepare in optimistic and pessimistic modes.
  */
-public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentityFuture<IgniteInternalTx>
+public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx>
     implements GridCacheFuture<IgniteInternalTx> {
     /** Logger reference. */
     protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
     /** */
-    private static final IgniteReducer<IgniteInternalTx, IgniteInternalTx> REDUCER =
-        new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
-            @Override public boolean collect(IgniteInternalTx e) {
+    private static final IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx> REDUCER =
+        new IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx>() {
+            @Override public boolean collect(GridNearTxPrepareResponse e) {
                 return true;
             }
 
@@ -94,7 +94,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentit
      * @param tx Transaction.
      */
     public GridNearTxPrepareFutureAdapter(GridCacheSharedContext cctx, final GridNearTxLocal tx) {
-        super(cctx.kernalContext(), REDUCER);
+        super(REDUCER);
 
         assert cctx != null;
         assert tx != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
new file mode 100644
index 0000000..68899e7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionOptimisticException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
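+/**
+ * Tests OPTIMISTIC/SERIALIZABLE transactions: commit conflicts with concurrent updates or locks, concurrent updates.
+ */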
+public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node started by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of server nodes started before the tests. */
+    private static final int SRVS = 4;
+
+    /** Number of client nodes started before the tests. */
+    private static final int CLIENTS = 3;
+
+    /** Client mode flag applied to the configuration of nodes started next. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRVS); // Servers first: the 'client' flag is still false here.
+
+        client = true; // Nodes configured from now on get client mode (see getConfiguration).
+
+        startGridsMultiThreaded(SRVS, CLIENTS); // Presumably (start index, count): clients() looks them up from SRVS on.
+
+        client = false; // Reset so that nodes (re)started later by tests are servers again.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 5 * 60_000;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testTxRollbackRead1() throws Exception {
+        txRollbackRead(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testTxRollbackRead2() throws Exception {
+        txRollbackRead(false);
+    }
+
+    /**
+     * @param noVal If {@code true} there is no cache value when read in tx.
+     * @throws Exception If failed.
+     */
+    private void txRollbackRead(boolean noVal) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                List<Integer> keys = new ArrayList<>();
+
+                keys.add(nearKey(cache));
+
+                for (Integer key : keys) {
+                    log.info("Test key: " + key);
+
+                    Integer expVal = null;
+
+                    if (!noVal) {
+                        expVal = -1;
+
+                        cache.put(key, expVal);
+                    }
+
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Integer val = cache.get(key);
+
+                            assertEquals(expVal, val);
+
+                            updateKey(cache, key, 1);
+
+                            log.info("Commit");
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+
+                    assertEquals(1, (Object) cache.get(key));
+                }
+            }
+            finally {
+                ignite0.destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testTxRollbackReadWrite() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        final IgniteCache<Integer, Integer> cache = // NOTE(review): never destroyed, unlike in the other tests - confirm.
+            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+
+        final Integer key = nearKey(cache);
+
+        try {
+            try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                Integer val = cache.get(key);
+
+                assertNull(val);
+
+                updateKey(cache, key, 1); // Concurrent committed update after the read above.
+
+                cache.put(key, 2);
+
+                log.info("Commit");
+
+                tx.commit();
+            }
+
+            fail(); // Commit is expected to throw TransactionOptimisticException.
+        }
+        catch (TransactionOptimisticException e) {
+            log.info("Expected exception: " + e);
+        }
+
+        assertEquals(1, (Object)cache.get(key)); // Only the concurrent update must be visible.
+
+        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+            cache.put(key, 2);
+
+            tx.commit();
+        }
+
+        assertEquals(2, (Object) cache.get(key)); // Without a conflicting update the write commits.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxRollbackIfLocked1() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                final Integer key = nearKey(cache);
+
+                CountDownLatch latch = new CountDownLatch(1);
+
+                IgniteInternalFuture<?> fut = lockKey(latch, cache, key);
+
+                try {
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        cache.put(key, 2);
+
+                        log.info("Commit");
+
+                        tx.commit();
+                    }
+
+                    fail();
+                }
+                catch (TransactionOptimisticException e) {
+                    log.info("Expected exception: " + e);
+                }
+
+                latch.countDown();
+
+                fut.get();
+
+                assertEquals(1, (Object)cache.get(key));
+
+                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.put(key, 2);
+
+                    tx.commit();
+                }
+
+                assertEquals(2, (Object)cache.get(key));
+            }
+            finally {
+                ignite0.destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxRollbackIfLocked2() throws Exception {
+        rollbackIfLockedPartialLock(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxRollbackIfLocked3() throws Exception {
+        rollbackIfLockedPartialLock(true);
+    }
+
+    /**
+     * @param locKey If {@code true} the second key is chosen by {@code primaryKey} for the local node, else for node 2.
+     * @throws Exception If failed.
+     */
+    public void rollbackIfLockedPartialLock(boolean locKey) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                final Integer key1 = primaryKey(ignite(1).cache(cache.getName()));
+                final Integer key2 = locKey ? primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName()));
+
+                CountDownLatch latch = new CountDownLatch(1);
+
+                IgniteInternalFuture<?> fut = lockKey(latch, cache, key1); // Only key1 is locked; key2 stays free.
+
+                try {
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        cache.put(key1, 2);
+                        cache.put(key2, 2);
+
+                        log.info("Commit2");
+
+                        tx.commit();
+                    }
+
+                    fail(); // Commit must not succeed while key1 is held by the other tx.
+                }
+                catch (TransactionOptimisticException e) {
+                    log.info("Expected exception: " + e);
+                }
+
+                latch.countDown();
+
+                fut.get();
+
+                assertEquals(1, (Object) cache.get(key1)); // Value written by the locking tx.
+                assertNull(cache.get(key2)); // The failed tx must not have written the unlocked key.
+
+                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.put(key1, 2);
+                    cache.put(key2, 2);
+
+                    log.info("Commit3");
+
+                    tx.commit();
+                }
+
+                assertEquals(2, (Object) cache.get(key1)); // Was a duplicated key2 check: key1 was never verified.
+                assertEquals(2, (Object) cache.get(key2));
+            }
+            finally {
+                ignite0.destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlock() throws Exception {
+        concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlockNodeRestart() throws Exception {
+        concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlockClients() throws Exception {
+        concurrentUpdateNoDeadlock(clients(), 20, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception {
+        concurrentUpdateNoDeadlock(clients(), 20, true);
+    }
+
+    /**
+     * @return Client nodes.
+     */
+    private List<Ignite> clients() {
+        List<Ignite> clients = new ArrayList<>();
+
+        for (int i = 0; i < CLIENTS; i++) {
+            Ignite ignite = ignite(SRVS + i);
+
+            assertTrue(ignite.configuration().isClientMode());
+
+            clients.add(ignite);
+        }
+
+        return clients;
+    }
+
+    /**
+     * @param updateNodes Nodes executing updates.
+     * @param threads Number of threads executing updates.
+     * @param restart If {@code true} restarts one node.
+     * @throws Exception If failed.
+     */
+    private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes,
+        int threads,
+        final boolean restart) throws Exception {
+        assert updateNodes.size() > 0;
+
+        final Ignite ignite0 = ignite(0);
+
+        final String cacheName =
+            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
+
+        try {
+            final int KEYS = 100;
+
+            final AtomicBoolean finished = new AtomicBoolean();
+
+            IgniteInternalFuture<Object> fut = null;
+
+            try {
+                if (restart) {
+                    fut = GridTestUtils.runAsync(new Callable<Object>() {
+                        @Override public Object call() throws Exception {
+                            while (!finished.get()) {
+                                stopGrid(0);
+
+                                U.sleep(300);
+
+                                Ignite ignite = startGrid(0);
+
+                                assertFalse(ignite.configuration().isClientMode());
+                            }
+
+                            return null;
+                        }
+                    });
+                }
+
+                for (int i = 0; i < 10; i++) {
+                    log.info("Iteration: " + i);
+
+                    final long stopTime = U.currentTimeMillis() + 10_000;
+
+                    final AtomicInteger idx = new AtomicInteger();
+
+                    IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            int nodeIdx = idx.getAndIncrement() % updateNodes.size();
+
+                            Ignite node = updateNodes.get(nodeIdx);
+
+                            log.info("Tx thread: " + node.name());
+
+                            final IgniteTransactions txs = node.transactions();
+
+                            final IgniteCache<Integer, Integer> cache = node.cache(cacheName);
+
+                            assertNotNull(cache);
+
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                            while (U.currentTimeMillis() < stopTime) {
+                                final Map<Integer, Integer> keys = new LinkedHashMap<>();
+
+                                for (int i = 0; i < KEYS / 2; i++)
+                                    keys.put(rnd.nextInt(KEYS), rnd.nextInt());
+
+                                try {
+                                    if (restart) {
+                                        doInTransaction(node, OPTIMISTIC, SERIALIZABLE, new Callable<Void>() {
+                                            @Override public Void call() throws Exception {
+                                                cache.putAll(keys);
+
+                                                return null;
+                                            }
+                                        });
+                                    }
+                                    else {
+                                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                                            cache.putAll(keys);
+
+                                            tx.commit();
+                                        }
+                                    }
+                                }
+                                catch (TransactionOptimisticException ignore) {
+                                    // No-op.
+                                }
+                                catch (Throwable e) {
+                                    log.error("Unexpected error: " + e, e);
+
+                                    throw e;
+                                }
+                            }
+
+                            return null;
+                        }
+                    }, threads, "tx-thread");
+
+                    updateFut.get(60, SECONDS);
+
+                    IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName);
+
+                    for (int key = 0; key < KEYS; key++) {
+                        Integer val = cache.get(key);
+
+                        for (int node = 1; node < SRVS + CLIENTS; node++)
+                            assertEquals(val, ignite(node).cache(cache.getName()).get(key));
+                    }
+                }
+
+                finished.set(true);
+
+                if (fut != null)
+                    fut.get();
+            }
+            finally {
+                finished.set(true);
+            }
+        }
+        finally {
+            ignite(1).destroyCache(cacheName);
+        }
+    }
+
+    /**
+     * @return Cache configurations.
+     */
+    private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
+        List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>();
+
+        // No store, no near.
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false));
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, false));
+
+        // Store, no near.
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, false));
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false));
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, false));
+
+        // No store, near.
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, true));
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true));
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, true));
+
+        // Store, near.
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, true));
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, true));
+        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, true));
+
+        return ccfgs;
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     */
+    private void logCacheInfo(CacheConfiguration<?, ?> ccfg) {
+        log.info("Test cache [mode=" + ccfg.getCacheMode() +
+            ", sync=" + ccfg.getWriteSynchronizationMode() +
+            ", backups=" + ccfg.getBackups() +
+            ", near=" + (ccfg.getNearConfiguration() != null) +
+            ", store=" + ccfg.isWriteThrough() + ']');
+    }
+
+    /**
+     * @param cache Cache to update.
+     * @param key Key to put.
+     * @param val Value to put.
+     * @throws Exception If failed.
+     */
+    private void updateKey(
+        final IgniteCache<Integer, Integer> cache,
+        final Integer key,
+        final Integer val) throws Exception {
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key, val);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        }, "update-thread"); // Separate thread: callers invoke this from inside their own open tx.
+
+        fut.get(); // Return only after the update task has finished.
+    }
+
+    /**
+     * @param releaseLatch Latch the locking thread waits on before committing its transaction.
+     * @param cache Cache to put into.
+     * @param key Key to put (with value {@code 1}) inside a PESSIMISTIC/REPEATABLE_READ transaction.
+     * @return Future of the asynchronous task holding the transaction.
+     * @throws Exception If failed.
+     */
+    private IgniteInternalFuture<?> lockKey(
+        final CountDownLatch releaseLatch,
+        final IgniteCache<Integer, Integer> cache,
+        final Integer key) throws Exception {
+        final CountDownLatch lockLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key, 1);
+
+                    log.info("Locked key: " + key);
+
+                    lockLatch.countDown(); // Tell the caller the put inside the tx has been done.
+
+                    assertTrue(releaseLatch.await(100000, SECONDS)); // NOTE(review): effectively waits until released.
+
+                    log.info("Commit tx: " + key);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        }, "lock-thread");
+
+        assertTrue(lockLatch.await(10, SECONDS)); // Do not return before the async tx has put the key.
+
+        return fut;
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @param storeEnabled If {@code true} adds cache store.
+     * @param nearCache If {@code true} near cache is enabled.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(
+        CacheMode cacheMode,
+        CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean storeEnabled,
+        boolean nearCache) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(backups);
+        ccfg.setWriteSynchronizationMode(syncMode);
+
+        if (storeEnabled) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setWriteThrough(true);
+        }
+
+        if (nearCache)
+            ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
+
+        return ccfg;
+    }
+
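+    /**
+     * Factory for a no-op cache store: {@code load} returns {@code null}, {@code write} and {@code delete} do nothing.
+     */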
+    private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> {
+        /** {@inheritDoc} */
+        @Override public CacheStore<Integer, Integer> create() {
+            return new CacheStoreAdapter<Integer, Integer>() {
+                @Override public Integer load(Integer key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) {
+                    // No-op.
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java
deleted file mode 100644
index 217e362..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java
+++ /dev/null
@@ -1,687 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.Cache;
-import javax.cache.configuration.Factory;
-import javax.cache.integration.CacheLoaderException;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.store.CacheStore;
-import org.apache.ignite.cache.store.CacheStoreAdapter;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.transactions.Transaction;
-import org.apache.ignite.transactions.TransactionOptimisticException;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
-
-/**
- *
- */
-public class CacheSerializableTxTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final int SRVS = 4;
-
-    /** */
-    private static final int CLIENTS = 3;
-
-    /** */
-    private boolean client;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
-
-        cfg.setClientMode(client);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGridsMultiThreaded(SRVS);
-
-        client = true;
-
-        startGridsMultiThreaded(SRVS, CLIENTS);
-
-        client = false;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        super.afterTestsStopped();
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxRollbackRead1() throws Exception {
-        txRollbackRead(true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxRollbackRead2() throws Exception {
-        txRollbackRead(false);
-    }
-
-    /**
-     * @param noVal If {@code true} there is no cache value when read in tx.
-     * @throws Exception If failed.
-     */
-    private void txRollbackRead(boolean noVal) throws Exception {
-        Ignite ignite0 = ignite(0);
-
-        final IgniteTransactions txs = ignite0.transactions();
-
-        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
-            logCacheInfo(ccfg);
-
-            try {
-                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
-
-                List<Integer> keys = new ArrayList<>();
-
-                keys.add(nearKey(cache));
-
-                for (Integer key : keys) {
-                    log.info("Test key: " + key);
-
-                    Integer expVal = null;
-
-                    if (!noVal) {
-                        expVal = -1;
-
-                        cache.put(key, expVal);
-                    }
-
-                    try {
-                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                            Integer val = cache.get(key);
-
-                            assertEquals(expVal, val);
-
-                            updateKey(cache, key, 1);
-
-                            log.info("Commit");
-
-                            tx.commit();
-                        }
-
-                        fail();
-                    }
-                    catch (TransactionOptimisticException e) {
-                        log.info("Expected exception: " + e);
-                    }
-
-                    assertEquals(1, (Object) cache.get(key));
-                }
-            }
-            finally {
-                ignite0.destroyCache(ccfg.getName());
-            }
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxRollbackReadWrite() throws Exception {
-        Ignite ignite0 = ignite(0);
-
-        final IgniteTransactions txs = ignite0.transactions();
-
-        final IgniteCache<Integer, Integer> cache =
-            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
-
-        final Integer key = nearKey(cache);
-
-        try {
-            try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                Integer val = cache.get(key);
-
-                assertNull(val);
-
-                updateKey(cache, key, 1);
-
-                cache.put(key, 2);
-
-                log.info("Commit");
-
-                tx.commit();
-            }
-
-            fail();
-        }
-        catch (TransactionOptimisticException e) {
-            log.info("Expected exception: " + e);
-        }
-
-        assertEquals(1, (Object)cache.get(key));
-
-        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-            cache.put(key, 2);
-
-            tx.commit();
-        }
-
-        assertEquals(2, (Object) cache.get(key));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxRollbackIfLocked1() throws Exception {
-        Ignite ignite0 = ignite(0);
-
-        IgniteTransactions txs = ignite0.transactions();
-
-        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
-            logCacheInfo(ccfg);
-
-            try {
-                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
-
-                final Integer key = nearKey(cache);
-
-                CountDownLatch latch = new CountDownLatch(1);
-
-                IgniteInternalFuture<?> fut = lockKey(latch, cache, key);
-
-                try {
-                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                        cache.put(key, 2);
-
-                        log.info("Commit");
-
-                        tx.commit();
-                    }
-
-                    fail();
-                }
-                catch (TransactionOptimisticException e) {
-                    log.info("Expected exception: " + e);
-                }
-
-                latch.countDown();
-
-                fut.get();
-
-                assertEquals(1, (Object)cache.get(key));
-
-                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                    cache.put(key, 2);
-
-                    tx.commit();
-                }
-
-                assertEquals(2, (Object)cache.get(key));
-            }
-            finally {
-                ignite0.destroyCache(ccfg.getName());
-            }
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxRollbackIfLocked2() throws Exception {
-        rollbackIfLockedPartialLock(false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxRollbackIfLocked3() throws Exception {
-        rollbackIfLockedPartialLock(true);
-    }
-
-    /**
-     * @param locKey If {@code true} gets lock for local key.
-     * @throws Exception If failed.
-     */
-    public void rollbackIfLockedPartialLock(boolean locKey) throws Exception {
-        Ignite ignite0 = ignite(0);
-
-        final IgniteTransactions txs = ignite0.transactions();
-
-        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
-            logCacheInfo(ccfg);
-
-            try {
-                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
-
-                final Integer key1 = primaryKey(ignite(1).cache(cache.getName()));
-                final Integer key2 = locKey ? primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName()));
-
-                CountDownLatch latch = new CountDownLatch(1);
-
-                IgniteInternalFuture<?> fut = lockKey(latch, cache, key1);
-
-                try {
-                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                        cache.put(key1, 2);
-                        cache.put(key2, 2);
-
-                        log.info("Commit2");
-
-                        tx.commit();
-                    }
-
-                    fail();
-                }
-                catch (TransactionOptimisticException e) {
-                    log.info("Expected exception: " + e);
-                }
-
-                latch.countDown();
-
-                fut.get();
-
-                assertEquals(1, (Object) cache.get(key1));
-                assertNull(cache.get(key2));
-
-                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                    cache.put(key1, 2);
-                    cache.put(key2, 2);
-
-                    log.info("Commit3");
-
-                    tx.commit();
-                }
-
-                assertEquals(2, (Object) cache.get(key2));
-                assertEquals(2, (Object) cache.get(key2));
-            }
-            finally {
-                ignite0.destroyCache(ccfg.getName());
-            }
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConcurrentUpdateNoDeadlock() throws Exception {
-        concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConcurrentUpdateNoDeadlockNodeRestart() throws Exception {
-        concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConcurrentUpdateNoDeadlockClients() throws Exception {
-        concurrentUpdateNoDeadlock(clients(), 20, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception {
-        concurrentUpdateNoDeadlock(clients(), 20, true);
-    }
-
-    /**
-     * @return Client nodes.
-     */
-    private List<Ignite> clients() {
-        List<Ignite> clients = new ArrayList<>();
-
-        for (int i = 0; i < CLIENTS; i++) {
-            Ignite ignite = ignite(SRVS + i);
-
-            assertTrue(ignite.configuration().isClientMode());
-
-            clients.add(ignite);
-        }
-
-        return clients;
-    }
-
-    /**
-     * @param updateNodes Nodes executing updates.
-     * @param threads Number of threads executing updates.
-     * @param restart If {@code true} restarts one node.
-     * @throws Exception If failed.
-     */
-    private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes,
-        int threads,
-        final boolean restart) throws Exception {
-        assert updateNodes.size() > 0;
-
-        final Ignite ignite0 = ignite(0);
-
-        final String cacheName =
-            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
-
-        final int KEYS = 100;
-
-        final AtomicBoolean finished = new AtomicBoolean();
-
-        IgniteInternalFuture<Object> fut = null;
-
-        try {
-            if (restart) {
-                fut = GridTestUtils.runAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        while (!finished.get()) {
-                            stopGrid(0);
-
-                            U.sleep(300);
-
-                            Ignite ignite = startGrid(0);
-
-                            assertFalse(ignite.configuration().isClientMode());
-                        }
-
-                        return null;
-                    }
-                });
-            }
-
-            for (int i = 0; i < 10; i++) {
-                log.info("Iteration: " + i);
-
-                final long stopTime = U.currentTimeMillis() + 5_000;
-
-                final AtomicInteger idx = new AtomicInteger();
-
-                IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
-                    @Override public Void call() throws Exception {
-                        int nodeIdx = idx.getAndIncrement() % updateNodes.size();
-
-                        Ignite node = updateNodes.get(nodeIdx);
-
-                        log.info("Tx thread: " + node.name());
-
-                        final IgniteTransactions txs = node.transactions();
-
-                        final IgniteCache<Integer, Integer> cache = node.cache(cacheName);
-
-                        assertNotNull(cache);
-
-                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-                        while (U.currentTimeMillis() < stopTime) {
-                            final Map<Integer, Integer> keys = new LinkedHashMap<>();
-
-                            for (int i = 0; i < KEYS / 2; i++)
-                                keys.put(rnd.nextInt(KEYS), rnd.nextInt());
-
-                            try {
-                                if (restart) {
-                                    doInTransaction(node, OPTIMISTIC, REPEATABLE_READ, new Callable<Void>() {
-                                        @Override public Void call() throws Exception {
-                                            cache.putAll(keys);
-
-                                            return null;
-                                        }
-                                    });
-                                }
-                                else {
-                                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                                        cache.putAll(keys);
-
-                                        tx.commit();
-                                    }
-                                }
-                            }
-                            catch (TransactionOptimisticException ignore) {
-                                // No-op.
-                            }
-                            catch (Throwable e) {
-                                log.error("Unexpected error: " + e, e);
-
-                                throw e;
-                            }
-                        }
-
-                        return null;
-                    }
-                }, threads, "tx-thread");
-
-                updateFut.get(60, SECONDS);
-
-                IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName);
-
-                for (int key = 0; key < KEYS; key++) {
-                    Integer val = cache.get(key);
-
-                    for (int node = 1; node < SRVS + CLIENTS; node++)
-                        assertEquals(val, ignite(node).cache(cache.getName()).get(key));
-                }
-            }
-
-            finished.set(true);
-
-            if (fut != null)
-                fut.get();
-        }
-        finally {
-            finished.set(true);
-        }
-    }
-
-    /**
-     * @return Cache configurations.
-     */
-    private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
-        List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>();
-
-        // No store, no near.
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false));
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, false));
-
-        // Store, no near.
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, false));
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false));
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, false));
-
-        // No store, near.
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, true));
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true));
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, true));
-
-        // Store, near.
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, true));
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, true));
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, true));
-
-        return ccfgs;
-    }
-
-    /**
-     * @param ccfg Cache configuration.
-     */
-    private void logCacheInfo(CacheConfiguration<?, ?> ccfg) {
-        log.info("Test cache [mode=" + ccfg.getCacheMode() +
-            ", sync=" + ccfg.getWriteSynchronizationMode() +
-            ", backups=" + ccfg.getBackups() +
-            ", near=" + (ccfg.getNearConfiguration() != null) +
-            ", store=" + ccfg.isWriteThrough() + ']');
-    }
-
-    /**
-     * @param cache Cache.
-     * @param key Key.
-     * @param val Value.
-     * @throws Exception If failed.
-     */
-    private void updateKey(
-        final IgniteCache<Integer, Integer> cache,
-        final Integer key,
-        final Integer val) throws Exception {
-        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
-
-                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                    cache.put(key, val);
-
-                    tx.commit();
-                }
-
-                return null;
-            }
-        }, "update-thread");
-
-        fut.get();
-    }
-
-    /**
-     * @param releaseLatch Release lock latch.
-     * @param cache Cache.
-     * @param key Key.
-     * @return Future.
-     * @throws Exception If failed.
-     */
-    private IgniteInternalFuture<?> lockKey(
-        final CountDownLatch releaseLatch,
-        final IgniteCache<Integer, Integer> cache,
-        final Integer key) throws Exception {
-        final CountDownLatch lockLatch = new CountDownLatch(1);
-
-        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
-
-                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                    cache.put(key, 1);
-
-                    log.info("Locked key: " + key);
-
-                    lockLatch.countDown();
-
-                    assertTrue(releaseLatch.await(100000, SECONDS));
-
-                    log.info("Commit tx: " + key);
-
-                    tx.commit();
-                }
-
-                return null;
-            }
-        }, "lock-thread");
-
-        assertTrue(lockLatch.await(10, SECONDS));
-
-        return fut;
-    }
-
-    /**
-     * @param cacheMode Cache mode.
-     * @param syncMode Write synchronization mode.
-     * @param backups Number of backups.
-     * @param storeEnabled If {@code true} adds cache store.
-     * @param nearCache If {@code true} near cache is enabled.
-     * @return Cache configuration.
-     */
-    private CacheConfiguration<Integer, Integer> cacheConfiguration(
-        CacheMode cacheMode,
-        CacheWriteSynchronizationMode syncMode,
-        int backups,
-        boolean storeEnabled,
-        boolean nearCache) {
-        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
-
-        ccfg.setCacheMode(cacheMode);
-        ccfg.setAtomicityMode(TRANSACTIONAL);
-        ccfg.setBackups(backups);
-        ccfg.setWriteSynchronizationMode(syncMode);
-
-        if (storeEnabled) {
-            ccfg.setCacheStoreFactory(new TestStoreFactory());
-            ccfg.setWriteThrough(true);
-        }
-
-        if (nearCache)
-            ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
-
-        return ccfg;
-    }
-
-    /**
-     *
-     */
-    private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> {
-        /** {@inheritDoc} */
-        @Override public CacheStore<Integer, Integer> create() {
-            return new CacheStoreAdapter<Integer, Integer>() {
-                @Override public Integer load(Integer key) throws CacheLoaderException {
-                    return null;
-                }
-
-                @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
-                    // No-op.
-                }
-
-                @Override public void delete(Object key) {
-                    // No-op.
-                }
-            };
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 2d29c49..3cdca6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -93,6 +93,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 
 /**
  *
@@ -862,6 +863,150 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
     /**
      * @throws Exception If failed.
      */
+    public void testOptimisticSerializableTx() throws Exception {
+        optimisticSerializableTx(null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticSerializableTxNearEnabled() throws Exception {
+        optimisticSerializableTx(new NearCacheConfiguration());
+    }
+
+    /**
+     * @param nearCfg Near cache configuration.
+     * @throws Exception If failed.
+     */
+    private void optimisticSerializableTx(NearCacheConfiguration nearCfg) throws Exception {
+        ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setRebalanceMode(SYNC);
+        ccfg.setNearConfiguration(nearCfg);
+
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        client = true;
+
+        final Ignite ignite2 = startGrid(2);
+
+        assertTrue(ignite2.configuration().isClientMode());
+
+        final Map<Integer, Integer> map = new HashMap<>();
+
+        for (int i = 0; i < 100; i++)
+            map.put(i, i);
+
+        TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
+
+        spi.blockMessages(GridNearTxPrepareRequest.class, ignite0.localNode().id());
+        spi.blockMessages(GridNearTxPrepareRequest.class, ignite1.localNode().id());
+
+        spi.record(GridNearTxPrepareRequest.class);
+
+        final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
+
+        IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Thread.currentThread().setName("put-thread");
+
+                try (Transaction tx = ignite2.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.putAll(map);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        });
+
+        assertFalse(putFut.isDone());
+
+        client = false;
+
+        IgniteEx ignite3 = startGrid(3);
+
+        log.info("Stop block1.");
+
+        spi.stopBlock();
+
+        putFut.get();
+
+        spi.record(null);
+
+        checkData(map, null, cache, 4);
+
+        List<Object> msgs = spi.recordedMessages();
+
+        for (Object msg : msgs)
+            assertTrue(((GridNearTxPrepareRequest)msg).firstClientRequest());
+
+        assertEquals(5, msgs.size());
+
+        ignite3.close();
+
+        for (int i = 0; i < 100; i++)
+            map.put(i, i + 1);
+
+        spi.blockMessages(GridNearTxPrepareRequest.class, ignite0.localNode().id());
+        spi.blockMessages(GridNearTxPrepareRequest.class, ignite1.localNode().id());
+
+        spi.record(GridNearTxPrepareRequest.class);
+
+        putFut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Thread.currentThread().setName("put-thread");
+
+                try (Transaction tx = ignite2.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    for (Map.Entry<Integer, Integer> e : map.entrySet())
+                        cache.put(e.getKey(), e.getValue());
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        });
+
+        ignite3 = startGrid(3);
+
+        log.info("Stop block2.");
+
+        spi.stopBlock();
+
+        putFut.get();
+
+        spi.record(null);
+
+        msgs = spi.recordedMessages();
+
+        for (Object msg : msgs)
+            assertTrue(((GridNearTxPrepareRequest)msg).firstClientRequest());
+
+        assertEquals(5, msgs.size());
+
+        checkData(map, null, cache, 4);
+
+        for (int i = 0; i < 100; i++)
+            map.put(i, i + 2);
+
+        try (Transaction tx = ignite2.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+            cache.putAll(map);
+
+            tx.commit();
+        }
+
+        checkData(map, null, cache, 4);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testLock() throws Exception {
         lock(null);
     }
@@ -1816,6 +1961,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
                     super.sendMessage(msg.get1(), msg.get2());
                 }
+
+                blockedMsgs.clear();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/920d7472/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index b89bffd..dbf8928 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.CacheReadThroughReplicatedAto
 import org.apache.ignite.internal.processors.cache.CacheReadThroughReplicatedRestartSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheReadThroughRestartSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheRemoveAllSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest;
 import org.apache.ignite.internal.processors.cache.CacheStopAndDestroySelfTest;
 import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynamicStartAtomicTest;
 import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynamicStartTxTest;
@@ -139,6 +140,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("IgniteCache Test Suite part 4");
 
+        suite.addTestSuite(CacheSerializableTransactionsTest.class);
+
         // Multi node update.
         suite.addTestSuite(GridCacheMultinodeUpdateSelfTest.class);
         suite.addTestSuite(GridCacheMultinodeUpdateNearEnabledSelfTest.class);


Mime
View raw message