ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] ignite git commit: ignite-2407 Fixed 'primary_sync' mode for transactional cache
Date Tue, 05 Apr 2016 12:26:31 GMT
Repository: ignite
Updated Branches:
  refs/heads/master e7e223f7c -> f1af2c7b0


http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
new file mode 100644
index 0000000..3bc22ef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
@@ -0,0 +1,1114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.IgnitePredicateX;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+
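+/**
+ * Tests updates of transactional caches that use the {@code PRIMARY_SYNC} write synchronization mode.
+ */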
+public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest {
+    /** IP finder installed into the discovery SPI of every node started by this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of nodes started by {@code startGrids(SRVS)} before the client nodes are started. */
+    private static final int SRVS = 4;
+
+    /** Number of nodes started with the client mode flag set. */
+    private static final int CLIENTS = 2;
+
+    /** Total number of nodes started before tests; also used as the index of the extra node some tests start. */
+    private static final int NODES = SRVS + CLIENTS;
+
+    /** Client mode flag copied into the configuration of every node started while it is set. */
+    private boolean clientMode;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The configuration returned by the parent is adjusted as follows: the discovery SPI gets the
+     * test-wide IP finder, the current {@code clientMode} value is applied, and the communication SPI is
+     * replaced with a {@link TestRecordingCommunicationSpi} whose shared memory port is set to {@code -1}.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration nodeCfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)nodeCfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        TestRecordingCommunicationSpi recordingSpi = new TestRecordingCommunicationSpi();
+
+        recordingSpi.setSharedMemoryPort(-1);
+
+        nodeCfg.setCommunicationSpi(recordingSpi);
+        nodeCfg.setClientMode(clientMode);
+
+        return nodeCfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Starts {@code SRVS} nodes with the client mode flag cleared, then {@code CLIENTS} more nodes
+     * (indexes {@code SRVS .. NODES - 1}) with the flag set, checking that each of them reports client
+     * mode. The flag is always reset afterwards.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(SRVS);
+
+        clientMode = true;
+
+        try {
+            for (int idx = SRVS; idx < NODES; idx++) {
+                Ignite clientNode = startGrid(idx);
+
+                assertTrue(clientNode.configuration().isClientMode());
+            }
+        }
+        finally {
+            clientMode = false;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops every started grid before delegating to the parent implementation.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Runs the "update issued on the key's primary node" scenario against four {@code PRIMARY_SYNC}
+     * cache configurations. The integer argument (1, 2, 3) is presumably the number of backups; the
+     * two boolean flags are interpreted by {@code cacheConfiguration}, which is not visible here.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleKeyCommitFromPrimary() throws Exception {
+        singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false));
+
+        singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false));
+
+        singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 2, false, true));
+
+        singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false));
+    }
+
+    /**
+     * Creates the cache on node 0 and, for each of the first {@code SRVS} nodes, runs the single-key
+     * scenario once with a plain {@code put} and once inside an explicit transaction for every
+     * combination of {@link TransactionConcurrency} and {@link TransactionIsolation}.
+     * The cache is destroyed afterwards even if a check fails.
+     *
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void singleKeyCommitFromPrimary(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(ccfg);
+
+        try {
+            for (int i = 0; i < SRVS; i++) {
+                Ignite node = ignite(i);
+
+                singleKeyCommitFromPrimary(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                    @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                        cache.put(key, key);
+                    }
+                });
+
+                for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                    for (final TransactionIsolation isolation : TransactionIsolation.values()) {
+                        singleKeyCommitFromPrimary(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                            @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                                Ignite ignite = cache.unwrap(Ignite.class);
+
+                                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+                                    cache.put(key, key);
+
+                                    tx.commit();
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+        }
+        finally {
+            ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * Checks that an update issued on {@code ignite} returns while the
+     * {@link GridDhtTxFinishRequest} messages sent by that node are still blocked.
+     * <p>
+     * The key is obtained from {@code primaryKey(cache)} for the cache of {@code ignite}. The method
+     * removes the key everywhere, starts recording and blocking {@link GridDhtTxFinishRequest} on the
+     * node's communication SPI, applies the update, and verifies that the value is visible locally but
+     * not via {@code localPeek} on the other server nodes. After the block is released it waits for the
+     * value to appear on all owners and expects one recorded finish request per configured backup.
+     * Finally the update is repeated without any blocking.
+     *
+     * @param ignite Node executing cache operation.
+     * @param ccfg Cache configuration.
+     * @param c Cache update closure.
+     * @throws Exception If failed.
+     */
+    private void singleKeyCommitFromPrimary(
+        Ignite ignite,
+        final CacheConfiguration<Object, Object> ccfg,
+        IgniteBiInClosure<Integer, IgniteCache<Object, Object>> c) throws Exception {
+        TestRecordingCommunicationSpi commSpi0 =
+            (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+        IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName());
+
+        final Integer key = primaryKey(cache);
+
+        cache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        commSpi0.record(GridDhtTxFinishRequest.class);
+
+        commSpi0.blockMessages(new IgnitePredicateX<GridIoMessage>() {
+            @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException {
+                return e.message() instanceof GridDhtTxFinishRequest;
+            }
+        });
+
+        c.apply(key, cache);
+
+        assertEquals(key, cache.localPeek(key));
+
+        U.sleep(50);
+
+        // Look at the cache under test on the other nodes: use the configured name instead of a
+        // hard-coded null so the check stays valid for named cache configurations as well.
+        for (int i = 0; i < SRVS; i++) {
+            Ignite node = ignite(i);
+
+            if (node != ignite)
+                assertNull(node.cache(ccfg.getName()).localPeek(key));
+        }
+
+        commSpi0.stopBlock(true);
+
+        waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key);
+
+        List<Object> msgs = commSpi0.recordedMessages(true);
+
+        assertEquals(ccfg.getBackups(), msgs.size());
+
+        // Second round: same update with no message blocking in place.
+        cache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        c.apply(key, cache);
+
+        waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key);
+    }
+
+    /**
+     * Runs the "primary node leaves during commit" scenario for two {@code PRIMARY_SYNC} cache
+     * configurations (integer argument 1 and 2, presumably the number of backups).
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleKeyPrimaryNodeFail1() throws Exception {
+        singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false));
+
+        singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false));
+    }
+
+    /**
+     * Runs the "primary node leaves during commit" scenario for two further {@code PRIMARY_SYNC}
+     * cache configurations (integer argument 2 and 3, presumably the number of backups).
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleKeyPrimaryNodeFail2() throws Exception {
+        singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 2, true, false));
+
+        singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false));
+    }
+
+    /**
+     * Creates the cache on node 0, creates a near cache for it on the last node ({@code NODES - 1})
+     * and then, for every one of the {@code NODES} nodes, runs the primary-node-left scenario with a
+     * plain {@code put} and with an explicit transaction for every combination of
+     * {@link TransactionConcurrency} and {@link TransactionIsolation}.
+     * The cache is destroyed afterwards even if a check fails.
+     *
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void singleKeyPrimaryNodeLeft(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(ccfg);
+
+        try {
+            ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+            for (int i = 0; i < NODES; i++) {
+                Ignite node = ignite(i);
+
+                singleKeyPrimaryNodeLeft(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                    @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                        cache.put(key, key);
+                    }
+                });
+
+                for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                    for (final TransactionIsolation isolation : TransactionIsolation.values()) {
+                        singleKeyPrimaryNodeLeft(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                            @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                                Ignite ignite = cache.unwrap(Ignite.class);
+
+                                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+                                    cache.put(key, key);
+
+                                    tx.commit();
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+        }
+        finally {
+            ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * Checks that an update still reaches all owners of the key when the node it was sent to is
+     * stopped while the {@link GridNearTxFinishRequest} addressed to it is blocked.
+     * <p>
+     * An extra node with index {@code NODES} is started and the key is taken from
+     * {@code primaryKey(cache)} for that node's cache. {@link GridNearTxFinishRequest} messages from
+     * {@code client} to the extra node are blocked and the update is run asynchronously. Once the
+     * client's SPI reports blocked messages (waiting at most 5000 ms), the extra node is closed and
+     * the block is lifted with {@code stopBlock(false)}. The method then waits for the update future,
+     * for partition map exchange, and for the value to be visible on {@code backups + 1} nodes.
+     * Finally the key is removed and the update is repeated with no blocking and no node stop.
+     * <p>
+     * NOTE(review): the extra node is closed only on the success path; if a check before
+     * {@code ignite.close()} fails it appears to stay running - confirm whether cleanup is needed.
+     *
+     * @param client Node executing cache operation.
+     * @param ccfg Cache configuration.
+     * @param c Cache update closure.
+     * @throws Exception If failed.
+     */
+    private void singleKeyPrimaryNodeLeft(
+        Ignite client,
+        final CacheConfiguration<Object, Object> ccfg,
+        final IgniteBiInClosure<Integer, IgniteCache<Object, Object>> c) throws Exception {
+        Ignite ignite = startGrid(NODES);
+
+        final TestRecordingCommunicationSpi commSpiClient =
+            (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
+
+        IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName());
+
+        final Integer key = primaryKey(cache);
+
+        cache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        commSpiClient.blockMessages(GridNearTxFinishRequest.class, ignite.name());
+
+        final IgniteCache<Object, Object> clientCache = client.cache(ccfg.getName());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                c.apply(key, clientCache);
+
+                return null;
+            }
+        });
+
+        boolean waitMsgSnd = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return commSpiClient.hasBlockedMessages();
+            }
+        }, 5000);
+
+        assertTrue(waitMsgSnd);
+
+        ignite.close();
+
+        commSpiClient.stopBlock(false);
+
+        fut.get();
+
+        awaitPartitionMapExchange();
+
+        waitKeyUpdated(client, ccfg.getBackups() + 1, ccfg.getName(), key);
+
+        clientCache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        c.apply(key, clientCache);
+
+        waitKeyUpdated(client, ccfg.getBackups() + 1, ccfg.getName(), key);
+    }
+
+    /**
+     * Runs the single-key commit scenario, with the update issued from a node other than node 0,
+     * against four {@code PRIMARY_SYNC} cache configurations (integer argument 1, 2 and 3, presumably
+     * the number of backups; the boolean flags are interpreted by {@code cacheConfiguration}).
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleKeyCommit() throws Exception {
+        singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false));
+
+        singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false));
+
+        singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 2, false, true));
+
+        singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false));
+    }
+
+    /**
+     * Creates the cache on node 0, creates a near cache for it on the last node ({@code NODES - 1})
+     * and runs the single-key commit scenario from every node except node 0 (the per-node check
+     * requires the updating node to differ from node 0). Each node is exercised with a plain
+     * {@code put} and with an explicit transaction for every combination of
+     * {@link TransactionConcurrency} and {@link TransactionIsolation}.
+     * The cache is destroyed afterwards even if a check fails.
+     *
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void singleKeyCommit(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(ccfg);
+
+        try {
+            ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+            for (int i = 1; i < NODES; i++) {
+                Ignite node = ignite(i);
+
+                log.info("Test node: " + node.name());
+
+                singleKeyCommit(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                    @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                        cache.put(key, key);
+                    }
+                });
+
+                for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                    for (final TransactionIsolation isolation : TransactionIsolation.values()) {
+                        singleKeyCommit(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                            @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                                Ignite ignite = cache.unwrap(Ignite.class);
+
+                                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+                                    cache.put(key, key);
+
+                                    tx.commit();
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+        }
+        finally {
+            ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * Checks that an update issued from {@code client} returns while the
+     * {@link GridDhtTxFinishRequest} messages sent by node 0 are still blocked.
+     * <p>
+     * The key is obtained from {@code primaryKey(cache)} for the cache of node 0, which must differ
+     * from {@code client}. {@link GridNearTxFinishRequest} is recorded on the client and
+     * {@link GridDhtTxFinishRequest} is recorded and blocked on node 0. After the update is applied,
+     * node 0 must see the value locally. Every other node must not see it via {@code localPeek}; the
+     * one exception is the updating node when it uses a near cache and is neither primary nor backup
+     * for the key, in which case it must see it. Once the block is released the value must reach all
+     * owners. The client must have recorded exactly one finish request with {@code PRIMARY_SYNC} mode,
+     * and node 0 one finish request per configured backup. Finally the update is repeated with no
+     * blocking.
+     *
+     * @param client Node executing cache operation.
+     * @param ccfg Cache configuration.
+     * @param c Cache update closure.
+     * @throws Exception If failed.
+     */
+    private void singleKeyCommit(
+        Ignite client,
+        final CacheConfiguration<Object, Object> ccfg,
+        IgniteBiInClosure<Integer, IgniteCache<Object, Object>> c) throws Exception {
+        Ignite ignite = ignite(0);
+
+        assertNotSame(ignite, client);
+
+        TestRecordingCommunicationSpi commSpiClient =
+            (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
+
+        TestRecordingCommunicationSpi commSpi0 =
+            (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+        IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName());
+
+        final Integer key = primaryKey(cache);
+
+        cache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        IgniteCache<Object, Object> clientCache = client.cache(ccfg.getName());
+
+        commSpiClient.record(GridNearTxFinishRequest.class);
+
+        commSpi0.record(GridDhtTxFinishRequest.class);
+
+        commSpi0.blockMessages(new IgnitePredicateX<GridIoMessage>() {
+            @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException {
+                return e.message() instanceof GridDhtTxFinishRequest;
+            }
+        });
+
+        c.apply(key, clientCache);
+
+        assertEquals(key, cache.localPeek(key));
+
+        U.sleep(50);
+
+        boolean nearCache = ((IgniteCacheProxy)clientCache).context().isNear();
+
+        // Look at the cache under test on the other nodes: use the configured name instead of a
+        // hard-coded null, and reuse the node already resolved for this index.
+        for (int i = 1; i < NODES; i++) {
+            Ignite node = ignite(i);
+
+            if (nearCache
+                && node == client &&
+                !node.affinity(ccfg.getName()).isPrimaryOrBackup(node.cluster().localNode(), key))
+                assertEquals("Invalid value for node: " + i, key, node.cache(ccfg.getName()).localPeek(key));
+            else
+                assertNull("Invalid value for node: " + i, node.cache(ccfg.getName()).localPeek(key));
+        }
+
+        commSpi0.stopBlock(true);
+
+        waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key);
+
+        List<Object> msgs = commSpiClient.recordedMessages(true);
+
+        assertEquals(1, msgs.size());
+
+        GridNearTxFinishRequest req = (GridNearTxFinishRequest)msgs.get(0);
+
+        assertEquals(PRIMARY_SYNC, req.syncMode());
+
+        msgs = commSpi0.recordedMessages(true);
+
+        assertEquals(ccfg.getBackups(), msgs.size());
+
+        // Second round: same update with no message blocking in place.
+        clientCache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        c.apply(key, clientCache);
+
+        waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key);
+    }
+
+    /**
+     * Runs the "update waits for the response from node 0" scenario against four
+     * {@code PRIMARY_SYNC} cache configurations (integer argument 1, 2 and 3, presumably the number
+     * of backups; the boolean flags are interpreted by {@code cacheConfiguration}).
+     *
+     * @throws Exception If failed.
+     */
+    public void testWaitPrimaryResponse() throws Exception {
+        checkWaitPrimaryResponse(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false));
+
+        checkWaitPrimaryResponse(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false));
+
+        checkWaitPrimaryResponse(cacheConfiguration(null, PRIMARY_SYNC, 2, false, true));
+
+        checkWaitPrimaryResponse(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false));
+    }
+
+    /**
+     * Creates the cache on node 0, creates a near cache for it on the last node ({@code NODES - 1})
+     * and runs the wait-for-response check from every node except node 0. Four kinds of update are
+     * used per node: a plain {@code put}, a plain {@code putAll} of keys 0..49 plus the test key, and
+     * the same two operations inside an explicit transaction for every combination of
+     * {@link TransactionConcurrency} and {@link TransactionIsolation}.
+     * The cache is destroyed afterwards even if a check fails.
+     *
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void checkWaitPrimaryResponse(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(ccfg);
+
+        try {
+            ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+            for (int i = 1; i < NODES; i++) {
+                Ignite node = ignite(i);
+
+                log.info("Test node: " + node.name());
+
+                checkWaitPrimaryResponse(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                    @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                        cache.put(key, key);
+                    }
+                });
+
+                checkWaitPrimaryResponse(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                    @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                        Map<Integer, Integer> map = new HashMap<>();
+
+                        for (int i = 0; i < 50; i++)
+                            map.put(i, i);
+
+                        map.put(key, key);
+
+                        cache.putAll(map);
+                    }
+                });
+
+                for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                    for (final TransactionIsolation isolation : TransactionIsolation.values()) {
+                        checkWaitPrimaryResponse(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                            @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                                Ignite ignite = cache.unwrap(Ignite.class);
+
+                                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+                                    cache.put(key, key);
+
+                                    tx.commit();
+                                }
+                            }
+                        });
+
+                        checkWaitPrimaryResponse(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                            @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                                Map<Integer, Integer> map = new HashMap<>();
+
+                                for (int i = 0; i < 50; i++)
+                                    map.put(i, i);
+
+                                map.put(key, key);
+
+                                Ignite ignite = cache.unwrap(Ignite.class);
+
+                                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+                                    cache.putAll(map);
+
+                                    tx.commit();
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+        }
+        finally {
+            ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * Checks that an update issued from {@code client} does not complete until node 0 is allowed to
+     * send its {@link GridNearTxFinishResponse}.
+     * <p>
+     * The key is obtained from {@code primaryKey(cache)} for the cache of node 0, which must differ
+     * from {@code client}. {@link GridNearTxFinishResponse} messages from node 0 to the client are
+     * blocked and the update runs in a separate thread named {@code tx-thread}. After a 100 ms pause
+     * the update future must still be incomplete. The block is then released, the future is awaited,
+     * and the value must become visible on {@code backups + 1} nodes.
+     *
+     * @param client Node executing cache operation.
+     * @param ccfg Cache configuration.
+     * @param c Cache update closure.
+     * @throws Exception If failed.
+     */
+    private void checkWaitPrimaryResponse(
+        Ignite client,
+        final CacheConfiguration<Object, Object> ccfg,
+        final IgniteBiInClosure<Integer, IgniteCache<Object, Object>> c) throws Exception {
+        Ignite ignite = ignite(0);
+
+        assertNotSame(ignite, client);
+
+        TestRecordingCommunicationSpi commSpi0 =
+            (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+        IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName());
+
+        final Integer key = primaryKey(cache);
+
+        cache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        final IgniteCache<Object, Object> clientCache = client.cache(ccfg.getName());
+
+        commSpi0.blockMessages(GridNearTxFinishResponse.class, client.name());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                c.apply(key, clientCache);
+
+                return null;
+            }
+        }, "tx-thread");
+
+        U.sleep(100);
+
+        assertFalse(fut.isDone());
+
+        commSpi0.stopBlock(true);
+
+        fut.get();
+
+        waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key);
+    }
+
+    /**
+     * Runs the one-phase-commit message check for a single {@code PRIMARY_SYNC} cache configuration
+     * (integer argument 1, presumably one backup).
+     *
+     * @throws Exception If failed.
+     */
+    public void testOnePhaseMessages() throws Exception {
+        checkOnePhaseMessages(cacheConfiguration(null, PRIMARY_SYNC, 1, false, false));
+    }
+
+    /**
+     * Creates the cache on node 0 and runs the one-phase-commit message check from every node except
+     * node 0, once with a plain {@code put} and once inside an explicit transaction for every
+     * combination of {@link TransactionConcurrency} and {@link TransactionIsolation}.
+     * The cache is destroyed afterwards even if a check fails.
+     *
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void checkOnePhaseMessages(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(ccfg);
+
+        try {
+            for (int i = 1; i < NODES; i++) {
+                Ignite node = ignite(i);
+
+                log.info("Test node: " + node.name());
+
+                checkOnePhaseMessages(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                    @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                        cache.put(key, key);
+                    }
+                });
+
+                for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                    for (final TransactionIsolation isolation : TransactionIsolation.values()) {
+                        checkOnePhaseMessages(node, ccfg, new IgniteBiInClosure<Integer, IgniteCache<Object, Object>>() {
+                            @Override public void apply(Integer key, IgniteCache<Object, Object> cache) {
+                                Ignite ignite = cache.unwrap(Ignite.class);
+
+                                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+                                    cache.put(key, key);
+
+                                    tx.commit();
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+        }
+        finally {
+            ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * Checks which near-transaction messages are exchanged between {@code client} and node 0 for a
+     * single-key update.
+     * <p>
+     * The key is obtained from {@code primaryKey(cache)} for the cache of node 0, which must differ
+     * from {@code client}. Prepare and finish responses are recorded on node 0; prepare and finish
+     * requests are recorded on the client. After the update, node 0 must have recorded exactly one
+     * message, a {@link GridNearTxPrepareResponse}. The client must have recorded exactly one message,
+     * a {@link GridNearTxPrepareRequest} whose {@code onePhaseCommit()} flag is set. In other words, no
+     * finish request or finish response is expected. Finally every started grid must read the new
+     * value.
+     *
+     * @param client Node executing cache operation.
+     * @param ccfg Cache configuration.
+     * @param c Cache update closure.
+     * @throws Exception If failed.
+     */
+    private void checkOnePhaseMessages(
+        Ignite client,
+        final CacheConfiguration<Object, Object> ccfg,
+        final IgniteBiInClosure<Integer, IgniteCache<Object, Object>> c) throws Exception {
+        Ignite ignite = ignite(0);
+
+        assertNotSame(ignite, client);
+
+        TestRecordingCommunicationSpi commSpiClient =
+            (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
+
+        TestRecordingCommunicationSpi commSpi0 =
+            (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+        IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName());
+
+        final Integer key = primaryKey(cache);
+
+        cache.remove(key);
+
+        waitKeyRemoved(ccfg.getName(), key);
+
+        final IgniteCache<Object, Object> clientCache = client.cache(ccfg.getName());
+
+        commSpi0.record(GridNearTxFinishResponse.class, GridNearTxPrepareResponse.class);
+        commSpiClient.record(GridNearTxPrepareRequest.class, GridNearTxFinishRequest.class);
+
+        c.apply(key, clientCache);
+
+        List<Object> srvMsgs = commSpi0.recordedMessages(true);
+
+        assertEquals("Unexpected messages: " + srvMsgs, 1, srvMsgs.size());
+        assertTrue("Unexpected message: " + srvMsgs.get(0), srvMsgs.get(0) instanceof GridNearTxPrepareResponse);
+
+        List<Object> clientMsgs = commSpiClient.recordedMessages(true);
+
+        assertEquals("Unexpected messages: " + clientMsgs, 1, clientMsgs.size());
+        assertTrue("Unexpected message: " + clientMsgs.get(0), clientMsgs.get(0) instanceof GridNearTxPrepareRequest);
+
+        GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)clientMsgs.get(0);
+
+        assertTrue(req.onePhaseCommit());
+
+        for (Ignite ignite0 : G.allGrids())
+            assertEquals(key, ignite0.cache(cache.getName()).get(key));
+    }
+
+    /**
+     * Creates two caches for each of the {@code FULL_SYNC}, {@code FULL_ASYNC} and
+     * {@code PRIMARY_SYNC} modes (each with a near cache on the last node) and runs
+     * {@code checkTxSyncMode} on every node, first with commit and then without.
+     * All caches created so far are destroyed afterwards even if a check fails.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxSyncMode() throws Exception {
+        Ignite ignite = ignite(0);
+
+        List<IgniteCache<Object, Object>> caches = new ArrayList<>();
+
+        try {
+            caches.add(createCache(ignite, cacheConfiguration("fullSync1", FULL_SYNC, 1, false, false), true));
+            caches.add(createCache(ignite, cacheConfiguration("fullSync2", FULL_SYNC, 1, false, false), true));
+            caches.add(createCache(ignite, cacheConfiguration("fullAsync1", FULL_ASYNC, 1, false, false), true));
+            caches.add(createCache(ignite, cacheConfiguration("fullAsync2", FULL_ASYNC, 1, false, false), true));
+            caches.add(createCache(ignite, cacheConfiguration("primarySync1", PRIMARY_SYNC, 1, false, false), true));
+            caches.add(createCache(ignite, cacheConfiguration("primarySync2", PRIMARY_SYNC, 1, false, false), true));
+
+            for (int i = 0; i < NODES; i++) {
+                checkTxSyncMode(ignite(i), true);
+                checkTxSyncMode(ignite(i), false);
+            }
+        }
+        finally {
+            for (IgniteCache<Object, Object> cache : caches)
+                ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Cache key.
+     * @throws Exception If failed.
+     */
+    private void waitKeyRemoved(final String cacheName, final Object key) throws Exception {
+        boolean waitRmv = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (Ignite ignite : G.allGrids()) {
+                    if (ignite.cache(cacheName).get(key) != null)
+                        return false;
+                }
+
+                return true;
+            }
+        }, 5000);
+
+        assertTrue(waitRmv);
+    }
+
+    /**
+     * Verifies that the key is mapped to the expected number of primary and backup nodes, waits for at
+     * most 5000 ms until each of those nodes reads the key as its own value, and finally checks that
+     * every started grid reads the same value.
+     *
+     * @param ignite Node whose affinity function is used to map the key.
+     * @param expNodes Expected number of cache server nodes.
+     * @param cacheName Cache name.
+     * @param key Cache key.
+     * @throws Exception If failed.
+     */
+    private void waitKeyUpdated(Ignite ignite, int expNodes, final String cacheName, final Object key) throws Exception {
+        final Collection<ClusterNode> owners = ignite.affinity(cacheName).mapKeyToPrimaryAndBackups(key);
+
+        assertEquals(expNodes, owners.size());
+
+        GridAbsPredicate updatedOnOwners = new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (ClusterNode owner : owners) {
+                    Object val = grid(owner).cache(cacheName).get(key);
+
+                    if (!key.equals(val))
+                        return false;
+                }
+
+                return true;
+            }
+        };
+
+        assertTrue(GridTestUtils.waitForCondition(updatedOnOwners, 5000));
+
+        for (Ignite started : G.allGrids())
+            assertEquals(key, started.cache(cacheName).get(key));
+    }
+
+    /**
+     * Creates the cache on the given node and, when requested, additionally creates a near cache
+     * with the same name on the node with index {@code NODES - 1}.
+     *
+     * @param ignite Node.
+     * @param ccfg Cache configuration.
+     * @param nearCache If {@code true} creates near cache on one of client nodes.
+     * @return Created cache.
+     */
+    private <K, V> IgniteCache<K, V> createCache(Ignite ignite, CacheConfiguration<K, V> ccfg,
+        boolean nearCache) {
+        IgniteCache<K, V> created = ignite.createCache(ccfg);
+
+        if (!nearCache)
+            return created;
+
+        ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+        return created;
+    }
+
+    /**
+     * Verifies the write synchronization mode reported by a transaction after it has performed puts into
+     * one or several of the caches named {@code fullSync1/2}, {@code fullAsync1/2} and {@code primarySync1/2}.
+     * All scenarios are executed 3 times for every concurrency/isolation combination. The mode is checked
+     * after the puts and before the optional commit.
+     *
+     * @param ignite Node.
+     * @param commit If {@code true} commits transaction, otherwise the transaction is only closed by
+     *      try-with-resources without an explicit commit.
+     */
+    private void checkTxSyncMode(Ignite ignite, boolean commit) {
+        IgniteTransactions txs = ignite.transactions();
+
+        IgniteCache<Object, Object> fullSync1 = ignite.cache("fullSync1");
+        IgniteCache<Object, Object> fullSync2 = ignite.cache("fullSync2");
+        IgniteCache<Object, Object> fullAsync1 = ignite.cache("fullAsync1");
+        IgniteCache<Object, Object> fullAsync2 = ignite.cache("fullAsync2");
+        IgniteCache<Object, Object> primarySync1 = ignite.cache("primarySync1");
+        IgniteCache<Object, Object> primarySync2 = ignite.cache("primarySync2");
+
+        for (int i = 0; i < 3; i++) {
+            // Key counter restarts on every pass, so each pass writes the same key sequence.
+            int key = 0;
+
+            for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                    // Single put into one cache: the cache's own mode is expected.
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullSync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_ASYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        primarySync1.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    // 100 puts into one cache: still the cache's own mode.
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        for (int j = 0; j < 100; j++)
+                            fullSync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        for (int j = 0; j < 100; j++)
+                            fullAsync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_ASYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        for (int j = 0; j < 100; j++)
+                            primarySync1.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    // Two caches configured with the same mode: that mode is expected.
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullSync1.put(key++, 1);
+                        fullSync2.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+                        fullAsync2.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_ASYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        primarySync1.put(key++, 1);
+                        primarySync2.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    // Mixed modes with a FULL_SYNC cache involved: FULL_SYNC is expected in either put order.
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullSync1.put(key++, 1);
+                        primarySync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        primarySync1.put(key++, 1);
+                        fullSync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullSync1.put(key++, 1);
+                        fullAsync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+                        fullSync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    // FULL_ASYNC mixed with PRIMARY_SYNC (no FULL_SYNC cache): PRIMARY_SYNC is expected.
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+                        primarySync1.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+                        primarySync1.put(key++, 1);
+                        fullAsync2.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        primarySync1.put(key++, 1);
+                        fullAsync1.put(key++, 1);
+
+                        checkSyncMode(tx, PRIMARY_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    // All three modes in one transaction: FULL_SYNC is expected in either put order.
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullSync1.put(key++, 1);
+                        fullAsync1.put(key++, 1);
+                        primarySync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                        fullAsync1.put(key++, 1);
+                        primarySync1.put(key++, 1);
+                        fullSync1.put(key++, 1);
+
+                        checkSyncMode(tx, FULL_SYNC);
+
+                        if (commit)
+                            tx.commit();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Asserts that the internal transaction behind the given public transaction proxy
+     * reports the expected write synchronization mode.
+     *
+     * @param tx Transaction.
+     * @param syncMode Expected write synchronization mode.
+     */
+    private void checkSyncMode(Transaction tx, CacheWriteSynchronizationMode syncMode) {
+        TransactionProxyImpl proxy = (TransactionProxyImpl)tx;
+
+        CacheWriteSynchronizationMode actual = proxy.tx().syncMode();
+
+        assertEquals(syncMode, actual);
+    }
+
+    /**
+     * Builds a configuration for a transactional cache with the given name, write synchronization mode
+     * and number of backups, optionally with a read/write-through test store and a near cache.
+     *
+     * @param name Cache name.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @param store If {@code true} configures cache store.
+     * @param nearCache If {@code true} configures near cache.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(String name,
+        CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean store,
+        boolean nearCache) {
+        CacheConfiguration<Object, Object> cfg = new CacheConfiguration<>();
+
+        // Mandatory settings.
+        cfg.setName(name);
+        cfg.setBackups(backups);
+        cfg.setWriteSynchronizationMode(syncMode);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+
+        // Optional near cache.
+        if (nearCache)
+            cfg.setNearConfiguration(new NearCacheConfiguration<>());
+
+        // Optional read/write-through store.
+        if (store) {
+            cfg.setReadThrough(true);
+            cfg.setWriteThrough(true);
+            cfg.setCacheStoreFactory(new TestStoreFactory());
+        }
+
+        return cfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
new file mode 100644
index 0000000..08396da
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionOptimisticException;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Runs concurrent cache updates (implicit and explicit transactions) from all server and client nodes
+ * for each write synchronization mode, optionally while one extra node is repeatedly started and stopped,
+ * and checks that afterwards every node returns the same value for every test key.
+ */
+public class IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends GridCommonAbstractTest {
+    /** IP finder shared by all nodes started by this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of server nodes. */
+    private static final int SRVS = 4;
+
+    /** Number of client nodes. */
+    private static final int CLIENTS = 2;
+
+    /** Total number of nodes started before tests. */
+    private static final int NODES = SRVS + CLIENTS;
+
+    /** Client mode flag applied to the configuration of every node started after it is set. */
+    private boolean clientMode;
+
+    /** Size of the key range: updates use random keys from {@code [0, MULTITHREADED_TEST_KEYS)}. */
+    private static final int MULTITHREADED_TEST_KEYS = 100;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(clientMode);
+
+        // NOTE(review): -1 presumably disables shared-memory communication - confirm against TcpCommunicationSpi docs.
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 5 * 60_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(SRVS);
+
+        // Nodes started from here on are configured as clients.
+        clientMode = true;
+
+        for (int i = 0; i < CLIENTS; i++) {
+            Ignite client = startGrid(SRVS + i);
+
+            assertTrue(client.configuration().isClientMode());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Runs all scenarios in {@code PRIMARY_SYNC} mode with concurrent node restarts.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedPrimarySyncRestart() throws Exception {
+        multithreadedTests(PRIMARY_SYNC, true);
+    }
+
+    /**
+     * Runs all scenarios in {@code PRIMARY_SYNC} mode without node restarts.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedPrimarySync() throws Exception {
+        multithreadedTests(PRIMARY_SYNC, false);
+    }
+
+    /**
+     * Runs all scenarios in {@code FULL_SYNC} mode without node restarts.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedFullSync() throws Exception {
+        multithreadedTests(FULL_SYNC, false);
+    }
+
+    /**
+     * Runs all scenarios in {@code FULL_SYNC} mode with concurrent node restarts.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedFullSyncRestart() throws Exception {
+        multithreadedTests(FULL_SYNC, true);
+    }
+
+    /**
+     * Runs all scenarios in {@code FULL_ASYNC} mode without node restarts.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedFullAsync() throws Exception {
+        multithreadedTests(FULL_ASYNC, false);
+    }
+
+    /**
+     * Runs all scenarios in {@code FULL_ASYNC} mode with concurrent node restarts.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedFullAsyncRestart() throws Exception {
+        multithreadedTests(FULL_ASYNC, true);
+    }
+
+    /**
+     * Runs the multithreaded scenario for 0 backups, 1 backup, 1 backup with a cache store and 2 backups.
+     * Near cache is not enabled in any of the runs.
+     *
+     * @param syncMode Write synchronization mode.
+     * @param restart Restart flag.
+     * @throws Exception If failed.
+     */
+    private void multithreadedTests(CacheWriteSynchronizationMode syncMode, boolean restart) throws Exception {
+        multithreaded(syncMode, 0, false, false, restart);
+
+        multithreaded(syncMode, 1, false, false, restart);
+
+        multithreaded(syncMode, 1, true, false, restart);
+
+        multithreaded(syncMode, 2, false, false, restart);
+    }
+
+    /**
+     * Creates the default ({@code null}-named) cache and runs four update workloads against it one after
+     * another: single puts, {@code putAll}, pessimistic repeatable-read transactions and optimistic
+     * serializable transactions. The cache is destroyed when the workloads finish or fail.
+     *
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @param store If {@code true} sets store in cache configuration.
+     * @param nearCache If {@code true} creates near cache on one of client nodes.
+     * @param restart If {@code true} restarts one node during test.
+     * @throws Exception If failed.
+     */
+    private void multithreaded(CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean store,
+        boolean nearCache,
+        boolean restart) throws Exception {
+        final Ignite ignite = ignite(0);
+
+        createCache(ignite, cacheConfiguration(null, syncMode, backups, store), nearCache);
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> restartFut = null;
+
+        try {
+            if (restart) {
+                // Background thread which keeps starting and stopping an extra node with index NODES.
+                restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        while (!stop.get()) {
+                            startGrid(NODES);
+
+                            U.sleep(100);
+
+                            stopGrid(NODES);
+                        }
+                        return null;
+                    }
+                }, "restart-thread");
+            }
+
+            // Workload 1: single put of a random key.
+            commitMultithreaded(new IgniteBiInClosure<Ignite, IgniteCache<Integer, Integer>>() {
+                @Override public void apply(Ignite ignite, IgniteCache<Integer, Integer> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS);
+
+                    cache.put(key, rnd.nextInt());
+                }
+            });
+
+            // Workload 2: putAll of up to 100 random keys, sorted by key.
+            commitMultithreaded(new IgniteBiInClosure<Ignite, IgniteCache<Integer, Integer>>() {
+                @Override public void apply(Ignite ignite, IgniteCache<Integer, Integer> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Map<Integer, Integer> map = new TreeMap<>();
+
+                    for (int i = 0; i < 100; i++) {
+                        Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS);
+
+                        map.put(key, rnd.nextInt());
+                    }
+
+                    cache.putAll(map);
+                }
+            });
+
+            // Workload 3: explicit pessimistic transaction, keys are put in sorted order.
+            commitMultithreaded(new IgniteBiInClosure<Ignite, IgniteCache<Integer, Integer>>() {
+                @Override public void apply(Ignite ignite, IgniteCache<Integer, Integer> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Map<Integer, Integer> map = new TreeMap<>();
+
+                    for (int i = 0; i < 100; i++) {
+                        Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS);
+
+                        map.put(key, rnd.nextInt());
+                    }
+
+                    try {
+                        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                            for (Map.Entry<Integer, Integer> e : map.entrySet())
+                                cache.put(e.getKey(), e.getValue());
+
+                            tx.commit();
+                        }
+                    }
+                    catch (CacheException | IgniteException e) {
+                        // Ignore: a failed transaction is tolerated here, only the final consistency is checked
+                        // (presumably failures are expected while a node is being restarted - confirm).
+                    }
+                }
+            });
+
+            // Workload 4: explicit optimistic serializable transaction, retried on optimistic conflicts.
+            commitMultithreaded(new IgniteBiInClosure<Ignite, IgniteCache<Integer, Integer>>() {
+                @Override public void apply(Ignite ignite, IgniteCache<Integer, Integer> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Map<Integer, Integer> map = new LinkedHashMap<>();
+
+                    for (int i = 0; i < 10; i++) {
+                        Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS);
+
+                        map.put(key, rnd.nextInt());
+                    }
+
+                    while (true) {
+                        try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            for (Map.Entry<Integer, Integer> e : map.entrySet())
+                                cache.put(e.getKey(), e.getValue());
+
+                            tx.commit();
+
+                            break;
+                        }
+                        catch (TransactionOptimisticException e) {
+                            // Retry.
+                        }
+                        catch (CacheException | IgniteException e) {
+                            // Any other failure ends this iteration without retry.
+                            break;
+                        }
+                    }
+                }
+            });
+        }
+        finally {
+            stop.set(true);
+
+            try {
+                ignite.destroyCache(null);
+            }
+            finally {
+                // Always wait for the restart thread, even if cache destroy failed,
+                // so that it does not keep starting/stopping a node after this method returns.
+                if (restartFut != null)
+                    restartFut.get();
+            }
+        }
+    }
+
+    /**
+     * Executes the closure in a loop for 10 seconds from {@code NODES * 3} threads, where the thread with
+     * index {@code idx} works through the node with index {@code idx % NODES}. Afterwards, for every test
+     * key, waits up to 5000 ms until all nodes return the same value as node 0.
+     *
+     * @param c Test iteration closure.
+     * @throws Exception If failed.
+     */
+    public void commitMultithreaded(final IgniteBiInClosure<Ignite, IgniteCache<Integer, Integer>> c) throws Exception {
+        final long stopTime = System.currentTimeMillis() + 10_000;
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer idx) {
+                int nodeIdx = idx % NODES;
+
+                Thread.currentThread().setName("tx-thread-" + nodeIdx);
+
+                Ignite ignite = ignite(nodeIdx);
+
+                IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+                while (System.currentTimeMillis() < stopTime)
+                    c.apply(ignite, cache);
+            }
+        }, NODES * 3, "tx-thread");
+
+        final IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+
+        for (int key = 0; key < MULTITHREADED_TEST_KEYS; key++) {
+            final Integer key0 = key;
+
+            boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    final Integer val = cache.get(key0);
+
+                    for (int i = 1; i < NODES; i++) {
+                        IgniteCache<Integer, Integer> cache = ignite(i).cache(null);
+
+                        Integer val0 = cache.get(key0);
+
+                        // Null-safe comparison: node 0 returns null if the key was never successfully updated.
+                        if (val == null ? val0 != null : !val.equals(val0))
+                            return false;
+                    }
+
+                    return true;
+                }
+            }, 5000);
+
+            assertTrue(wait);
+        }
+    }
+
+    /**
+     * Creates the cache on the given node and, when requested, additionally creates a near cache
+     * with the same name on the node with index {@code NODES - 1}.
+     *
+     * @param ignite Node.
+     * @param ccfg Cache configuration.
+     * @param nearCache If {@code true} creates near cache on one of client nodes.
+     * @return Created cache.
+     */
+    private <K, V> IgniteCache<K, V> createCache(Ignite ignite, CacheConfiguration<K, V> ccfg,
+        boolean nearCache) {
+        IgniteCache<K, V> cache = ignite.createCache(ccfg);
+
+        if (nearCache)
+            ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+        return cache;
+    }
+
+    /**
+     * Builds a configuration for a transactional cache with the given name, write synchronization mode
+     * and number of backups, optionally with a read/write-through test store.
+     *
+     * @param name Cache name.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @param store If {@code true} configures cache store.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(String name,
+        CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean store) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(name);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(syncMode);
+        ccfg.setBackups(backups);
+
+        if (store) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setReadThrough(true);
+            ccfg.setWriteThrough(true);
+        }
+
+        return ccfg;
+    }
+
+    /**
+     * Factory producing a cache store which loads nothing and ignores all writes and deletes.
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
index 0666349..277ffaf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
@@ -110,9 +110,9 @@ public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest
         TestRecordingCommunicationSpi spi1 = (TestRecordingCommunicationSpi)g1.configuration().getCommunicationSpi();
         TestRecordingCommunicationSpi spi2 = (TestRecordingCommunicationSpi)g2.configuration().getCommunicationSpi();
 
-        info(spi0.recordedMessages().size() + " " +
-            spi1.recordedMessages().size() + " " +
-            spi2.recordedMessages().size());
+        info(spi0.recordedMessages(false).size() + " " +
+            spi1.recordedMessages(false).size() + " " +
+            spi2.recordedMessages(false).size());
 
         checkCache(c0, cnt);
         checkCache(c1, cnt);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index a08d080..ad122e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -25,8 +25,6 @@ import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -51,6 +49,9 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionRollbackException;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -94,9 +95,10 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
     protected CacheConfiguration cacheConfiguration(String gridName) {
         CacheConfiguration ccfg = new CacheConfiguration();
 
-        ccfg.setCacheMode(CacheMode.PARTITIONED);
-        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
         ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
         return ccfg;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 7aab990..facdc4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -92,8 +92,11 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHan
 import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheReadFromBackupTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSingleGetMessageTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCacheWriteSynchronizationModesMultithreadedTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest;
@@ -311,6 +314,9 @@ public class IgniteCacheTestSuite4 extends TestSuite {
 
         suite.addTestSuite(IgniteCacheGetCustomCollectionsSelfTest.class);
         suite.addTestSuite(IgniteCacheLoadRebalanceEvictionSelfTest.class);
+        suite.addTestSuite(IgniteCachePrimarySyncTest.class);
+        suite.addTestSuite(IgniteTxCachePrimarySyncTest.class);
+        suite.addTestSuite(IgniteTxCacheWriteSynchronizationModesMultithreadedTest.class);
 
         return suite;
     }


Mime
View raw message