ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [13/43] ignite git commit: ignite-2407 Fixed 'primary_sync' mode for transactional cache
Date Tue, 12 Apr 2016 00:08:35 GMT
ignite-2407 Fixed 'primary_sync' mode for transactional cache


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1af2c7b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1af2c7b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1af2c7b

Branch: refs/heads/ignite-2949
Commit: f1af2c7b077b5483ff2b68a6e76775516518c598
Parents: e7e223f
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Apr 5 15:26:13 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Apr 5 15:26:13 2016 +0300

----------------------------------------------------------------------
 .../ClientAbstractMultiNodeSelfTest.java        |    8 +-
 .../GridDistributedTxFinishRequest.java         |    2 +
 .../dht/GridDhtTransactionalCacheAdapter.java   |    4 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   11 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  204 ++--
 .../distributed/dht/GridDhtTxPrepareFuture.java |    6 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |    5 -
 .../colocated/GridDhtColocatedLockFuture.java   |   17 +-
 .../distributed/near/GridNearLockFuture.java    |   10 +-
 .../near/GridNearTxFinishFuture.java            |   81 +-
 .../near/GridNearTxFinishRequest.java           |   46 +-
 .../cache/distributed/near/GridNearTxLocal.java |   24 -
 .../distributed/near/GridNearTxRemote.java      |    5 -
 .../cache/transactions/IgniteInternalTx.java    |   30 +-
 .../cache/transactions/IgniteTxAdapter.java     |   68 +-
 .../cache/transactions/IgniteTxHandler.java     |   17 +-
 .../IgniteTxImplicitSingleStateImpl.java        |    7 +-
 .../IgniteTxRemoteStateAdapter.java             |    7 +-
 .../cache/transactions/IgniteTxState.java       |    5 +-
 .../cache/transactions/IgniteTxStateImpl.java   |   28 +-
 .../internal/TestRecordingCommunicationSpi.java |   63 +-
 .../cache/IgniteCacheNearLockValueSelfTest.java |    2 +-
 .../distributed/IgniteCacheGetRestartTest.java  |    4 +
 .../distributed/IgniteCachePrimarySyncTest.java |   45 +-
 .../IgniteCacheReadFromBackupTest.java          |   12 +-
 .../IgniteCacheSingleGetMessageTest.java        |    8 +-
 .../IgniteTxCachePrimarySyncTest.java           | 1114 ++++++++++++++++++
 ...teSynchronizationModesMultithreadedTest.java |  422 +++++++
 .../GridCacheDhtPreloadMessageCountTest.java    |    6 +-
 .../dht/GridCacheTxNodeFailureSelfTest.java     |   10 +-
 .../testsuites/IgniteCacheTestSuite4.java       |    6 +
 31 files changed, 1910 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
index 5e5a68d..80e7baa 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
@@ -518,13 +518,13 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract
             IgniteInternalTx t = tm.tx(v);
 
             if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x1"))))
-                assertFalse("Invalid tx flags: " + t, t.syncCommit());
+                assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode());
             else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x2"))))
-            assertTrue("Invalid tx flags: " + t, t.syncCommit());
+                assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode());
             else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x3"))))
-                assertFalse("Invalid tx flags: " + t, t.syncCommit());
+                assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode());
             else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x4"))))
-                assertTrue("Invalid tx flags: " + t, t.syncCommit());
+                assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode());
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index a761fec..ad69d14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -50,9 +50,11 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
     private boolean commit;
 
     /** Sync commit flag. */
+    @Deprecated
     private boolean syncCommit;
 
     /** Sync commit flag. */
