ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-2407
Date Wed, 03 Feb 2016 16:08:01 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2407 21e181395 -> 7b69c3f1b


ignite-2407


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b69c3f1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b69c3f1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b69c3f1

Branch: refs/heads/ignite-2407
Commit: 7b69c3f1b52b44a3b4116c9d27c50ec6a4f73755
Parents: 21e1813
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Feb 3 15:02:41 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Feb 3 19:03:23 2016 +0300

----------------------------------------------------------------------
 .../internal/TestRecordingCommunicationSpi.java |  25 +-
 .../IgniteCachePrimarySyncTxsTest.java          | 128 ---
 .../IgniteCacheReadFromBackupTest.java          |   2 +-
 .../IgniteTxCachePrimarySyncTest.java           | 860 +++++++++++++++++++
 ...teSynchronizationModesMultithreadedTest.java | 418 +++++++++
 .../dht/GridCacheTxNodeFailureSelfTest.java     |  10 +-
 .../testsuites/IgniteCacheTestSuite4.java       |   4 +
 7 files changed, 1310 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7b69c3f1/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 8a602ad..9eca8cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -98,6 +98,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     public void record(@Nullable Class<?> recordCls) {
         synchronized (this) {
             this.recordCls = recordCls;
+
+            recordedMsgs = new ArrayList<>();
         }
     }
 
@@ -115,6 +117,15 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     }
 
     /**
+     * @return {@code True} if there are blocked messages.
+     */
+    public boolean hasBlockedMessages() {
+        synchronized (this) {
+            return !blockedMsgs.isEmpty();
+        }
+    }
+
+    /**
      * @param blockP Message block predicate.
      */
     public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {
@@ -142,14 +153,20 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     }
 
     /**
-     * Stops block messages and sends all already blocked messages.
+     * Stops block messages and can sends all already blocked messages.
+     *
+     * @param sndMsgs If {@code true} sends blocked messages.
      */
