Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 57F4A190F6 for ; Mon, 11 Apr 2016 14:26:17 +0000 (UTC) Received: (qmail 47150 invoked by uid 500); 11 Apr 2016 14:26:14 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 47058 invoked by uid 500); 11 Apr 2016 14:26:11 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 45132 invoked by uid 99); 11 Apr 2016 14:25:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Apr 2016 14:25:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 13CFAEAB52; Mon, 11 Apr 2016 14:25:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Mon, 11 Apr 2016 14:25:48 -0000 Message-Id: <082398dd2aca4c5f97c61ed9025b703a@git.apache.org> In-Reply-To: <15dbdc0d71374accaf29cbb3b7b59af4@git.apache.org> References: <15dbdc0d71374accaf29cbb3b7b59af4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/50] [abbrv] ignite git commit: ignite-2407 Fixed 'primary_sync' mode for transactional cache http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java new file mode 100644 index 0000000..3bc22ef --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java @@ -0,0 +1,1114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.IgnitePredicateX; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; + +/** + * + */ +public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SRVS = 4; + + /** */ + private static final int CLIENTS = 2; + + /** */ + private static final int NODES = SRVS + CLIENTS; + + /** */ + private boolean clientMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(clientMode); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(SRVS); + + try { + for (int i = 0; i < CLIENTS; i++) { + clientMode = true; + + Ignite client = startGrid(SRVS + i); + + assertTrue(client.configuration().isClientMode()); + } + } + finally { + clientMode = false; + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testSingleKeyCommitFromPrimary() throws Exception { + singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false)); + + singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false)); + + singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 2, false, true)); + + singleKeyCommitFromPrimary(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false)); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void singleKeyCommitFromPrimary(CacheConfiguration ccfg) throws Exception { + Ignite ignite = ignite(0); + + IgniteCache cache = ignite.createCache(ccfg); + + try { + for (int i = 0; i < SRVS; i++) { + Ignite node = ignite(i); + + singleKeyCommitFromPrimary(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + cache.put(key, key); + } + }); + + for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (final TransactionIsolation isolation : TransactionIsolation.values()) { + singleKeyCommitFromPrimary(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + Ignite ignite = cache.unwrap(Ignite.class); + + try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) { + cache.put(key, key); + + tx.commit(); + } + } + }); + } + } + } + } + finally { + ignite.destroyCache(cache.getName()); + } + } + + /** + * @param ignite Node executing cache operation. + * @param ccfg Cache configuration. + * @param c Cache update closure. + * @throws Exception If failed. + */ + private void singleKeyCommitFromPrimary( + Ignite ignite, + final CacheConfiguration ccfg, + IgniteBiInClosure> c) throws Exception { + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + IgniteCache cache = ignite.cache(ccfg.getName()); + + final Integer key = primaryKey(cache); + + cache.remove(key); + + waitKeyRemoved(ccfg.getName(), key); + + commSpi0.record(GridDhtTxFinishRequest.class); + + commSpi0.blockMessages(new IgnitePredicateX() { + @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException { + return e.message() instanceof GridDhtTxFinishRequest; + } + }); + + c.apply(key, cache); + + assertEquals(key, cache.localPeek(key)); + + U.sleep(50); + + for (int i = 0; i < SRVS; i++) { + Ignite node = ignite(i); + + if (node != ignite) + assertNull(node.cache(null).localPeek(key)); + } + + commSpi0.stopBlock(true); + + waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key); + + List msgs = commSpi0.recordedMessages(true); + + assertEquals(ccfg.getBackups(), msgs.size()); + + cache.remove(key); + + waitKeyRemoved(ccfg.getName(), key); + + c.apply(key, cache); + + waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key); + } + + /** + * @throws Exception If failed. + */ + public void testSingleKeyPrimaryNodeFail1() throws Exception { + singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false)); + + singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false)); + } + + /** + * @throws Exception If failed. + */ + public void testSingleKeyPrimaryNodeFail2() throws Exception { + singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 2, true, false)); + + singleKeyPrimaryNodeLeft(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false)); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void singleKeyPrimaryNodeLeft(CacheConfiguration ccfg) throws Exception { + Ignite ignite = ignite(0); + + IgniteCache cache = ignite.createCache(ccfg); + + try { + ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); + + for (int i = 0; i < NODES; i++) { + Ignite node = ignite(i); + + singleKeyPrimaryNodeLeft(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + cache.put(key, key); + } + }); + + for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (final TransactionIsolation isolation : TransactionIsolation.values()) { + singleKeyPrimaryNodeLeft(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + Ignite ignite = cache.unwrap(Ignite.class); + + try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) { + cache.put(key, key); + + tx.commit(); + } + } + }); + } + } + } + } + finally { + ignite.destroyCache(cache.getName()); + } + } + + /** + * @param client Node executing cache operation. + * @param ccfg Cache configuration. + * @param c Cache update closure. + * @throws Exception If failed. + */ + private void singleKeyPrimaryNodeLeft( + Ignite client, + final CacheConfiguration ccfg, + final IgniteBiInClosure> c) throws Exception { + Ignite ignite = startGrid(NODES); + + final TestRecordingCommunicationSpi commSpiClient = + (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi(); + + IgniteCache cache = ignite.cache(ccfg.getName()); + + final Integer key = primaryKey(cache); + + cache.remove(key); + + waitKeyRemoved(ccfg.getName(), key); + + commSpiClient.blockMessages(GridNearTxFinishRequest.class, ignite.name()); + + final IgniteCache clientCache = client.cache(ccfg.getName()); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + c.apply(key, clientCache); + + return null; + } + }); + + boolean waitMsgSnd = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return commSpiClient.hasBlockedMessages(); + } + }, 5000); + + assertTrue(waitMsgSnd); + + ignite.close(); + + commSpiClient.stopBlock(false); + + fut.get(); + + awaitPartitionMapExchange(); + + waitKeyUpdated(client, ccfg.getBackups() + 1, ccfg.getName(), key); + + clientCache.remove(key); + + waitKeyRemoved(ccfg.getName(), key); + + c.apply(key, clientCache); + + waitKeyUpdated(client, ccfg.getBackups() + 1, ccfg.getName(), key); + } + + /** + * @throws Exception If failed. + */ + public void testSingleKeyCommit() throws Exception { + singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false)); + + singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false)); + + singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 2, false, true)); + + singleKeyCommit(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false)); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void singleKeyCommit(CacheConfiguration ccfg) throws Exception { + Ignite ignite = ignite(0); + + IgniteCache cache = ignite.createCache(ccfg); + + try { + ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); + + for (int i = 1; i < NODES; i++) { + Ignite node = ignite(i); + + log.info("Test node: " + node.name()); + + singleKeyCommit(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + cache.put(key, key); + } + }); + + for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (final TransactionIsolation isolation : TransactionIsolation.values()) { + singleKeyCommit(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + Ignite ignite = cache.unwrap(Ignite.class); + + try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) { + cache.put(key, key); + + tx.commit(); + } + } + }); + } + } + } + } + finally { + ignite.destroyCache(cache.getName()); + } + } + + /** + * @param client Node executing cache operation. + * @param ccfg Cache configuration. + * @param c Cache update closure. + * @throws Exception If failed. + */ + private void singleKeyCommit( + Ignite client, + final CacheConfiguration ccfg, + IgniteBiInClosure> c) throws Exception { + Ignite ignite = ignite(0); + + assertNotSame(ignite, client); + + TestRecordingCommunicationSpi commSpiClient = + (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi(); + + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + IgniteCache cache = ignite.cache(ccfg.getName()); + + final Integer key = primaryKey(cache); + + cache.remove(key); + + waitKeyRemoved(ccfg.getName(), key); + + IgniteCache clientCache = client.cache(ccfg.getName()); + + commSpiClient.record(GridNearTxFinishRequest.class); + + commSpi0.record(GridDhtTxFinishRequest.class); + + commSpi0.blockMessages(new IgnitePredicateX() { + @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException { + return e.message() instanceof GridDhtTxFinishRequest; + } + }); + + c.apply(key, clientCache); + + assertEquals(key, cache.localPeek(key)); + + U.sleep(50); + + boolean nearCache = ((IgniteCacheProxy)clientCache).context().isNear(); + + for (int i = 1; i < NODES; i++) { + Ignite node = ignite(i); + + if (nearCache + && node == client && + !node.affinity(ccfg.getName()).isPrimaryOrBackup(node.cluster().localNode(), key)) + assertEquals("Invalid value for node: " + i, key, ignite(i).cache(null).localPeek(key)); + else + assertNull("Invalid value for node: " + i, ignite(i).cache(null).localPeek(key)); + } + + commSpi0.stopBlock(true); + + waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key); + + List msgs = commSpiClient.recordedMessages(true); + + assertEquals(1, msgs.size()); + + GridNearTxFinishRequest req = (GridNearTxFinishRequest)msgs.get(0); + + assertEquals(PRIMARY_SYNC, req.syncMode()); + + msgs = commSpi0.recordedMessages(true); + + assertEquals(ccfg.getBackups(), msgs.size()); + + clientCache.remove(key); + + waitKeyRemoved(ccfg.getName(), key); + + c.apply(key, clientCache); + + waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key); + } + + /** + * @throws Exception If failed. + */ + public void testWaitPrimaryResponse() throws Exception { + checkWaitPrimaryResponse(cacheConfiguration(null, PRIMARY_SYNC, 1, true, false)); + + checkWaitPrimaryResponse(cacheConfiguration(null, PRIMARY_SYNC, 2, false, false)); + + checkWaitPrimaryResponse(cacheConfiguration(null, PRIMARY_SYNC, 2, false, true)); + + checkWaitPrimaryResponse(cacheConfiguration(null, PRIMARY_SYNC, 3, false, false)); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void checkWaitPrimaryResponse(CacheConfiguration ccfg) throws Exception { + Ignite ignite = ignite(0); + + IgniteCache cache = ignite.createCache(ccfg); + + try { + ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); + + for (int i = 1; i < NODES; i++) { + Ignite node = ignite(i); + + log.info("Test node: " + node.name()); + + checkWaitPrimaryResponse(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + cache.put(key, key); + } + }); + + checkWaitPrimaryResponse(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + Map map = new HashMap<>(); + + for (int i = 0; i < 50; i++) + map.put(i, i); + + map.put(key, key); + + cache.putAll(map); + } + }); + + for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (final TransactionIsolation isolation : TransactionIsolation.values()) { + checkWaitPrimaryResponse(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + Ignite ignite = cache.unwrap(Ignite.class); + + try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) { + cache.put(key, key); + + tx.commit(); + } + } + }); + + checkWaitPrimaryResponse(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + Map map = new HashMap<>(); + + for (int i = 0; i < 50; i++) + map.put(i, i); + + map.put(key, key); + + Ignite ignite = cache.unwrap(Ignite.class); + + try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) { + cache.putAll(map); + + tx.commit(); + } + } + }); + } + } + } + } + finally { + ignite.destroyCache(cache.getName()); + } + } + + /** + * @param client Node executing cache operation. + * @param ccfg Cache configuration. + * @param c Cache update closure. + * @throws Exception If failed. + */ + private void checkWaitPrimaryResponse( + Ignite client, + final CacheConfiguration ccfg, + final IgniteBiInClosure> c) throws Exception { + Ignite ignite = ignite(0); + + assertNotSame(ignite, client); + + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + IgniteCache cache = ignite.cache(ccfg.getName()); + + final Integer key = primaryKey(cache); + + cache.remove(key); + + waitKeyRemoved(ccfg.getName(), key); + + final IgniteCache clientCache = client.cache(ccfg.getName()); + + commSpi0.blockMessages(GridNearTxFinishResponse.class, client.name()); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + c.apply(key, clientCache); + + return null; + } + }, "tx-thread"); + + U.sleep(100); + + assertFalse(fut.isDone()); + + commSpi0.stopBlock(true); + + fut.get(); + + waitKeyUpdated(ignite, ccfg.getBackups() + 1, ccfg.getName(), key); + } + + /** + * @throws Exception If failed. + */ + public void testOnePhaseMessages() throws Exception { + checkOnePhaseMessages(cacheConfiguration(null, PRIMARY_SYNC, 1, false, false)); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void checkOnePhaseMessages(CacheConfiguration ccfg) throws Exception { + Ignite ignite = ignite(0); + + IgniteCache cache = ignite.createCache(ccfg); + + try { + for (int i = 1; i < NODES; i++) { + Ignite node = ignite(i); + + log.info("Test node: " + node.name()); + + checkOnePhaseMessages(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + cache.put(key, key); + } + }); + + for (final TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (final TransactionIsolation isolation : TransactionIsolation.values()) { + checkOnePhaseMessages(node, ccfg, new IgniteBiInClosure>() { + @Override public void apply(Integer key, IgniteCache cache) { + Ignite ignite = cache.unwrap(Ignite.class); + + try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) { + cache.put(key, key); + + tx.commit(); + } + } + }); + } + } + } + } + finally { + ignite.destroyCache(cache.getName()); + } + } + + /** + * @param client Node executing cache operation. + * @param ccfg Cache configuration. + * @param c Cache update closure. + * @throws Exception If failed. + */ + private void checkOnePhaseMessages( + Ignite client, + final CacheConfiguration ccfg, + final IgniteBiInClosure> c) throws Exception { + Ignite ignite = ignite(0); + + assertNotSame(ignite, client); + + TestRecordingCommunicationSpi commSpiClient = + (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi(); + + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + IgniteCache cache = ignite.cache(ccfg.getName()); + + final Integer key = primaryKey(cache); + + cache.remove(key); + + waitKeyRemoved(ccfg.getName(), key); + + final IgniteCache clientCache = client.cache(ccfg.getName()); + + commSpi0.record(GridNearTxFinishResponse.class, GridNearTxPrepareResponse.class); + commSpiClient.record(GridNearTxPrepareRequest.class, GridNearTxFinishRequest.class); + + c.apply(key, clientCache); + + List srvMsgs = commSpi0.recordedMessages(true); + + assertEquals("Unexpected messages: " + srvMsgs, 1, srvMsgs.size()); + assertTrue("Unexpected message: " + srvMsgs.get(0), srvMsgs.get(0) instanceof GridNearTxPrepareResponse); + + List clientMsgs = commSpiClient.recordedMessages(true); + + assertEquals("Unexpected messages: " + clientMsgs, 1, clientMsgs.size()); + assertTrue("Unexpected message: " + clientMsgs.get(0), clientMsgs.get(0) instanceof GridNearTxPrepareRequest); + + GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)clientMsgs.get(0); + + assertTrue(req.onePhaseCommit()); + + for (Ignite ignite0 : G.allGrids()) + assertEquals(key, ignite0.cache(cache.getName()).get(key)); + } + + /** + * @throws Exception If failed. + */ + public void testTxSyncMode() throws Exception { + Ignite ignite = ignite(0); + + List> caches = new ArrayList<>(); + + try { + caches.add(createCache(ignite, cacheConfiguration("fullSync1", FULL_SYNC, 1, false, false), true)); + caches.add(createCache(ignite, cacheConfiguration("fullSync2", FULL_SYNC, 1, false, false), true)); + caches.add(createCache(ignite, cacheConfiguration("fullAsync1", FULL_ASYNC, 1, false, false), true)); + caches.add(createCache(ignite, cacheConfiguration("fullAsync2", FULL_ASYNC, 1, false, false), true)); + caches.add(createCache(ignite, cacheConfiguration("primarySync1", PRIMARY_SYNC, 1, false, false), true)); + caches.add(createCache(ignite, cacheConfiguration("primarySync2", PRIMARY_SYNC, 1, false, false), true)); + + for (int i = 0; i < NODES; i++) { + checkTxSyncMode(ignite(i), true); + checkTxSyncMode(ignite(i), false); + } + } + finally { + for (IgniteCache cache : caches) + ignite.destroyCache(cache.getName()); + } + } + + /** + * @param cacheName Cache name. + * @param key Cache key. + * @throws Exception If failed. + */ + private void waitKeyRemoved(final String cacheName, final Object key) throws Exception { + boolean waitRmv = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (Ignite ignite : G.allGrids()) { + if (ignite.cache(cacheName).get(key) != null) + return false; + } + + return true; + } + }, 5000); + + assertTrue(waitRmv); + } + + /** + * @param ignite Node. + * @param expNodes Expected number of cache server nodes. + * @param cacheName Cache name. + * @param key Cache key. + * @throws Exception If failed. + */ + private void waitKeyUpdated(Ignite ignite, int expNodes, final String cacheName, final Object key) throws Exception { + Affinity aff = ignite.affinity(cacheName); + + final Collection nodes = aff.mapKeyToPrimaryAndBackups(key); + + assertEquals(expNodes, nodes.size()); + + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (ClusterNode node : nodes) { + Ignite ignite = grid(node); + + if (!key.equals(ignite.cache(cacheName).get(key))) + return false; + } + + return true; + } + }, 5000); + + assertTrue(wait); + + for (Ignite ignite0 : G.allGrids()) + assertEquals(key, ignite0.cache(cacheName).get(key)); + } + + /** + * @param ignite Node. + * @param ccfg Cache configuration. + * @param nearCache If {@code true} creates near cache on one of client nodes. + * @return Created cache. + */ + private IgniteCache createCache(Ignite ignite, CacheConfiguration ccfg, + boolean nearCache) { + IgniteCache cache = ignite.createCache(ccfg); + + if (nearCache) + ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); + + return cache; + } + + /** + * @param ignite Node. + * @param commit If {@code true} commits transaction. + */ + private void checkTxSyncMode(Ignite ignite, boolean commit) { + IgniteTransactions txs = ignite.transactions(); + + IgniteCache fullSync1 = ignite.cache("fullSync1"); + IgniteCache fullSync2 = ignite.cache("fullSync2"); + IgniteCache fullAsync1 = ignite.cache("fullAsync1"); + IgniteCache fullAsync2 = ignite.cache("fullAsync2"); + IgniteCache primarySync1 = ignite.cache("primarySync1"); + IgniteCache primarySync2 = ignite.cache("primarySync2"); + + for (int i = 0; i < 3; i++) { + int key = 0; + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullSync1.put(key++, 1); + + checkSyncMode(tx, FULL_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullAsync1.put(key++, 1); + + checkSyncMode(tx, FULL_ASYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + primarySync1.put(key++, 1); + + checkSyncMode(tx, PRIMARY_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + for (int j = 0; j < 100; j++) + fullSync1.put(key++, 1); + + checkSyncMode(tx, FULL_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + for (int j = 0; j < 100; j++) + fullAsync1.put(key++, 1); + + checkSyncMode(tx, FULL_ASYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + for (int j = 0; j < 100; j++) + primarySync1.put(key++, 1); + + checkSyncMode(tx, PRIMARY_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullSync1.put(key++, 1); + fullSync2.put(key++, 1); + + checkSyncMode(tx, FULL_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullAsync1.put(key++, 1); + fullAsync2.put(key++, 1); + + checkSyncMode(tx, FULL_ASYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + primarySync1.put(key++, 1); + primarySync2.put(key++, 1); + + checkSyncMode(tx, PRIMARY_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullSync1.put(key++, 1); + primarySync1.put(key++, 1); + + checkSyncMode(tx, FULL_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + primarySync1.put(key++, 1); + fullSync1.put(key++, 1); + + checkSyncMode(tx, FULL_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullSync1.put(key++, 1); + fullAsync1.put(key++, 1); + + checkSyncMode(tx, FULL_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullAsync1.put(key++, 1); + fullSync1.put(key++, 1); + + checkSyncMode(tx, FULL_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullAsync1.put(key++, 1); + primarySync1.put(key++, 1); + + checkSyncMode(tx, PRIMARY_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullAsync1.put(key++, 1); + primarySync1.put(key++, 1); + fullAsync2.put(key++, 1); + + checkSyncMode(tx, PRIMARY_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + primarySync1.put(key++, 1); + fullAsync1.put(key++, 1); + + checkSyncMode(tx, PRIMARY_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullSync1.put(key++, 1); + fullAsync1.put(key++, 1); + primarySync1.put(key++, 1); + + checkSyncMode(tx, FULL_SYNC); + + if (commit) + tx.commit(); + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + fullAsync1.put(key++, 1); + primarySync1.put(key++, 1); + fullSync1.put(key++, 1); + + checkSyncMode(tx, FULL_SYNC); + + if (commit) + tx.commit(); + } + } + } + } + } + + /** + * @param tx Transaction. + * @param syncMode Expected write synchronization mode. + */ + private void checkSyncMode(Transaction tx, CacheWriteSynchronizationMode syncMode) { + assertEquals(syncMode, ((TransactionProxyImpl)tx).tx().syncMode()); + } + + /** + * @param name Cache name. + * @param syncMode Write synchronization mode. + * @param backups Number of backups. + * @param store If {@code true} configures cache store. + * @param nearCache If {@code true} configures near cache. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String name, + CacheWriteSynchronizationMode syncMode, + int backups, + boolean store, + boolean nearCache) { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setName(name); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(syncMode); + ccfg.setBackups(backups); + + if (store) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + } + + if (nearCache) + ccfg.setNearConfiguration(new NearCacheConfiguration<>()); + + return ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory> { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheStore create() { + return new CacheStoreAdapter() { + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // No-op. + } + + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java new file mode 100644 index 0000000..08396da --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.Cache; +import javax.cache.CacheException; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionOptimisticException; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SRVS = 4; + + /** */ + private static final int CLIENTS = 2; + + /** */ + private static final int NODES = SRVS + CLIENTS; + + /** */ + private boolean clientMode; + + /** */ + private static final int MULTITHREADED_TEST_KEYS = 100; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(clientMode); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60_000; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(SRVS); + + clientMode = true; + + for (int i = 0; i < CLIENTS; i++) { + Ignite client = startGrid(SRVS + i); + + assertTrue(client.configuration().isClientMode()); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedPrimarySyncRestart() throws Exception { + multithreadedTests(PRIMARY_SYNC, true); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedPrimarySync() throws Exception { + multithreadedTests(PRIMARY_SYNC, false); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedFullSync() throws Exception { + multithreadedTests(FULL_SYNC, false); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedFullSyncRestart() throws Exception { + multithreadedTests(FULL_SYNC, true); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedFullAsync() throws Exception { + multithreadedTests(FULL_ASYNC, false); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedFullAsyncRestart() throws Exception { + multithreadedTests(FULL_ASYNC, true); + } + + /** + * @param syncMode Write synchronization mode. + * @param restart Restart flag. + * @throws Exception If failed. + */ + private void multithreadedTests(CacheWriteSynchronizationMode syncMode, boolean restart) throws Exception { + multithreaded(syncMode, 0, false, false, restart); + + multithreaded(syncMode, 1, false, false, restart); + + multithreaded(syncMode, 1, true, false, restart); + + multithreaded(syncMode, 2, false, false, restart); + } + + /** + * @param syncMode Write synchronization mode. + * @param backups Number of backups. + * @param store If {@code true} sets store in cache configuration. + * @param nearCache If {@code true} creates near cache on one of client nodes. + * @param restart If {@code true} restarts one node during test. + * @throws Exception If failed. + */ + private void multithreaded(CacheWriteSynchronizationMode syncMode, + int backups, + boolean store, + boolean nearCache, + boolean restart) throws Exception { + final Ignite ignite = ignite(0); + + createCache(ignite, cacheConfiguration(null, syncMode, backups, store), nearCache); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture restartFut = null; + + try { + if (restart) { + restartFut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + while (!stop.get()) { + startGrid(NODES); + + U.sleep(100); + + stopGrid(NODES); + } + return null; + } + }, "restart-thread"); + } + + commitMultithreaded(new IgniteBiInClosure>() { + @Override public void apply(Ignite ignite, IgniteCache cache) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS); + + cache.put(key, rnd.nextInt()); + } + }); + + commitMultithreaded(new IgniteBiInClosure>() { + @Override public void apply(Ignite ignite, IgniteCache cache) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Map map = new TreeMap<>(); + + for (int i = 0; i < 100; i++) { + Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS); + + map.put(key, rnd.nextInt()); + } + + cache.putAll(map); + } + }); + + commitMultithreaded(new IgniteBiInClosure>() { + @Override public void apply(Ignite ignite, IgniteCache cache) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Map map = new TreeMap<>(); + + for (int i = 0; i < 100; i++) { + Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS); + + map.put(key, rnd.nextInt()); + } + + try { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (Map.Entry e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + tx.commit(); + } + } + catch (CacheException | IgniteException e) { + // Ignore. + } + } + }); + + commitMultithreaded(new IgniteBiInClosure>() { + @Override public void apply(Ignite ignite, IgniteCache cache) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Map map = new LinkedHashMap<>(); + + for (int i = 0; i < 10; i++) { + Integer key = rnd.nextInt(MULTITHREADED_TEST_KEYS); + + map.put(key, rnd.nextInt()); + } + + while (true) { + try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + for (Map.Entry e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + tx.commit(); + + break; + } + catch (TransactionOptimisticException e) { + // Retry. + } + catch (CacheException | IgniteException e) { + break; + } + } + } + }); + } + finally { + stop.set(true); + + ignite.destroyCache(null); + + if (restartFut != null) + restartFut.get(); + } + } + + /** + * @param c Test iteration closure. + * @throws Exception If failed. + */ + public void commitMultithreaded(final IgniteBiInClosure> c) throws Exception { + final long stopTime = System.currentTimeMillis() + 10_000; + + GridTestUtils.runMultiThreaded(new IgniteInClosure() { + @Override public void apply(Integer idx) { + int nodeIdx = idx % NODES; + + Thread.currentThread().setName("tx-thread-" + nodeIdx); + + Ignite ignite = ignite(nodeIdx); + + IgniteCache cache = ignite.cache(null); + + while (System.currentTimeMillis() < stopTime) + c.apply(ignite, cache); + } + }, NODES * 3, "tx-thread"); + + final IgniteCache cache = ignite(0).cache(null); + + for (int key = 0; key < MULTITHREADED_TEST_KEYS; key++) { + final Integer key0 = key; + + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + final Integer val = cache.get(key0); + + for (int i = 1; i < NODES; i++) { + IgniteCache cache = ignite(i).cache(null); + + if (!val.equals(cache.get(key0))) + return false; + } + return true; + } + }, 5000); + + assertTrue(wait); + } + } + + /** + * @param ignite Node. + * @param ccfg Cache configuration. + * @param nearCache If {@code true} creates near cache on one of client nodes. + * @return Created cache. + */ + private IgniteCache createCache(Ignite ignite, CacheConfiguration ccfg, + boolean nearCache) { + IgniteCache cache = ignite.createCache(ccfg); + + if (nearCache) + ignite(NODES - 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); + + return cache; + } + + /** + * @param name Cache name. + * @param syncMode Write synchronization mode. + * @param backups Number of backups. + * @param store If {@code true} configures cache store. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String name, + CacheWriteSynchronizationMode syncMode, + int backups, + boolean store) { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setName(name); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(syncMode); + ccfg.setBackups(backups); + + if (store) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + } + + return ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory> { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheStore create() { + return new CacheStoreAdapter() { + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // No-op. + } + + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java index 0666349..277ffaf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java @@ -110,9 +110,9 @@ public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest TestRecordingCommunicationSpi spi1 = (TestRecordingCommunicationSpi)g1.configuration().getCommunicationSpi(); TestRecordingCommunicationSpi spi2 = (TestRecordingCommunicationSpi)g2.configuration().getCommunicationSpi(); - info(spi0.recordedMessages().size() + " " + - spi1.recordedMessages().size() + " " + - spi2.recordedMessages().size()); + info(spi0.recordedMessages(false).size() + " " + + spi1.recordedMessages(false).size() + " " + + spi2.recordedMessages(false).size()); checkCache(c0, cnt); checkCache(c1, cnt); http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java index a08d080..ad122e6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java @@ -25,8 +25,6 @@ import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -51,6 +49,9 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionRollbackException; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -94,9 +95,10 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { protected CacheConfiguration cacheConfiguration(String gridName) { CacheConfiguration ccfg = new CacheConfiguration(); - ccfg.setCacheMode(CacheMode.PARTITIONED); - ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(TRANSACTIONAL); ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); return ccfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 7aab990..facdc4f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -92,8 +92,11 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHan import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheReadFromBackupTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSingleGetMessageTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCacheWriteSynchronizationModesMultithreadedTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest; @@ -311,6 +314,9 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCacheGetCustomCollectionsSelfTest.class); suite.addTestSuite(IgniteCacheLoadRebalanceEvictionSelfTest.class); + suite.addTestSuite(IgniteCachePrimarySyncTest.class); + suite.addTestSuite(IgniteTxCachePrimarySyncTest.class); + suite.addTestSuite(IgniteTxCacheWriteSynchronizationModesMultithreadedTest.class); return suite; }