+    @Deprecated
     private boolean syncRollback;
 
     /** Min version used as base for completed versions. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index b6639f6..f19980b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -889,7 +890,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                             req.subjectId(),
                             req.taskNameHash());
 
-                        tx.syncCommit(req.syncCommit());
+                        if (req.syncCommit())
+                            tx.syncMode(FULL_SYNC);
 
                         tx = ctx.tm().onCreated(null, tx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index b9afbed..ebda52c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -46,6 +45,8 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 
 /**
@@ -217,8 +218,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 if (finishErr == null)
                     finishErr = this.tx.commitError();
 
-                // Always send finish reply.
-                this.tx.sendFinishReply(commit, finishErr);
+                if (tx.syncMode() != PRIMARY_SYNC)
+                    this.tx.sendFinishReply(commit, finishErr);
 
                 // Don't forget to clean up.
                 cctx.mvcc().removeFuture(futId);
@@ -277,7 +278,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
         if (tx.onePhaseCommit())
             return false;
 
-        boolean sync = commit ? tx.syncCommit() : tx.syncRollback();
+        boolean sync = tx.syncMode() == FULL_SYNC;
 
         if (tx.explicitLock())
             sync = true;
@@ -346,7 +347,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
         if (tx.onePhaseCommit())
             return false;
 
-        boolean sync = commit ? tx.syncCommit() : tx.syncRollback();
+        boolean sync = tx.syncMode() == FULL_SYNC;
 
         if (tx.explicitLock())
             sync = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index acd5017..d1f88d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -54,6 +53,7 @@ import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
 import static org.apache.ignite.transactions.TransactionState.PREPARING;
@@ -488,6 +488,51 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         return chainOnePhasePrepare(fut);
     }
 
+    /**
+     * @param prepFut Prepare future.
+     * @param fut Finish future.
+     */
+    private void finishCommit(@Nullable IgniteInternalFuture prepFut, GridDhtTxFinishFuture fut) {
+        boolean primarySync = syncMode() == PRIMARY_SYNC;
+
+        IgniteCheckedException err = null;
+
+        try {
+            if (prepFut != null)
+                prepFut.get(); // Check for errors.
+
+            if (finish(true)) {
+                if (primarySync)
+                    sendFinishReply(true, null);
+
+                fut.finish();
+            }
+            else {
+                err = new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this));
+
+                fut.onError(err);
+            }
+        }
+        catch (IgniteTxOptimisticCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to optimistically prepare transaction [tx=" + this + ", e=" + e + ']');
+
+            err = e;
+
+            fut.onError(e);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to prepare transaction: " + this, e);
+
+            err = e;
+
+            fut.onError(e);
+        }
+
+        if (primarySync && err != null)
+            sendFinishReply(true, err);
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"ThrowableInstanceNeverThrown"})
     @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
@@ -505,73 +550,20 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         GridDhtTxPrepareFuture prep = prepFut;
 
         if (prep != null) {
-            if (prep.isDone()) {
-                try {
-                    prep.get(); // Check for errors of a parent future.
-
-                    if (finish(true))
-                        fut.finish();
-                    else
-                        fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this)));
-                }
-                catch (IgniteTxOptimisticCheckedException e) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to optimistically prepare transaction [tx=" + this + ", e=" + e + ']');
-
-                    fut.onError(e);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to prepare transaction: " + this, e);
-
-                    fut.onError(e);
-                }
-            }
-            else
+            if (prep.isDone())
+                finishCommit(prep, fut);
+            else {
                 prep.listen(new CI1<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> f) {
-                        try {
-                            f.get(); // Check for errors of a parent future.
-
-                            if (finish(true))
-                                fut.finish();
-                            else
-                                fut.onError(new IgniteCheckedException("Failed to commit transaction: " +
-                                    CU.txString(GridDhtTxLocal.this)));
-                        }
-                        catch (IgniteTxOptimisticCheckedException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']');
-
-                            fut.onError(e);
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to prepare transaction: " + this, e);
-
-                            fut.onError(e);
-                        }
+                        finishCommit(f, fut);
                     }
                 });
+            }
         }
         else {
             assert optimistic();
 
-            try {
-                if (finish(true))
-                    fut.finish();
-                else
-                    fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this)));
-            }
-            catch (IgniteTxOptimisticCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']');
-
-                fut.onError(e);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to commit transaction: " + this, e);
-
-                fut.onError(e);
-            }
+            finishCommit(null, fut);
         }
 
         return fut;
@@ -584,64 +576,70 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         PREP_FUT_UPD.compareAndSet(this, fut, null);
     }
 
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
-        GridDhtTxPrepareFuture prepFut = this.prepFut;
+    /**
+     * @param prepFut Prepare future.
+     * @param fut Finish future.
+     */
+    private void finishRollback(@Nullable IgniteInternalFuture prepFut, GridDhtTxFinishFuture fut) {
+        try {
+            if (prepFut != null)
+                prepFut.get();
+        }
+        catch (IgniteCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to prepare or rollback transaction [tx=" + this + ", e=" + e + ']');
+        }
 
-        final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false);
+        boolean primarySync = syncMode() == PRIMARY_SYNC;
 
-        cctx.mvcc().addFuture(fut, fut.futureId());
+        IgniteCheckedException err = null;
 
-        if (prepFut == null) {
-            try {
-                if (finish(false) || state() == UNKNOWN)
-                    fut.finish();
-                else
-                    fut.onError(new IgniteCheckedException("Failed to rollback transaction: " + CU.txString(this)));
-            }
-            catch (IgniteTxOptimisticCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']');
+        try {
+            if (finish(false) || state() == UNKNOWN) {
+                if (primarySync)
+                    sendFinishReply(false, null);
 
-                fut.onError(e);
+                fut.finish();
             }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to rollback transaction (will make the best effort to rollback remote nodes): " +
-                    this, e);
+            else {
+                err = new IgniteCheckedException("Failed to rollback transaction: " +
+                    CU.txString(GridDhtTxLocal.this));
 
-                fut.onError(e);
+                fut.onError(err);
             }
         }
-        else {
-            prepFut.complete();
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to gracefully rollback transaction: " + CU.txString(GridDhtTxLocal.this),
+                e);
 
-            prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> f) {
-                    try {
-                        f.get(); // Check for errors of a parent future.
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to prepare or rollback transaction [tx=" + this + ", e=" + e + ']');
-                    }
+            err = e;
 
-                    try {
-                        if (finish(false) || state() == UNKNOWN)
-                            fut.finish();
-                        else
-                            fut.onError(new IgniteCheckedException("Failed to rollback transaction: " +
-                                CU.txString(GridDhtTxLocal.this)));
+            fut.onError(e);
+        }
 
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to gracefully rollback transaction: " + CU.txString(GridDhtTxLocal.this),
-                            e);
+        if (primarySync && err != null)
+            sendFinishReply(false, err);
+    }
 