-    public void stopBlock() {
+    public void stopBlock(boolean sndMsgs) {
         synchronized (this) {
+            blockP = null;
+
             blockCls.clear();
 
-            for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs)
-                super.sendMessage(msg.get1(), msg.get2());
+            if (sndMsgs) {
+                for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs)
+                    super.sendMessage(msg.get1(), msg.get2());
+            }
 
             blockedMsgs.clear();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b69c3f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTxsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTxsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTxsTest.java
deleted file mode 100644
index 61bf8a5..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTxsTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
-
-/**
- *
- */
-public class IgniteCachePrimarySyncTxsTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final int SRVS = 4;
-
-    /** */
-    private static final int NODES = SRVS + 1;
-
-    /** */
-    private boolean clientMode;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-
-        cfg.setClientMode(clientMode);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGrids(SRVS);
-
-        clientMode = true;
-
-        Ignite client = startGrid(SRVS);
-
-        assertTrue(client.configuration().isClientMode());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxSyncMode() throws Exception {
-        Ignite ignite = ignite(0);
-
-        List<IgniteCache<Object, Object>> caches = new ArrayList<>();
-
-        try {
-            caches.add(ignite.createCache(cacheConfiguration("fullSync1", FULL_SYNC, 1)));
-            caches.add(ignite.createCache(cacheConfiguration("fullSync2", FULL_SYNC, 1)));
-            caches.add(ignite.createCache(cacheConfiguration("fullAsync1", FULL_ASYNC, 1)));
-            caches.add(ignite.createCache(cacheConfiguration("fullAsync2", FULL_ASYNC, 1)));
-            caches.add(ignite.createCache(cacheConfiguration("primarySync1", PRIMARY_SYNC, 1)));
-            caches.add(ignite.createCache(cacheConfiguration("primarySync2", PRIMARY_SYNC, 1)));
-
-            for (int i = 0; i < NODES; i++)
-                checkTxSyncMode(ignite(i));
-        }
-        finally {
-            for (IgniteCache<Object, Object> cache : caches)
-                ignite.destroyCache(cache.getName());
-        }
-    }
-
-    private void checkTxSyncMode(Ignite ignite) {
-
-    }
-
-    /**
-     * @param name Cache name.
-     * @param syncMode Write synchronization mode.
-     * @param backups Number of backups.
-     * @return Cache configuration.
-     */
-    private CacheConfiguration<Object, Object> cacheConfiguration(String name,
-        CacheWriteSynchronizationMode syncMode,
-        int backups) {
-        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
-        ccfg.setName(name);
-        ccfg.setAtomicityMode(TRANSACTIONAL);
-        ccfg.setWriteSynchronizationMode(syncMode);
-        ccfg.setBackups(backups);
-
-        return ccfg;
-    }
-
-    public void testPrimarySyncMessages() throws Exception {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b69c3f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
index af018cc..191ae24 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
@@ -234,7 +234,7 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
                         TestRecordingCommunicationSpi spi =
                             (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
 
-                        spi.stopBlock();
+                        spi.stopBlock(true);
                     }
 
                     awaitPartitionMapExchange();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b69c3f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
new file mode 100644
index 0000000..3e06340
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
@@ -0,0 +1,860 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.IgnitePredicateX;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+
+/**
+ *
+ */
+public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int SRVS = 4;
+
+    /** */
+    private static final int CLIENTS = 2;
+
+    /** */
+    private static final int NODES = SRVS + CLIENTS;
+
+    /** */
+    private boolean clientMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(clientMode);
+
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(SRVS);
+
+        try {
+            for (int i = 0; i < CLIENTS; i++) {
+                clientMode = true;
+
+                Ignite client = startGrid(SRVS + i);
+
+                assertTrue(client.configuration().isClientMode());
+            }
+        }
+        finally {
+            clientMode = false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSingleKeyCommitFromPrimary() throws Exception {
+        singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false));
+
+        singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false));
+
+        singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 2, false, true));
+
+        singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false));
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void singleKeyCommitFromPrimary(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(ccfg);
+
+        try {
+            for (int i = 0; i < SRVS; i++) {
+                Ignite node = ignite(i);
+
+                singleKeyCommitFromPrimary(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                    @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                        cache.put(key, key);
+                    }
+                });
+
+                for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                    for (final TransactionIsolation isolation : TransactionIsolation.values()) {
+                        singleKeyCommitFromPrimary(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                            @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                                Ignite ignite = cache.unwrap(Ignite.class);
+
+                                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+                                    cache.put(key, key);
+
+                                    tx.commit();
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+        }
+        finally {
+            ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @param ignite Node executing cache operation.
+     * @param ccfg Cache configuration.
+     * @param c Cache update closure.
+     * @throws Exception If failed.
+     */
+    private void singleKeyCommitFromPrimary(
+        Ignite ignite,
+        final CacheConfiguration<Object, Object> ccfg,
+        IgniteBiInClosure<Integer, IgniteCache<Object, Object>> c) throws Exception {
+        TestRecordingCommunicationSpi commSpi0 =
+            (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+        IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName());
+
+        final Integer key = primaryKey(cache);
+
+        cache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        commSpi0.record(GridDhtTxFinishRequest.class);
+
+        commSpi0.blockMessages(new IgnitePredicateX<GridIoMessage>() {
+            @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException {
+                return e.message() instanceof GridDhtTxFinishRequest;
+            }
+        });
+
+        c.apply(key, cache);
+
+        assertEquals(key, cache.localPeek(key));
+
+        U.sleep(50);
+
+        for (int i = 0; i < SRVS; i++) {
+            Ignite node = ignite(i);
+
+            if (node != ignite)
+                assertNull(node.cache(null).localPeek(key));
+        }
+
+        commSpi0.stopBlock(true);
+
+        waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key);
+
+        List<Object> msgs = commSpi0.recordedMessages();
+
+        assertEquals(ccfg.getBackups(), msgs.size());
+
+        cache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        c.apply(key, cache);
+
+        waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSingleKeyPrimaryNodeFail() throws Exception {
+        singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false));
+
+        singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false));
+
+        singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 2, true, false));
+
+        singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false));
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void singleKeyPrimaryNodeLeft(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(ccfg);
+
+        try {
+            ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+            for (int i = 0; i < NODES; i++) {
+                Ignite node = ignite(i);
+
+                singleKeyPrimaryNodeLeft(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                    @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                        cache.put(key, key);
+                    }
+                });
+
+                for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                    for (final TransactionIsolation isolation : TransactionIsolation.values()) {
+                        singleKeyPrimaryNodeLeft(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                            @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                                Ignite ignite = cache.unwrap(Ignite.class);
+
+                                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+                                    cache.put(key, key);
+
+                                    tx.commit();
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+        }
+        finally {
+            ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @param client Node executing cache operation.
+     * @param ccfg Cache configuration.
+     * @param c Cache update closure.
+     * @throws Exception If failed.
+     */
+    private void singleKeyPrimaryNodeLeft(
+        Ignite client,
+        final CacheConfiguration<Object, Object> ccfg,
+        final IgniteBiInClosure<Integer, IgniteCache<Object, Object>> c) throws Exception {
+        Ignite ignite = startGrid(NODES);
+
+        final TestRecordingCommunicationSpi commSpiClient =
+            (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
+
+        IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName());
+
+        final Integer key = primaryKey(cache);
+
+        cache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        commSpiClient.blockMessages(GridNearTxFinishRequest.class, ignite.name());
+
+        final IgniteCache<Object, Object> clientCache = client.cache(ccfg.getName());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                c.apply(key, clientCache);
+
+                return null;
+            }
+        });
+
+        boolean waitMsgSnd = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return commSpiClient.hasBlockedMessages();
+            }
+        }, 5000);
+
+        assertTrue(waitMsgSnd);
+
+        ignite.close();
+
+        commSpiClient.stopBlock(false);
+
+        fut.get();
+
+        awaitPartitionMapExchange();
+
+        waitKeyUpdated(client, ccfg.getBackups() + 1, ccfg.getName(), key);
+
+        clientCache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        c.apply(key, clientCache);
+
+        waitKeyUpdated(client, ccfg.getBackups() + 1, ccfg.getName(), key);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSingleKeyCommit() throws Exception {
+        singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false));
+
+        singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false));
+
+        singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 2, false, true));
+
+        singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false));
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void singleKeyCommit(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(ccfg);
+
+        try {
+            ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+            for (int i = 1; i < NODES; i++) {
+                Ignite node = ignite(i);
+
+                log.info("Test node: " + node.name());
+
+                singleKeyCommit(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                    @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                        cache.put(key, key);
+                    }
+                });
+
+                for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                    for (final TransactionIsolation isolation : TransactionIsolation.values()) {
+                        singleKeyCommit(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                            @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                                Ignite ignite = cache.unwrap(Ignite.class);
+
+                                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+                                    cache.put(key, key);
+
+                                    tx.commit();
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+        }
+        finally {
+            ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @param client Node executing cache operation.
+     * @param ccfg Cache configuration.
+     * @param c Cache update closure.
+     * @throws Exception If failed.
+     */
+    private void singleKeyCommit(
+        Ignite client,
+        final CacheConfiguration<Object, Object> ccfg,
+        IgniteBiInClosure<Integer, IgniteCache<Object, Object>> c) throws Exception {
+        Ignite ignite = ignite(0);
+
+        assertNotSame(ignite, client);
+
+        TestRecordingCommunicationSpi commSpiClient =
+            (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
+
+        TestRecordingCommunicationSpi commSpi0 =
+            (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+        IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName());
+
+        final Integer key = primaryKey(cache);
+
+        cache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        IgniteCache<Object, Object> clientCache = client.cache(ccfg.getName());
+
+        commSpiClient.record(GridNearTxFinishRequest.class);
+
+        commSpi0.record(GridDhtTxFinishRequest.class);
+
+        commSpi0.blockMessages(new IgnitePredicateX<GridIoMessage>() {
+            @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException {
+                return e.message() instanceof GridDhtTxFinishRequest;
+            }
+        });
+
+        c.apply(key, clientCache);
+
+        assertEquals(key, cache.localPeek(key));
+
+        U.sleep(50);
+
+        boolean nearCache = ((IgniteCacheProxy)clientCache).context().isNear();
+
+        for (int i = 1; i < NODES; i++) {
+            Ignite node = ignite(i);
+
+            if (nearCache
+                && node == client &&
+                !node.affinity(ccfg.getName()).isPrimaryOrBackup(node.cluster().localNode(), key))
+                assertEquals("Invalid value for node: " + i, key, ignite(i).cache(null).localPeek(key));
+            else
+                assertNull("Invalid value for node: " + i, ignite(i).cache(null).localPeek(key));
+        }
+
+        commSpi0.stopBlock(true);
+
+        waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key);
+
+        List<Object> msgs = commSpiClient.recordedMessages();
+
+        assertEquals(1, msgs.size());
+
+        GridNearTxFinishRequest req = (GridNearTxFinishRequest)msgs.get(0);
+
+        assertEquals(PRIMARY_SYNC, req.syncMode());
+
+        msgs = commSpi0.recordedMessages();
+
+        assertEquals(ccfg.getBackups(), msgs.size());
+
+        clientCache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        c.apply(key, clientCache);
+
+        waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxSyncMode() throws Exception {
+        Ignite ignite = ignite(0);
+
+        List<IgniteCache<Object, Object>> caches = new ArrayList<>();
+
+        try {
+            caches.add(createCache(ignite, cacheConfiguration("fullSync1", FULL_SYNC, 1, false, false), true));
+            caches.add(createCache(ignite, cacheConfiguration("fullSync2", FULL_SYNC, 1, false, false), true));
+            caches.add(createCache(ignite, cacheConfiguration("fullAsync1", FULL_ASYNC, 1, false, false), true));
+            caches.add(createCache(ignite, cacheConfiguration("fullAsync2", FULL_ASYNC, 1, false, false), true));
+            caches.add(createCache(ignite, cacheConfiguration("primarySync1", PRIMARY_SYNC, 1, false, false), true));
+            caches.add(createCache(ignite, cacheConfiguration("primarySync2", PRIMARY_SYNC, 1, false, false), true));
+
+            for (int i = 0; i < NODES; i++) {
+                checkTxSyncMode(ignite(i), true);
+                checkTxSyncMode(ignite(i), false);
+            }
+        }
+        finally {
+            for (IgniteCache<Object, Object> cache : caches)
+                ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Cache key.
+     * @throws Exception If failed.
+     */
+    private void waitKeyRemoved(final String cacheName, final Object key) throws Exception {
+        boolean waitRmv = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (Ignite ignite : G.allGrids()) {
+                    if (ignite.cache(cacheName).get(key) != null)
+                        return false;
+                }
+
+                return true;
+            }
+        }, 5000);
+
+        assertTrue(waitRmv);
+    }
+
+    /**
+     * @param ignite Node.
+     * @param expNodes Expected number of cache server nodes.
+     * @param cacheName Cache name.
+     * @param key Cache key.
+     * @throws Exception If failed.
+     */
+    private void waitKeyUpdated(Ignite ignite, int expNodes, final String cacheName, final Object key) throws Exception {
+        Affinity<Object> aff = ignite.affinity(cacheName);
+
+        final Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+        assertEquals(expNodes, nodes.size());
+
+        boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (ClusterNode node : nodes) {
+                    Ignite ignite = grid(node);
+
+                    if (!key.equals(ignite.cache(cacheName).get(key)))
+                        return false;
+                }
+
+                return true;
+            }
+        }, 5000);
+
+        assertTrue(wait);
+
+        for (Ignite ignite0 : G.allGrids())
+            assertEquals(key, ignite0.cache(cacheName).get(key));
+    }
+
+    /**
+     * @param ignite Node.
+     * @param ccfg Cache configuration.
+     * @param nearCache If {@code true} creates near cache on one of client nodes.
+     * @return Created cache.
+     */
+    private <K, V> IgniteCache<K, V> createCache(Ignite ignite, CacheConfiguration<K, V> ccfg,
+        boolean nearCache) {
+        IgniteCache<K, V> cache = ignite.createCache(ccfg);
+
+        if (nearCache)
+            ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+        return cache;
+    }
+
+    /**
+     * @param ignite Node.
+     * @param commit If {@code true} commits transaction.
+     */
+    private void checkTxSyncMode(Ignite ignite, boolean commit) {
+        IgniteTransactions txs = ignite.transactions();
+
+        IgniteCache<Object, Object> fullSync1 = ignite.cache("fullSync1");
+        IgniteCache<Object, Object> fullSync2 = ignite.cache("fullSync2");
+        IgniteCache<Object, Object> fullAsync1 = ignite.cache("fullAsync1");
+        IgniteCache<Object, Object> fullAsync2 = ignite.cache("fullAsync2");
+        IgniteCache<Object, Object> primarySync1 = ignite.cache("primarySync1");
+        IgniteCache<Object, Object> primarySync2 = ignite.cache("primarySync2");
+
+        for (int i = 0; i < 3; i++) {
+            int key = 0;
+
+            for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullSync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_ASYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        primarySync1.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        for (int j = 0; j < 100; j++)
+                            fullSync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        for (int j = 0; j < 100; j++)
+                            fullAsync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_ASYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        for (int j = 0; j < 100; j++)
+                            primarySync1.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullSync1.put(key++, 1);
+                        fullSync2.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+                        fullAsync2.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_ASYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        primarySync1.put(key++, 1);
+                        primarySync2.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullSync1.put(key++, 1);
+                        primarySync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        primarySync1.put(key++, 1);
+                        fullSync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullSync1.put(key++, 1);
+                        fullAsync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+                        fullSync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+                        primarySync1.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+                        primarySync1.put(key++, 1);
+                        fullAsync2.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        primarySync1.put(key++, 1);
+                        fullAsync1.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullSync1.put(key++, 1);
+                        fullAsync1.put(key++, 1);
+                        primarySync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+                        primarySync1.put(key++, 1);
+                        fullSync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param tx Transaction.
+     * @param syncMode Expected write synchronization mode.
+     */
+    private void checkSyncMode(Transaction tx, CacheWriteSynchronizationMode syncMode) {
+        assertEquals(syncMode, ((TransactionProxyImpl)tx).tx().syncMode());
+    }
+
+    /**
+     * @param name Cache name.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @param store If {@code true} configures cache store.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(String name,
+        CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean store,
+        boolean nearCache) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(name);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(syncMode);
+        ccfg.setBackups(backups);
+
+        if (store) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setReadThrough(true);
+            ccfg.setWriteThrough(true);
+        }
+
+        if (nearCache)
+            ccfg.setNearConfiguration(new NearCacheConfiguration<>());
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b69c3f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
new file mode 100644
index 0000000..2d50ff8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionOptimisticException;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int SRVS = 4;
+
+    /** */
+    private static final int CLIENTS = 2;
+
+    /** */
+    private static final int NODES = SRVS + CLIENTS;
+
+    /** */
+    private boolean clientMode;
+
+    /** */
+    private static final int MULTITHREADED_TEST_KEYS = 100;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(clientMode);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 3 * 60_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(SRVS);
+
+        clientMode = true;
+
+        for (int i = 0; i < CLIENTS; i++) {
+            Ignite client = startGrid(SRVS + i);
+
+            assertTrue(client.configuration().isClientMode());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedPrimarySyncRestart() throws Exception {
+        multithreadedTests(PRIMARY_SYNC, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedPrimarySync() throws Exception {
+        multithreadedTests(PRIMARY_SYNC, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedFullSync() throws Exception {
+        multithreadedTests(FULL_SYNC, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedFullSyncRestart() throws Exception {
+        multithreadedTests(FULL_SYNC, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedFullAsync() throws Exception {
+        multithreadedTests(FULL_ASYNC, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedFullAsyncRestart() throws Exception {
+        multithreadedTests(FULL_ASYNC, true);
+    }
+
+    /**
+     * @param syncMode Write synchronization mode.
+     * @param restart Restart flag.
+     * @throws Exception If failed.
+     */
+    private void multithreadedTests(CacheWriteSynchronizationMode syncMode, boolean restart) throws Exception {
+        multithreaded(syncMode, 0, false, false, restart);
+
+        multithreaded(syncMode, 1, false, false, restart);
+
+        multithreaded(syncMode, 1, true, false, restart);
+
+        multithreaded(syncMode, 2, false, false, restart);
+    }
+
+    /**
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @param store If {@code true} sets store in cache configuration.
+     * @param nearCache If {@code true} creates near cache on one of client nodes.
+     * @param restart If {@code true} restarts one node during test.
+     * @throws Exception If failed.
+     */
+    private void multithreaded(CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean store,
+        boolean nearCache,
+        boolean restart) throws Exception {
+        final Ignite ignite = ignite(0);
+
+        createCache(ignite, cacheConfiguration(null, syncMode, backups, store), nearCache);
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> restartFut = null;
+
+        try {
+            if (restart) {
+                restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        while (!stop.get()) {
+                            startGrid(NODES);
+
+                            U.sleep(100);
+
+                            stopGrid(NODES);
+                        }
+                        return null;
+                    }
+                }, "restart-thread");
+            }
+
+            commitMultithreaded(new IgniteBiInClosure<Ignite, IgniteCache<Integer, Integer>>() {
+                @Override public void apply(Ignite ignite, IgniteCache<Integer, Integer> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS);
+
+                    cache.put(key, rnd.nextInt());
+                }
+            });
+
+            commitMultithreaded(new IgniteBiInClosure<Ignite, IgniteCache<Integer, Integer>>() {
+                @Override public void apply(Ignite ignite, IgniteCache<Integer, Integer> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Map<Integer, Integer> map = new TreeMap<>();
+
+                    for (int i = 0; i < 100; i++) {
+                        Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS);
+
+                        map.put(key, rnd.nextInt());
+                    }
+
+                    cache.putAll(map);
+                }
+            });
+
+            commitMultithreaded(new IgniteBiInClosure<Ignite, IgniteCache<Integer, Integer>>() {
+                @Override public void apply(Ignite ignite, IgniteCache<Integer, Integer> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Map<Integer, Integer> map = new TreeMap<>();
+
+                    for (int i = 0; i < 100; i++) {
+                        Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS);
+
+                        map.put(key, rnd.nextInt());
+                    }
+
+                    try {
+                        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                            for (Map.Entry<Integer, Integer> e : map.entrySet())
+                                cache.put(e.getKey(), e.getValue());
+
+                            tx.commit();
+                        }
+                    }
+                    catch (CacheException | IgniteException e) {
+                        // Ignore.
+                    }
+                }
+            });
+
+            commitMultithreaded(new IgniteBiInClosure<Ignite, IgniteCache<Integer, Integer>>() {
+                @Override public void apply(Ignite ignite, IgniteCache<Integer, Integer> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Map<Integer, Integer> map = new LinkedHashMap<>();
+
+                    for (int i = 0; i < 10; i++) {
+                        Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS);
+
+                        map.put(key, rnd.nextInt());
+                    }
+
+                    while (true) {
+                        try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            for (Map.Entry<Integer, Integer> e : map.entrySet())
+                                cache.put(e.getKey(), e.getValue());
+
+                            tx.commit();
+
+                            break;
+                        }
+                        catch (TransactionOptimisticException e) {
+                           // Retry.
+                        }
+                        catch (CacheException | IgniteException e) {
+                            break;
+                        }
+                    }
+                }
+            });
+        }
+        finally {
+            stop.set(true);
+
+            ignite.destroyCache(null);
+
+            if (restartFut != null)
+                restartFut.get();
+        }
+    }
+
+    /**
+     * @param c Test iteration closure.
+     * @throws Exception If failed.
+     */
+    public void commitMultithreaded(final IgniteBiInClosure<Ignite, IgniteCache<Integer, Integer>> c) throws Exception {
+        final long stopTime = System.currentTimeMillis() + 10_000;
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer idx) {
+                int nodeIdx = idx % NODES;
+
+                Thread.currentThread().setName("tx-thread-" + nodeIdx);
+
+                Ignite ignite = ignite(nodeIdx);
+
+                IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+                while (System.currentTimeMillis() < stopTime)
+                    c.apply(ignite, cache);
+            }
+        }, NODES * 3, "tx-thread");
+
+        IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+
+        for (int key = 0; key < MULTITHREADED_TEST_KEYS; key++) {
+            final Integer key0 = key;
+            final Integer val = cache.get(key0);
+
+            boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    for (int i = 1; i < NODES; i++) {
+                        IgniteCache<Integer, Integer> cache = ignite(i).cache(null);
+
+                        if (!val.equals(cache.get(key0)))
+                            return false;
+                    }
+                    return true;
+                }
+            }, 5000);
+
+            assertTrue(wait);
+        }
+    }
+
+    /**
+     * @param ignite Node.
+     * @param ccfg Cache configuration.
+     * @param nearCache If {@code true} creates near cache on one of client nodes.
+     * @return Created cache.
+     */
+    private <K, V> IgniteCache<K, V> createCache(Ignite ignite, CacheConfiguration<K, V> ccfg,
+        boolean nearCache) {
+        IgniteCache<K, V> cache = ignite.createCache(ccfg);
+
+        if (nearCache)
+            ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+        return cache;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @param store If {@code true} configures cache store.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(String name,
+        CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean store) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(name);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(syncMode);
+        ccfg.setBackups(backups);
+
+        if (store) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setReadThrough(true);
+            ccfg.setWriteThrough(true);
+        }
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b69c3f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index a08d080..ad122e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -25,8 +25,6 @@ import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -51,6 +49,9 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionRollbackException;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -94,9 +95,10 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
     protected CacheConfiguration cacheConfiguration(String gridName) {
         CacheConfiguration ccfg = new CacheConfiguration();
 
-        ccfg.setCacheMode(CacheMode.PARTITIONED);
-        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
         ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
         return ccfg;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b69c3f1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index be33e1c..37dc0ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -93,6 +93,8 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreate
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheReadFromBackupTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSingleGetMessageTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCacheWriteSynchronizationModesMultithreadedTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest;
@@ -308,6 +310,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(IgniteCacheGetCustomCollectionsSelfTest.class);
         suite.addTestSuite(IgniteCacheLoadRebalanceEvictionSelfTest.class);
         suite.addTestSuite(IgniteCachePrimarySyncTest.class);
+        suite.addTestSuite(IgniteTxCachePrimarySyncTest.class);
+        suite.addTestSuite(IgniteTxCacheWriteSynchronizationModesMultithreadedTest.class);
 
         return suite;
     }


Mime
View raw message