Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D06E718345 for ; Mon, 14 Sep 2015 09:11:10 +0000 (UTC) Received: (qmail 90900 invoked by uid 500); 14 Sep 2015 09:11:10 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 90808 invoked by uid 500); 14 Sep 2015 09:11:10 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 90679 invoked by uid 99); 14 Sep 2015 09:11:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Sep 2015 09:11:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 709D1DFC88; Mon, 14 Sep 2015 09:11:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 14 Sep 2015 09:11:15 -0000 Message-Id: <24ebcf3e70f54899bb97c532c7b4b5f7@git.apache.org> In-Reply-To: <93660438cc884caa8c543c487ce83d04@git.apache.org> References: <93660438cc884caa8c543c487ce83d04@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/28] ignite git commit: IGNITE-264 - Check backup node for one-phase transaction when primary node crashes. http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java index 80fdbbe..ea7b124 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java @@ -131,7 +131,7 @@ public class IgniteInternalCacheTypesTest extends GridCommonAbstractTest { checkCache(ignite, CU.MARSH_CACHE_NAME, MARSH_CACHE_POOL, false, false); - checkCache(ignite, CU.ATOMICS_CACHE_NAME, SYSTEM_POOL, false, false); + checkCache(ignite, CU.ATOMICS_CACHE_NAME, SYSTEM_POOL, false, true); for (String cache : userCaches) checkCache(ignite, cache, SYSTEM_POOL, true, false); @@ -157,4 +157,4 @@ public class IgniteInternalCacheTypesTest extends GridCommonAbstractTest { assertEquals("Unexpected property for cache: " + cache.name(), user, cache.context().userCache()); assertEquals("Unexpected property for cache: " + cache.name(), sysTx, cache.context().systemTx()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java new file mode 100644 index 0000000..87c160f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * Checks one-phase commit scenarios. + */ +public class IgniteOnePhaseCommitNearSelfTest extends GridCommonAbstractTest { + /** Grid count. */ + private static final int GRID_CNT = 4; + + /** */ + private int backups = 1; + + /** */ + private static Map, AtomicInteger> msgCntMap = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(cacheConfiguration(gridName)); + + cfg.getTransactionConfiguration().setTxSerializableEnabled(true); + + cfg.setCommunicationSpi(new MessageCountingCommunicationSpi()); + + return cfg; + } + + /** + * @param gridName Grid name. + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration(String gridName) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setBackups(backups); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testOnePhaseCommitFromNearNode() throws Exception { + backups = 1; + + startGrids(GRID_CNT); + + try { + awaitPartitionMapExchange(); + + int key = generateNearKey(); + + IgniteCache cache = ignite(0).cache(null); + + checkKey(ignite(0).transactions(), cache, key); + } + finally { + stopAllGrids(); + } + } + + /** + * @param transactions Transactions instance. + * @param cache Cache instance. + * @param key Key. + */ + private void checkKey(IgniteTransactions transactions, Cache cache, int key) throws Exception { + cache.put(key, key); + + finalCheck(key, true); + + TransactionIsolation[] isolations = {READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE}; + TransactionConcurrency[] concurrencies = {OPTIMISTIC, PESSIMISTIC}; + + for (TransactionIsolation isolation : isolations) { + for (TransactionConcurrency concurrency : concurrencies) { + info("Checking transaction [isolation=" + isolation + ", concurrency=" + concurrency + ']'); + + try (Transaction tx = transactions.txStart(concurrency, isolation)) { + cache.put(key, isolation + "-" + concurrency); + + tx.commit(); + } + + finalCheck(key, true); + } + } + } + + /** + * @throws Exception If failed. + */ + private void finalCheck(final int key, boolean onePhase) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + for (int i = 0; i < GRID_CNT; i++) { + GridCacheAdapter cache = ((IgniteKernal)ignite(i)).internalCache(); + + GridCacheEntryEx entry = cache.peekEx(key); + + if (entry != null) { + if (entry.lockedByAny()) { + info("Near entry is still locked [i=" + i + ", entry=" + entry + ']'); + + return false; + } + } + + entry = cache.context().near().dht().peekEx(key); + + if (entry != null) { + if (entry.lockedByAny()) { + info("DHT entry is still locked [i=" + i + ", entry=" + entry + ']'); + + return false; + } + } + } + + return true; + } + catch (GridCacheEntryRemovedException ignore) { + info("Entry was removed, will retry"); + + return false; + } + } + }, 10_000); + + if (onePhase) { + assertMessageCount(GridNearTxPrepareRequest.class, 1); + assertMessageCount(GridDhtTxPrepareRequest.class, 1); + assertMessageCount(GridNearTxFinishRequest.class, 1); + assertMessageCount(GridDhtTxFinishRequest.class, 0); + + msgCntMap.clear(); + } + } + + /** + * @param cls Class to check. + * @param cnt Expected count. + */ + private void assertMessageCount(Class cls, int cnt) { + AtomicInteger val = msgCntMap.get(cls); + + int iVal = val == null ? 0 : val.get(); + + assertEquals("Invalid message count for class: " + cls.getSimpleName(), cnt, iVal); + } + + /** + * @return Key. + */ + protected int generateNearKey() { + Affinity aff = ignite(0).affinity(null); + + int key = 0; + + while (true) { + boolean primary = aff.isPrimary(ignite(1).cluster().localNode(), key); + boolean primaryOrBackup = aff.isPrimaryOrBackup(ignite(0).cluster().localNode(), key); + + if (primary && !primaryOrBackup) + return key; + + key++; + } + } + + /** + * + */ + private static class MessageCountingCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackClosure) + throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + GridIoMessage ioMsg = (GridIoMessage)msg; + + Class cls = ioMsg.message().getClass(); + + AtomicInteger cntr = msgCntMap.get(cls); + + if (cntr == null) + cntr = F.addIfAbsent(msgCntMap, cls, new AtomicInteger()); + + cntr.incrementAndGet(); + } + + super.sendMessage(node, msg, ackClosure); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java index e712cc9..5bc779c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -98,7 +99,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract startGrid(1); startGrid(2); - PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); + final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); ignite(2).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); @@ -127,7 +128,11 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract assert !cache.containsKey(key); - assert !lsnr.lostParts.isEmpty(); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !lsnr.lostParts.isEmpty(); + } + }, getTestTimeout()); } /** @@ -139,7 +144,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract awaitPartitionMapExchange(); - PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); + final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); ignite(1).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); @@ -157,7 +162,11 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract assert !jcache(1).containsKey(key); - assert !lsnr.lostParts.isEmpty(); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !lsnr.lostParts.isEmpty(); + } + }, getTestTimeout()); } /** @@ -172,7 +181,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract startGrid(0); - PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); + final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); grid(1).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); @@ -206,7 +215,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract startGrid(1); - PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); + final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); grid(1).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); @@ -235,7 +244,11 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract awaitPartitionMapExchange(); - assert !lsnr.lostParts.isEmpty(); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !lsnr.lostParts.isEmpty(); + } + }, getTestTimeout()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java index 27b201f..806d8b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java @@ -182,7 +182,8 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { startGrids(GRID_CNT); - awaitPartitionMapExchange(); + if (cacheMode == REPLICATED) + awaitPartitionMapExchange(); ignites = new Ignite[GRID_CNT]; ids = new UUID[GRID_CNT]; @@ -624,4 +625,4 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { return null; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java new file mode 100644 index 0000000..1135c40 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionRollbackException; + +import javax.cache.CacheException; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; + +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Tests one-phase commit transactions when some of the nodes fail in the middle of the transaction. + */ +@SuppressWarnings("unchecked") +public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { + /** + * @return Grid count. + */ + public int gridCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(cacheConfiguration(gridName)); + + cfg.setCommunicationSpi(new BanningCommunicationSpi()); + + return cfg; + } + + /** + * @param gridName Grid name. + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration(String gridName) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setBackups(1); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupCommitPessimistic() throws Exception { + checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupCommitOptimistic() throws Exception { + checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupCommitPessimisticOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupCommitOptimisticOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackPessimistic() throws Exception { + checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackOptimistic() throws Exception { + checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackPessimisticOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackOptimisticOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupCommitImplicit() throws Exception { + checkPrimaryNodeFailureBackupCommit(null, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupCommitImplicitOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(null, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackImplicit() throws Exception { + checkPrimaryNodeFailureBackupCommit(null, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackImplicitOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(null, true, false); + } + + /** + * @throws Exception If failed. + */ + private void checkPrimaryNodeFailureBackupCommit( + final TransactionConcurrency conc, + boolean backup, + final boolean commit + ) throws Exception { + startGrids(gridCount()); + awaitPartitionMapExchange(); + + for (int i = 0; i < gridCount(); i++) + info("Grid " + i + ": " + ignite(i).cluster().localNode().id()); + + try { + final Ignite ignite = ignite(0); + + final IgniteCache cache = ignite.cache(null); + + final int key = generateKey(ignite, backup); + + IgniteEx backupNode = (IgniteEx)backupNode(key, null); + + assertNotNull(backupNode); + + final CountDownLatch commitLatch = new CountDownLatch(1); + + if (!commit) { + communication(1).bannedClasses(Collections.singletonList(GridDhtTxPrepareRequest.class)); + } + else { + if (!backup) { + communication(2).bannedClasses(Collections.singletonList(GridDhtTxPrepareResponse.class)); + communication(3).bannedClasses(Collections.singletonList(GridDhtTxPrepareResponse.class)); + } + else + communication(0).bannedClasses(Collections.singletonList(GridDhtTxPrepareResponse.class)); + } + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + if (conc != null) { + try (Transaction tx = ignite.transactions().txStart(conc, REPEATABLE_READ)) { + cache.put(key, key); + + Transaction asyncTx = (Transaction)tx.withAsync(); + + asyncTx.commit(); + + commitLatch.countDown(); + + try { + IgniteFuture fut = asyncTx.future(); + + fut.get(); + + if (!commit) { + error("Transaction has been committed"); + + fail("Transaction has been committed: " + tx); + } + } + catch (TransactionRollbackException e) { + if (commit) { + error(e.getMessage(), e); + + fail("Failed to commit: " + e); + } + else + assertTrue(X.hasCause(e, TransactionRollbackException.class)); + } + } + } + else { + IgniteCache cache0 = cache.withAsync(); + + cache0.put(key, key); + + Thread.sleep(1000); + + commitLatch.countDown(); + + try { + cache0.future().get(); + + if (!commit) { + error("Transaction has been committed"); + + fail("Transaction has been committed."); + } + } + catch (CacheException e) { + if (commit) { + error(e.getMessage(), e); + + fail("Failed to commit: " + e); + } + else + assertTrue(X.hasCause(e, TransactionRollbackException.class)); + } + } + + return null; + } + }); + + commitLatch.await(); + + stopGrid(1); + + // Check that thread successfully finished. + fut.get(); + + // Check there are no hanging transactions. + assertEquals(0, ((IgniteEx)ignite(0)).context().cache().context().tm().idMapSize()); + assertEquals(0, ((IgniteEx)ignite(2)).context().cache().context().tm().idMapSize()); + assertEquals(0, ((IgniteEx)ignite(3)).context().cache().context().tm().idMapSize()); + + dataCheck((IgniteKernal)ignite(0), (IgniteKernal)backupNode, key, commit); + } + finally { + stopAllGrids(); + } + } + + /** + * @param orig Originating cache. + * @param backup Backup cache. + * @param key Key being committed and checked. + * @param commit Commit or rollback flag. + * @throws Exception If check failed. + */ + private void dataCheck(IgniteKernal orig, IgniteKernal backup, int key, boolean commit) throws Exception { + GridNearCacheEntry nearEntry = null; + + GridCacheAdapter origCache = orig.internalCache(null); + + if (origCache.isNear()) + nearEntry = (GridNearCacheEntry)origCache.peekEx(key); + + GridCacheAdapter backupCache = backup.internalCache(null); + + if (backupCache.isNear()) + backupCache = backupCache.context().near().dht(); + + GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)backupCache.peekEx(key); + + if (commit) { + assertNotNull(dhtEntry); + assertTrue("dhtEntry=" + dhtEntry, dhtEntry.remoteMvccSnapshot().isEmpty()); + assertTrue("dhtEntry=" + dhtEntry, dhtEntry.localCandidates().isEmpty()); + assertEquals(key, backupCache.localPeek(key, null, null)); + + if (nearEntry != null) { + assertTrue("near=" + nearEntry, nearEntry.remoteMvccSnapshot().isEmpty()); + assertTrue("near=" + nearEntry, nearEntry.localCandidates().isEmpty()); + + // Near peek wil be null since primary node has changed. + assertNull("near=" + nearEntry, origCache.localPeek(key, null, null)); + } + } + else { + assertTrue("near=" + nearEntry + ", hc=" + System.identityHashCode(nearEntry), nearEntry == null); + assertTrue("Invalid backup cache entry: " + dhtEntry, + dhtEntry == null || dhtEntry.rawGetOrUnmarshal(false) == null); + } + } + + /** + * @param idx Index. + * @return Communication SPI. + */ + private BanningCommunicationSpi communication(int idx) { + return (BanningCommunicationSpi)ignite(idx).configuration().getCommunicationSpi(); + } + + /** + * @param ignite Ignite instance to generate key. + * @return Generated key that is not primary nor backup for {@code ignite(0)} and primary for + * {@code ignite(1)}. + */ + private int generateKey(Ignite ignite, boolean backup) { + Affinity aff = ignite.affinity(null); + + for (int key = 0;;key++) { + if (backup) { + if (!aff.isBackup(ignite(0).cluster().localNode(), key)) + continue; + } + else { + if (aff.isPrimaryOrBackup(ignite(0).cluster().localNode(), key)) + continue; + } + + if (aff.isPrimary(ignite(1).cluster().localNode(), key)) + return key; + } + } + + /** + * + */ + private static class BanningCommunicationSpi extends TcpCommunicationSpi { + /** */ + private volatile Collection bannedClasses = Collections.emptyList(); + + /** + * @param bannedClasses Banned classes. + */ + public void bannedClasses(Collection bannedClasses) { + this.bannedClasses = bannedClasses; + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackClosure) throws IgniteSpiException { + GridIoMessage ioMsg = (GridIoMessage)msg; + + if (!bannedClasses.contains(ioMsg.message().getClass())) { + super.sendMessage(node, msg, ackClosure); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java new file mode 100644 index 0000000..5735182 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; + +/** + * + */ +public class GridNearCacheTxNodeFailureSelfTest extends GridCacheTxNodeFailureSelfTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) { + return super.cacheConfiguration(gridName).setNearConfiguration(new NearCacheConfiguration()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java new file mode 100644 index 0000000..cee54b8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstractTest { + /** + * Grid count. + */ + private static final int GRID_CNT = 5; + + /** + * Restart count. + */ + private static final int RESTART_CNT = 15; + + /** + * Atomic long name. + */ + private static final String ATOMIC_LONG_NAME = "test-atomic-long"; + + /** + * Queue. + */ + private final Queue queue = new ConcurrentLinkedQueue<>(); + + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** + * {@inheritDoc} + */ + @Override + protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + AtomicConfiguration atomicCfg = new AtomicConfiguration(); + atomicCfg.setCacheMode(CacheMode.PARTITIONED); + atomicCfg.setBackups(1); + + cfg.setAtomicConfiguration(atomicCfg); + + return cfg; + } + + /** + * {@inheritDoc} + */ + @Override + protected void afterTest() throws Exception { + stopAllGrids(); + + queue.clear(); + } + + /** + * + */ + public void testQueueCreateNodesJoin() throws Exception { + CountDownLatch startLatch = new CountDownLatch(GRID_CNT); + final AtomicBoolean run = new AtomicBoolean(true); + + Collection> futs = new ArrayList<>(); + + for (int i = 0; i < GRID_CNT; i++) + futs.add(startNodeAndCreaterThread(i, startLatch, run)); + + startLatch.await(); + + info("All nodes started."); + + Thread.sleep(10_000); + + run.set(false); + + for (IgniteInternalFuture fut : futs) + fut.get(); + + info("Increments: " + queue.size()); + + assert !queue.isEmpty(); + } + + /** + * @throws Exception If failed. + */ + public void testIncrementConsistency() throws Exception { + startGrids(GRID_CNT); + + final AtomicBoolean run = new AtomicBoolean(true); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + /** {@inheritDoc} */ + @Override + public Void call() throws Exception { + IgniteAtomicLong cntr = ignite(0).atomicLong(ATOMIC_LONG_NAME, 0, true); + + while (run.get()) + queue.add(cntr.getAndIncrement()); + + return null; + } + }, 4, "increment-runner"); + + for (int i = 0; i < RESTART_CNT; i++) { + int restartIdx = ThreadLocalRandom.current().nextInt(GRID_CNT - 1) + 1; + + stopGrid(restartIdx); + + U.sleep(500); + + startGrid(restartIdx); + } + + run.set(false); + + fut.get(); + + info("Increments: " + queue.size()); + + checkQueue(); + } + + /** + * @throws Exception If failed. + */ + public void testQueueClose() throws Exception { + startGrids(GRID_CNT); + + int threads = 4; + + final AtomicBoolean run = new AtomicBoolean(true); + final AtomicInteger idx = new AtomicInteger(); + final AtomicReferenceArray arr = new AtomicReferenceArray<>(threads); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + /** {@inheritDoc} */ + @Override + public Void call() throws Exception { + int base = idx.getAndIncrement(); + + try { + int delta = 0; + + while (run.get()) { + IgniteAtomicLong cntr = ignite(0).atomicLong(ATOMIC_LONG_NAME + "-" + base + "-" + delta, 0, true); + + for (int i = 0; i < 5; i++) + queue.add(cntr.getAndIncrement()); + + cntr.close(); + + delta++; + } + } + catch (Exception e) { + arr.set(base, e); + + throw e; + } + finally { + info("RUNNER THREAD IS STOPPING"); + } + + return null; + } + }, threads, "increment-runner"); + + for (int i = 0; i < RESTART_CNT; i++) { + int restartIdx = ThreadLocalRandom.current().nextInt(GRID_CNT - 1) + 1; + + stopGrid(restartIdx); + + U.sleep(500); + + startGrid(restartIdx); + } + + run.set(false); + + fut.get(); + + for (int i = 0; i < threads; i++) { + Exception err = arr.get(i); + + if (err != null) + throw err; + } + } + + /** + * + */ + private void checkQueue() { + List list = new ArrayList<>(queue); + + Collections.sort(list); + + boolean failed = false; + + int delta = 0; + + for (int i = 0; i < list.size(); i++) { + Long exp = (long)(i + delta); + + Long actual = list.get(i); + + if (!exp.equals(actual)) { + failed = true; + + delta++; + + info(">>> Expected " + exp + ", actual " + actual); + } + } + + assertFalse(failed); + } + + /** + * @param i Node index. + */ + private IgniteInternalFuture startNodeAndCreaterThread(final int i, final CountDownLatch startLatch, final AtomicBoolean run) + throws Exception { + return multithreadedAsync(new Runnable() { + @Override + public void run() { + try { + Ignite ignite = startGrid(i); + + startLatch.countDown(); + + while (run.get()) { + IgniteAtomicLong cntr = ignite.atomicLong(ATOMIC_LONG_NAME, 0, true); + + queue.add(cntr.getAndIncrement()); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + }, 1, "grunner-" + i); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java index db55731..1d80ac1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.distributed.near; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; @@ -90,11 +92,14 @@ public class IgniteCacheNearOnlyTxTest extends IgniteCacheAbstractTest { IgniteCache cache0 = ignite(0).cache(null); IgniteCache cache1 = ignite1.cache(null); + Collection> futs = new ArrayList<>(); + for (int i = 0; i < 5; i++) { log.info("Iteration: " + i); - GridTestUtils.runMultiThreadedAsync(new Callable() { - @Override public Object call() throws Exception { + futs.add(GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override + public Object call() throws Exception { int val = idx.getAndIncrement(); IgniteCache cache = ignite1.cache(null); @@ -104,10 +109,13 @@ public class IgniteCacheNearOnlyTxTest extends IgniteCacheAbstractTest { return null; } - }, 5, "put-thread"); + }, 5, "put-thread")); assertEquals(cache0.localPeek(key), cache1.localPeek(key)); } + + for (IgniteInternalFuture fut : futs) + fut.get(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java index 28f9a65..eaeb7b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java @@ -30,6 +30,9 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxNode import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtAtomicRemoveFailureTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtClientRemoveFailureTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtRemoveFailureTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheTxNodeFailureSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridNearCacheTxNodeFailureSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteAtomicLongChangingTopologySelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryAtomicSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryTransactionalSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicClientInvalidPartitionHandlingSelfTest; @@ -97,7 +100,11 @@ public class IgniteCacheFailoverTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheSizeFailoverTest.class); suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class); + suite.addTestSuite(IgniteAtomicLongChangingTopologySelfTest.class); + + suite.addTestSuite(GridCacheTxNodeFailureSelfTest.class); + suite.addTestSuite(GridNearCacheTxNodeFailureSelfTest.class); return suite; } -} \ No newline at end of file +}