-                        fut.onError(e);
-                    }
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
+        GridDhtTxPrepareFuture prepFut = this.prepFut;
+
+        final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false);
+
+        cctx.mvcc().addFuture(fut, fut.futureId());
+
+        if (prepFut != null) {
+            prepFut.complete();
+
+            prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
+                    finishRollback(f, fut);
                 }
             });
         }
+        else
+            finishRollback(null, fut);
 
         return fut;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 0541c8a..df0068a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -647,7 +647,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
                     IgniteInternalFuture<IgniteInternalTx> fut = null;
 
-                    CIX1<IgniteInternalFuture<IgniteInternalTx>> responseClo =
+                    CIX1<IgniteInternalFuture<IgniteInternalTx>> resClo =
                         new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
                             @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
                                 try {
@@ -674,7 +674,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                             fut = tx.rollbackAsync();
 
-                            fut.listen(responseClo);
+                            fut.listen(resClo);
 
                             throw e;
                         }
@@ -684,7 +684,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         fut = tx.rollbackAsync();
 
                     if (fut != null)
-                        fut.listen(responseClo);
+                        fut.listen(resClo);
                 }
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index f509e27..dc27eb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -247,11 +247,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean enforceSerializable() {
-        return false; // Serializable will be enforced on primary mode.
-    }
-
-    /** {@inheritDoc} */
     @Override public GridCacheVersion nearXidVersion() {
         return nearXidVer;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index e4c6b71..5810028 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -70,6 +70,7 @@ import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 
 /**
@@ -253,20 +254,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
     }
 
     /**
-     * @return {@code True} if commit is synchronous.
-     */
-    private boolean syncCommit() {
-        return tx != null && tx.syncCommit();
-    }
-
-    /**
-     * @return {@code True} if rollback is synchronous.
-     */
-    private boolean syncRollback() {
-        return tx != null && tx.syncRollback();
-    }
-
-    /**
      * @return Transaction isolation or {@code null} if no transaction.
      */
     @Nullable private TransactionIsolation isolation() {
@@ -897,7 +884,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                                         timeout,
                                         mappedKeys.size(),
                                         inTx() ? tx.size() : mappedKeys.size(),
-                                        inTx() && tx.syncCommit(),
+                                        inTx() && tx.syncMode() == FULL_SYNC,
                                         inTx() ? tx.subjectId() : null,
                                         inTx() ? tx.taskNameHash() : 0,
                                         read ? accessTtl : -1L,

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 5d4fc01..0d17bd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -68,6 +68,7 @@ import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 
 /**
@@ -265,13 +266,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     }
 
     /**
-     * @return {@code True} if rollback is synchronous.
-     */
-    private boolean syncRollback() {
-        return tx != null && tx.syncRollback();
-    }
-
-    /**
      * @return Transaction isolation or {@code null} if no transaction.
      */
     @Nullable private TransactionIsolation isolation() {
@@ -1013,7 +1007,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                                                 timeout,
                                                 mappedKeys.size(),
                                                 inTx() ? tx.size() : mappedKeys.size(),
-                                                inTx() && tx.syncCommit(),
+                                                inTx() && tx.syncMode() == FULL_SYNC,
                                                 inTx() ? tx.subjectId() : null,
                                                 inTx() ? tx.taskNameHash() : 0,
                                                 read ? accessTtl : -1L,

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 5c4aca0..fe6290d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -55,6 +56,8 @@ import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionRollbackException;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
 import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
 
@@ -70,6 +73,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.1");
 
     /** */
+    public static final IgniteProductVersion PRIMARY_SYNC_TXS_SINCE = IgniteProductVersion.fromString("1.6.0");
+
+    /** */
     private static final long serialVersionUID = 0L;
 
     /** Logger reference. */
@@ -120,6 +126,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridNearTxFinishFuture.class);
+
+        CacheWriteSynchronizationMode syncMode;
+
+        if (tx.explicitLock())
+            syncMode = FULL_SYNC;
+        else
+            syncMode = tx.syncMode();
+
+        tx.syncMode(syncMode);
     }
 
     /** {@inheritDoc} */
@@ -322,20 +337,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
-     * Completeness callback.
-     */
-    private void onComplete() {
-        onDone(tx);
-    }
-
-    /**
-     * @return Synchronous flag.
-     */
-    private boolean isSync() {
-        return tx.explicitLock() || (commit ? tx.syncCommit() : tx.syncRollback());
-    }
-
-    /**
      * Initializes future.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
@@ -366,26 +367,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 }
 
                 markInitialized();
-
-                if (!isSync() && !isDone()) {
-                    boolean complete = true;
-
-                    synchronized (futs) {
-                        // Avoid collection copy and iterator creation.
-                        for (int i = 0; i < futs.size(); i++) {
-                            IgniteInternalFuture<IgniteInternalTx> f = futs.get(i);
-
-                            if (isMini(f) && !f.isDone()) {
-                                complete = false;
-
-                                break;
-                            }
-                        }
-                    }
-
-                    if (complete)
-                        onComplete();
-                }
             }
             else
                 onDone(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx)));
@@ -441,7 +422,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         readyNearMappingFromBackup(mapping);
 
                         if (committed) {
-                            if (tx.syncCommit()) {
+                            if (tx.syncMode() == FULL_SYNC) {
                                 GridCacheVersion nearXidVer = tx.nearXidVersion();
 
                                 assert nearXidVer != null : tx;
@@ -511,6 +492,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         if (finish) {
             GridDistributedTxMapping mapping = tx.mappings().singleMapping();
 
+            assert mapping != null : tx;
+
             if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(mapping.node().version()) > 0)
                 finish = false;
         }
@@ -575,6 +558,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
         assert !m.empty();
 
+        CacheWriteSynchronizationMode syncMode = tx.syncMode();
+
+        if (m.explicitLock())
+            syncMode = FULL_SYNC;
+
         GridNearTxFinishRequest req = new GridNearTxFinishRequest(
             futId,
             tx.xidVersion(),
@@ -583,8 +571,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             tx.isInvalidate(),
             tx.system(),
             tx.ioPolicy(),
-            tx.syncCommit(),
-            tx.syncRollback(),
+            syncMode,
             m.explicitLock(),
             tx.storeEnabled(),
             tx.topologyVersion(),
@@ -604,7 +591,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finish(n.id(), tx, req);
 
             // Add new future.
-            if (fut != null)
+            if (fut != null && syncMode == FULL_SYNC)
                 add(fut);
         }
         else {
@@ -620,8 +607,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             try {
                 cctx.io().send(n, req, tx.ioPolicy());
 
+                boolean wait;
+
+                if (syncMode == PRIMARY_SYNC)
+                    wait = n.version().compareToIgnoreTimestamp(PRIMARY_SYNC_TXS_SINCE) >= 0;
+                else
+                    wait = syncMode == FULL_SYNC;
+
                 // If we don't wait for result, then mark future as done.
-                if (!isSync() && !m.explicitLock())
+                if (!wait)
                     fut.onDone();
             }
             catch (ClusterTopologyCheckedException e) {
@@ -665,9 +659,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                             ", loc=" + node.isLocal() +
                             ", done=" + f.isDone() + "]";
                     }
-                    else {
+                    else
                         return "CheckBackupFuture[node=null, done=" + f.isDone() + "]";
-                    }
                 }
                 else if (f.getClass() == CheckRemoteTxMiniFuture.class) {
                     CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;
@@ -703,8 +696,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             tx.system(),
             tx.ioPolicy(),
             false,
-            tx.syncCommit(),
-            tx.syncRollback(),
+            tx.syncMode() == FULL_SYNC,
+            tx.syncMode() == FULL_SYNC,
             null,
             null,
             null,
@@ -780,7 +773,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 if (log.isDebugEnabled())
                     log.debug("Remote node left grid while sending or waiting for reply: " + this);
 
-                if (isSync()) {
+                if (tx.syncMode() == FULL_SYNC) {
                     Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
 
                     if (txNodes != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 65eac63..dfbbe18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -56,6 +57,9 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
     /** Task name hash. */
     private int taskNameHash;
 
+    /** Write synchronization mode. */
+    private CacheWriteSynchronizationMode syncMode;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -71,8 +75,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param invalidate Invalidate flag.
      * @param sys System flag.
      * @param plc IO policy.
-     * @param syncCommit Sync commit flag.
-     * @param syncRollback Sync rollback flag.
+     * @param syncMode Write synchronization mode.
      * @param explicitLock Explicit lock flag.
      * @param storeEnabled Store enabled flag.
      * @param topVer Topology version.
@@ -92,8 +95,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
         boolean invalidate,
         boolean sys,
         byte plc,
-        boolean syncCommit,
-        boolean syncRollback,
+        CacheWriteSynchronizationMode syncMode,
         boolean explicitLock,
         boolean storeEnabled,
         @NotNull AffinityTopologyVersion topVer,
@@ -113,8 +115,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
             invalidate,
             sys,
             plc,
-            syncCommit,
-            syncRollback,
+            syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
+            syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
             baseVer,
             committedVers,
             rolledbackVers,
@@ -122,6 +124,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
             addDepInfo
         );
 
+        this.syncMode = syncMode;
         this.explicitLock = explicitLock;
         this.storeEnabled = storeEnabled;
         this.topVer = topVer;
@@ -130,6 +133,13 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
     }
 
     /**
+     * @return Transaction write synchronization mode (can be null if message is sent from old nodes).
+     */
+    @Nullable public CacheWriteSynchronizationMode syncMode() {
+        return syncMode;
+    }
+
+    /**
      * @return Explicit lock flag.
      */
     public boolean explicitLock() {
@@ -218,12 +228,18 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 23:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -278,14 +294,26 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 22:
-                taskNameHash = reader.readInt("taskNameHash");
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
 
                 if (!reader.isLastRead())
                     return false;
 
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
                 reader.incrementState();
 
             case 23:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 24:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -305,7 +333,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 24;
+        return 25;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index f7c330e..4aee6ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -207,11 +207,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean enforceSerializable() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
     @Override protected UUID nearNodeId() {
         return cctx.localNodeId();
     }
@@ -244,16 +239,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         PREP_FUT_UPD.compareAndSet(this, fut, null);
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean syncCommit() {
-        return sync();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean syncRollback() {
-        return sync();
-    }
-
     /**
      * Marks transaction to check if commit on backup.
      */
@@ -284,15 +269,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /**
-     * Checks if transaction is fully synchronous.
-     *
-     * @return {@code True} if transaction is fully synchronous.
-     */
-    private boolean sync() {
-        return super.syncCommit() || txState().sync(cctx);
-    }
-
-    /**
      * @return {@code True} if transaction contains at least one near cache key mapped to the local node.
      */
     public boolean nearLocallyMapped() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 6b17d5e..4f4be57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -227,11 +227,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean enforceSerializable() {
-        return false; // Serializable will be enforced on primary mode.
-    }
-
-    /** {@inheritDoc} */
     @Override public GridCacheVersion ownedVersion(IgniteTxKey key) {
         return owned == null ? null : owned.get(key);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index cdf2354..e08f9b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -386,15 +387,6 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
     public boolean ownsLockUnsafe(GridCacheEntryEx entry);
 
     /**
-     * For Partitioned caches, this flag is {@code false} for remote DHT and remote NEAR
-     * transactions because serializability of transaction is enforced on primary node. All
-     * other transaction types must enforce it.
-     *
-     * @return Enforce serializable flag.
-     */
-    public boolean enforceSerializable();
-
-    /**
      * @return {@code True} if near transaction.
      */
     public boolean near();
@@ -442,14 +434,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
     public boolean user();
 
     /**
-     * @return {@code True} if transaction is configured with synchronous commit flag.
-     */
-    public boolean syncCommit();
-
-    /**
-     * @return {@code True} if transaction is configured with synchronous rollback flag.
+     * @return Transaction write synchronization mode.
      */
-    public boolean syncRollback();
+    public CacheWriteSynchronizationMode syncMode();
 
     /**
      * @param key Key to check.
@@ -524,11 +511,6 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
          KeyCacheObject key) throws GridCacheFilterFailedException;
 
     /**
-     * @return Start version.
-     */
-    public GridCacheVersion startVersion();
-
-    /**
      * @return Transaction version.
      */
     public GridCacheVersion xidVersion();
@@ -544,12 +526,6 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
     public void commitVersion(GridCacheVersion commitVer);
 
     /**
-     * @return End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
-     *      assigned to this transaction at the end of write phase.
-     */
-    public GridCacheVersion endVersion();
-
-    /**
      * Prepare state.
      *
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 77f3765..50598c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -36,6 +36,7 @@ import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -184,10 +185,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     protected boolean onePhaseCommit;
 
     /** */
-    protected boolean syncCommit;
-
-    /** */
-    protected boolean syncRollback;
+    protected CacheWriteSynchronizationMode syncMode;
 
     /** If this transaction contains transform entries. */
     protected boolean transform;
@@ -638,32 +636,18 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /** {@inheritDoc} */
-    @Override public boolean enforceSerializable() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean syncCommit() {
-        return syncCommit;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean syncRollback() {
-        return syncRollback;
-    }
+    @Override public CacheWriteSynchronizationMode syncMode() {
+        if (syncMode != null)
+            return syncMode;
 
-    /**
-     * @param syncCommit Synchronous commit flag.
-     */
-    public void syncCommit(boolean syncCommit) {
-        this.syncCommit = syncCommit;
+        return txState().syncMode(cctx);
     }
 
     /**
-     * @param syncRollback Synchronous rollback flag.
+     * @param syncMode Write synchronization mode.
      */
-    public void syncRollback(boolean syncRollback) {
-        this.syncRollback = syncRollback;
+    public void syncMode(CacheWriteSynchronizationMode syncMode) {
+        this.syncMode = syncMode;
     }
 
     /** {@inheritDoc} */
@@ -1154,16 +1138,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion startVersion() {
-        return startVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion endVersion() {
-        return endVer;
-    }
-
-    /** {@inheritDoc} */
     @Override public void endVersion(GridCacheVersion endVer) {
         this.endVer = endVer;
     }
@@ -1897,11 +1871,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public boolean enforceSerializable() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
         @Override public boolean near() {
             return false;
         }
@@ -1942,13 +1911,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public boolean syncCommit() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean syncRollback() {
-            return false;
+        @Override public CacheWriteSynchronizationMode syncMode() {
+            return null;
         }
 
         /** {@inheritDoc} */
@@ -2014,11 +1978,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public GridCacheVersion startVersion() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Override public GridCacheVersion xidVersion() {
             return null;
         }
@@ -2034,11 +1993,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public GridCacheVersion endVersion() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Override public void prepare() throws IgniteCheckedException {
             // No-op.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 41dc43f..a764d5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -72,6 +72,8 @@ import org.apache.ignite.lang.IgniteFutureCancelledException;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
@@ -389,7 +391,7 @@ public class IgniteTxHandler {
 
         if (tx != null) {
             if (req.explicitLock())
-                tx.explicitLock(req.explicitLock());
+                tx.explicitLock(true);
 
             tx.transactionNodes(req.transactionNodes());
 
@@ -688,6 +690,14 @@ public class IgniteTxHandler {
             assert tx != null : "Transaction is null for near finish request [nodeId=" +
                 nodeId + ", req=" + req + "]";
 
+            if (req.syncMode() == null) {
+                boolean sync = req.commit() ? req.syncCommit() : req.syncRollback();
+
+                tx.syncMode(sync ? FULL_SYNC : FULL_ASYNC);
+            }
+            else
+                tx.syncMode(req.syncMode());
+
             if (req.commit()) {
                 tx.storeEnabled(req.storeEnabled());
 
@@ -698,9 +708,6 @@ public class IgniteTxHandler {
                     return null;
                 }
 
-                if (!tx.syncCommit())
-                    tx.syncCommit(req.syncCommit());
-
                 tx.nearFinishFutureId(req.futureId());
                 tx.nearFinishMiniId(req.miniId());
 
@@ -712,8 +719,6 @@ public class IgniteTxHandler {
                 return commitFut;
             }
             else {
-                tx.syncRollback(req.syncRollback());
-
                 tx.nearFinishFutureId(req.futureId());
                 tx.nearFinishMiniId(req.miniId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 2f1e16f..965ef2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -36,7 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 
 /**
  *
@@ -105,8 +106,8 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean sync(GridCacheSharedContext cctx) {
-        return cacheCtx != null && cacheCtx.config().getWriteSynchronizationMode() == FULL_SYNC;
+    @Override public CacheWriteSynchronizationMode syncMode(GridCacheSharedContext cctx) {
+        return cacheCtx != null ? cacheCtx.config().getWriteSynchronizationMode() : FULL_ASYNC;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index 3e5034b..79b4a74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.Collection;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -26,6 +27,8 @@ import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+
 /**
  *
  */
@@ -55,10 +58,10 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
     }
 
     /** {@inheritDoc} */
-    @Override public boolean sync(GridCacheSharedContext cctx) {
+    @Override public CacheWriteSynchronizationMode syncMode(GridCacheSharedContext cctx) {
         assert false;
 
-        return false;
+        return FULL_ASYNC;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 18fce8d..b133533 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -63,9 +64,9 @@ public interface IgniteTxState {
 
     /**
      * @param cctx Context.
-     * @return {@code True} if transaction is fully synchronous.
+     * @return Write synchronization mode.
      */
-    public boolean sync(GridCacheSharedContext cctx);
+    public CacheWriteSynchronizationMode syncMode(GridCacheSharedContext cctx);
 
     /**
      * @param cctx Context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 1256aa6..c826de1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -39,7 +40,9 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  *
@@ -134,15 +137,32 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean sync(GridCacheSharedContext cctx) {
+    @Override public CacheWriteSynchronizationMode syncMode(GridCacheSharedContext cctx) {
+        CacheWriteSynchronizationMode syncMode = CacheWriteSynchronizationMode.FULL_ASYNC;
+
         for (int i = 0; i < activeCacheIds.size(); i++) {
             int cacheId = (int)activeCacheIds.get(i);
 
-            if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC)
-                return true;
+            CacheWriteSynchronizationMode cacheSyncMode =
+                cctx.cacheContext(cacheId).config().getWriteSynchronizationMode();
+
+            switch (cacheSyncMode) {
+                case FULL_SYNC:
+                    return FULL_SYNC;
+
+                case PRIMARY_SYNC: {
+                    if (syncMode == FULL_ASYNC)
+                        syncMode = PRIMARY_SYNC;
+
+                    break;
+                }
+
+                case FULL_ASYNC:
+                    break;
+            }
         }
 
-        return false;
+        return syncMode;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 307a470..2aed459 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -33,7 +34,6 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
 
@@ -42,7 +42,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
  */
 public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     /** */
-    private Class<?> recordCls;
+    private Set<Class<?>> recordClasses;
 
     /** */
     private List<Object> recordedMsgs = new ArrayList<>();
@@ -65,7 +65,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
             Object msg0 = ioMsg.message();
 
             synchronized (this) {
-                if (recordCls != null && msg0.getClass().equals(recordCls))
+                if (recordClasses != null && recordClasses.contains(msg0.getClass()))
                     recordedMsgs.add(msg0);
 
                 boolean block = false;
@@ -97,28 +97,46 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     }
 
     /**
-     * @param recordCls Message class to record.
+     * @param recordClasses Message classes to record.
      */
-    public void record(@Nullable Class<?> recordCls) {
+    public void record(Class<?>... recordClasses) {
         synchronized (this) {
-            this.recordCls = recordCls;
+            if (this.recordClasses == null)
+                this.recordClasses = new HashSet<>();
+
+            Collections.addAll(this.recordClasses, recordClasses);
+
+            recordedMsgs = new ArrayList<>();
         }
     }
 
     /**
+     * @param stopRecord If {@code true}, recording is stopped after the recorded messages are returned.
      * @return Recorded messages.
      */
-    public List<Object> recordedMessages() {
+    public List<Object> recordedMessages(boolean stopRecord) {
         synchronized (this) {
             List<Object> msgs = recordedMsgs;
 
             recordedMsgs = new ArrayList<>();
 
+            if (stopRecord)
+                recordClasses = null;
+
             return msgs;
         }
     }
 
     /**
+     * @return {@code True} if there are blocked messages.
+     */
+    public boolean hasBlockedMessages() {
+        synchronized (this) {
+            return !blockedMsgs.isEmpty();
+        }
+    }
+
+    /**
      * @param blockP Message block predicate.
      */
     public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {
@@ -146,22 +164,35 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     }
 
     /**
-     * Stops block messages and sends all already blocked messages.
+     * Stops blocking messages and sends all already blocked messages.
      */
     public void stopBlock() {
+        stopBlock(true);
+    }
+
+    /**
+     * Stops blocking messages and, if {@code sndMsgs} is {@code true}, sends all already blocked messages.
+     *
+     * @param sndMsgs If {@code true} sends blocked messages.
+     */
+    public void stopBlock(boolean sndMsgs) {
         synchronized (this) {
+            blockP = null;
+
             blockCls.clear();
             blockP = null;
 
-            for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
-                try {
-                    ignite.log().info("Send blocked message [node=" + msg.get1().id() +
-                        ", msg=" + msg.get2().message() + ']');
+            if (sndMsgs) {
+                for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+                    try {
+                        ignite.log().info("Send blocked message [node=" + msg.get1().id() +
+                            ", msg=" + msg.get2().message() + ']');
 
-                    super.sendMessage(msg.get1(), msg.get2());
-                }
-                catch (Throwable e) {
-                    U.error(ignite.log(), "Failed to send blocked message: " + msg, e);
+                        super.sendMessage(msg.get1(), msg.get2());
+                    }
+                    catch (Throwable e) {
+                        U.error(ignite.log(), "Failed to send blocked message: " + msg, e);
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
index f106fec..a35d5a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
@@ -94,7 +94,7 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
                 TestRecordingCommunicationSpi comm =
                     (TestRecordingCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
 
-                Collection<GridNearLockRequest> reqs = (Collection)comm.recordedMessages();
+                Collection<GridNearLockRequest> reqs = (Collection)comm.recordedMessages(false);
 
                 assertEquals(1, reqs.size());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
index b14109b..71d1182 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
  *
@@ -160,6 +161,8 @@ public class IgniteCacheGetRestartTest extends GridCommonAbstractTest {
                 ignite(SRVS).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
 
             try (IgniteDataStreamer<Object, Object> streamer = ignite(0).dataStreamer(ccfg.getName())) {
+                streamer.allowOverwrite(true);
+
                 for (int i = 0; i < KEYS; i++)
                     streamer.addData(i, i);
             }
@@ -274,6 +277,7 @@ public class IgniteCacheGetRestartTest extends GridCommonAbstractTest {
             ccfg.setNearConfiguration(new NearCacheConfiguration<>());
 
         ccfg.setRebalanceMode(ASYNC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
         return ccfg;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
index cef73fd..183d4bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
@@ -21,16 +21,26 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 
 /**
  *
@@ -94,21 +104,36 @@ public class IgniteCachePrimarySyncTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testPutGet() throws Exception {
-        checkPutGet(ignite(SRVS).cache("cache1"));
+        Ignite ignite = ignite(SRVS);
 
-        checkPutGet(ignite(SRVS).cache("cache2"));
+        checkPutGet(ignite.cache("cache1"), null, null, null);
+
+        checkPutGet(ignite.cache("cache2"), null, null, null);
+
+        checkPutGet(ignite.cache("cache2"), ignite.transactions(), OPTIMISTIC, REPEATABLE_READ);
+
+        checkPutGet(ignite.cache("cache2"), ignite.transactions(), OPTIMISTIC, SERIALIZABLE);
+
+        checkPutGet(ignite.cache("cache2"), ignite.transactions(), PESSIMISTIC, READ_COMMITTED);
     }
 
     /**
      * @param cache Cache.
+     * @param txs Transactions instance if explicit transaction should be used.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
      */
-    private void checkPutGet(IgniteCache<Object, Object> cache) {
+    private void checkPutGet(IgniteCache<Object, Object> cache,
+        @Nullable IgniteTransactions txs,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation) {
         log.info("Check cache: " + cache.getName());
 
         final int KEYS = 50;
 
         for (int iter = 0; iter < 100; iter++) {
-            log.info("Iteration: " + iter);
+            if (iter % 10 == 0)
+                log.info("Iteration: " + iter);
 
             for (int i = 0; i < KEYS; i++)
                 cache.remove(i);
@@ -118,12 +143,20 @@ public class IgniteCachePrimarySyncTest extends GridCommonAbstractTest {
             for (int i = 0; i < KEYS; i++)
                 putBatch.put(i, iter);
 
-            cache.putAll(putBatch);
+            if (txs != null) {
+                try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                    cache.putAll(putBatch);
+
+                    tx.commit();
+                }
+            }
+            else
+                cache.putAll(putBatch);
 
             Map<Object, Object> vals = cache.getAll(putBatch.keySet());
 
             for (int i = 0; i < KEYS; i++)
-                assertNotNull(vals.get(i));
+                assertEquals(iter, vals.get(i));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
index af018cc..2ccd950 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
@@ -123,7 +123,7 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
 
                     assertNull(cache.get(key));
 
-                    List<Object> msgs = spi.recordedMessages();
+                    List<Object> msgs = spi.recordedMessages(false);
 
                     assertEquals(1, msgs.size());
                 }
@@ -216,7 +216,7 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
 
                     assertNull(cache.get(key));
 
-                    List<Object> msgs = newNodeSpi.recordedMessages();
+                    List<Object> msgs = newNodeSpi.recordedMessages(false);
 
                     assertEquals(1, msgs.size());
 
@@ -234,7 +234,7 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
                         TestRecordingCommunicationSpi spi =
                             (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
 
-                        spi.stopBlock();
+                        spi.stopBlock(true);
                     }
 
                     awaitPartitionMapExchange();
@@ -304,7 +304,7 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
             TestRecordingCommunicationSpi spi =
                 (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
 
-            List<Object> msgs = spi.recordedMessages();
+            List<Object> msgs = spi.recordedMessages(false);
 
             assertEquals(0, msgs.size());
         }
@@ -330,14 +330,14 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
         if (nearKey != null) {
             assertNull(cache.get(nearKey));
 
-            msgs = spi.recordedMessages();
+            msgs = spi.recordedMessages(false);
 
             assertEquals(1, msgs.size());
         }
 
         assertNull(cache.get(backupKey));
 
-        msgs = spi.recordedMessages();
+        msgs = spi.recordedMessages(false);
 
         assertTrue(msgs.isEmpty());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
index 48fc961..08f44cc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
@@ -248,12 +248,12 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
      * @param primarySpi Primary node SPI.
      */
     private void checkMessages(TestRecordingCommunicationSpi spi, TestRecordingCommunicationSpi primarySpi) {
-        List<Object> msgs = spi.recordedMessages();
+        List<Object> msgs = spi.recordedMessages(false);
 
         assertEquals(1, msgs.size());
         assertTrue(msgs.get(0) instanceof GridNearSingleGetRequest);
 
-        msgs = primarySpi.recordedMessages();
+        msgs = primarySpi.recordedMessages(false);
 
         assertEquals(1, msgs.size());
         assertTrue(msgs.get(0) instanceof GridNearSingleGetResponse);
@@ -264,10 +264,10 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
      * @param primarySpi Primary node SPI.
      */
     private void checkNoMessages(TestRecordingCommunicationSpi spi, TestRecordingCommunicationSpi primarySpi) {
-        List<Object> msgs = spi.recordedMessages();
+        List<Object> msgs = spi.recordedMessages(false);
         assertEquals(0, msgs.size());
 
-        msgs = primarySpi.recordedMessages();
+        msgs = primarySpi.recordedMessages(false);
         assertEquals(0, msgs.size());
     }
 


Mime
View raw message