ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [06/28] ignite git commit: IGNITE-264 - Check backup node for one-phase transaction when primary node crashes.
Date Mon, 14 Sep 2015 09:11:15 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
index 80fdbbe..ea7b124 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
@@ -131,7 +131,7 @@ public class IgniteInternalCacheTypesTest extends GridCommonAbstractTest
{
 
         checkCache(ignite, CU.MARSH_CACHE_NAME, MARSH_CACHE_POOL, false, false);
 
-        checkCache(ignite, CU.ATOMICS_CACHE_NAME, SYSTEM_POOL, false, false);
+        checkCache(ignite, CU.ATOMICS_CACHE_NAME, SYSTEM_POOL, false, true);
 
         for (String cache : userCaches)
             checkCache(ignite, cache, SYSTEM_POOL, true, false);
@@ -157,4 +157,4 @@ public class IgniteInternalCacheTypesTest extends GridCommonAbstractTest
{
         assertEquals("Unexpected property for cache: " + cache.name(), user, cache.context().userCache());
         assertEquals("Unexpected property for cache: " + cache.name(), sysTx, cache.context().systemTx());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java
new file mode 100644
index 0000000..87c160f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Checks one-phase commit scenarios.
+ */
+public class IgniteOnePhaseCommitNearSelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 4;
+
+    /** Number of backups set on the test cache configuration. */
+    private int backups = 1;
+
+    /** Number of sent messages keyed by message class; updated by the counting communication SPI. */
+    private static Map<Class<?>, AtomicInteger> msgCntMap = new ConcurrentHashMap<>();
+
+    /**
+     * Extends the default node configuration with the test cache, enabled serializable transactions
+     * and a communication SPI that counts sent messages.
+     *
+     * {@inheritDoc}
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cacheCfg = cacheConfiguration(gridName);
+        igniteCfg.setCacheConfiguration(cacheCfg);
+
+        igniteCfg.getTransactionConfiguration().setTxSerializableEnabled(true);
+
+        // Every node counts the messages it sends so that the tests can inspect the commit protocol.
+        MessageCountingCommunicationSpi countingSpi = new MessageCountingCommunicationSpi();
+        igniteCfg.setCommunicationSpi(countingSpi);
+
+        return igniteCfg;
+    }
+
+    /**
+     * Creates the configuration of the cache under test: a partitioned transactional cache with the
+     * currently configured number of backups and a near cache configuration.
+     *
+     * @param gridName Grid name.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration(String gridName) {
+        CacheConfiguration cacheCfg = new CacheConfiguration();
+
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cacheCfg.setNearConfiguration(new NearCacheConfiguration());
+        cacheCfg.setBackups(backups);
+
+        return cacheCfg;
+    }
+
+    /**
+     * Starts the grid, picks a key for which node 0 is neither primary nor backup while node 1 is
+     * primary, and runs the key checks through the cache and transactions of node 0.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOnePhaseCommitFromNearNode() throws Exception {
+        backups = 1;
+
+        try {
+            // Nodes are started inside the try block so that a partially started grid is stopped as well.
+            startGrids(GRID_CNT);
+
+            awaitPartitionMapExchange();
+
+            int key = generateNearKey();
+
+            IgniteCache<Object, Object> cache = ignite(0).cache(null);
+
+            checkKey(ignite(0).transactions(), cache, key);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Updates the key without an explicit transaction and then inside an explicit transaction for every
+     * combination of isolation level and concurrency mode, running the final check after each update.
+     *
+     * @param transactions Transactions instance.
+     * @param cache Cache instance.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkKey(IgniteTransactions transactions, Cache<Object, Object> cache, int key) throws Exception {
+        cache.put(key, key);
+
+        finalCheck(key, true);
+
+        TransactionIsolation[] isolationLevels = {READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE};
+        TransactionConcurrency[] concurrencyModes = {OPTIMISTIC, PESSIMISTIC};
+
+        for (int i = 0; i < isolationLevels.length; i++) {
+            TransactionIsolation isolation = isolationLevels[i];
+
+            for (int c = 0; c < concurrencyModes.length; c++) {
+                TransactionConcurrency concurrency = concurrencyModes[c];
+
+                info("Checking transaction [isolation=" + isolation + ", concurrency=" + concurrency + ']');
+
+                String val = isolation + "-" + concurrency;
+
+                try (Transaction tx = transactions.txStart(concurrency, isolation)) {
+                    cache.put(key, val);
+
+                    tx.commit();
+                }
+
+                finalCheck(key, true);
+            }
+        }
+    }
+
+    /**
+     * Waits until the entry for the given key is not locked on any node, both in the cache returned by
+     * {@code internalCache()} and in its underlying DHT cache, and then optionally verifies the number
+     * of transaction messages that were sent.
+     *
+     * @param key Key to check.
+     * @param onePhase If {@code true}, asserts the expected counts of prepare and finish messages and
+     *      clears the message counters afterwards.
+     * @throws Exception If failed.
+     */
+    private void finalCheck(final int key, boolean onePhase) throws Exception {
+        boolean unlocked = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    for (int i = 0; i < GRID_CNT; i++) {
+                        GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite(i)).internalCache();
+
+                        GridCacheEntryEx entry = cache.peekEx(key);
+
+                        if (entry != null) {
+                            if (entry.lockedByAny()) {
+                                info("Near entry is still locked [i=" + i + ", entry=" + entry + ']');
+
+                                return false;
+                            }
+                        }
+
+                        entry = cache.context().near().dht().peekEx(key);
+
+                        if (entry != null) {
+                            if (entry.lockedByAny()) {
+                                info("DHT entry is still locked [i=" + i + ", entry=" + entry + ']');
+
+                                return false;
+                            }
+                        }
+                    }
+
+                    return true;
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    info("Entry was removed, will retry");
+
+                    return false;
+                }
+            }
+        }, 10_000);
+
+        // Fail explicitly on timeout instead of silently continuing while entries are still locked.
+        assertTrue("Entries are still locked after timeout [key=" + key + ']', unlocked);
+
+        if (onePhase) {
+            assertMessageCount(GridNearTxPrepareRequest.class, 1);
+            assertMessageCount(GridDhtTxPrepareRequest.class, 1);
+            assertMessageCount(GridNearTxFinishRequest.class, 1);
+            assertMessageCount(GridDhtTxFinishRequest.class, 0);
+
+            msgCntMap.clear();
+        }
+    }
+
+    /**
+     * Asserts how many messages of the given class have been counted; a class with no counter is
+     * treated as zero messages.
+     *
+     * @param cls Class to check.
+     * @param cnt Expected count.
+     */
+    private void assertMessageCount(Class<?> cls, int cnt) {
+        AtomicInteger cntr = msgCntMap.get(cls);
+
+        int actual = 0;
+
+        if (cntr != null)
+            actual = cntr.get();
+
+        assertEquals("Invalid message count for class: " + cls.getSimpleName(), cnt, actual);
+    }
+
+    /**
+     * Finds the smallest non-negative key for which node 1 is primary and node 0 is neither primary
+     * nor backup.
+     *
+     * @return Key.
+     */
+    protected int generateNearKey() {
+        Affinity<Object> affinity = ignite(0).affinity(null);
+
+        for (int candidate = 0; ; candidate++) {
+            boolean ownedByNode0 = affinity.isPrimaryOrBackup(ignite(0).cluster().localNode(), candidate);
+
+            if (ownedByNode0)
+                continue;
+
+            if (affinity.isPrimary(ignite(1).cluster().localNode(), candidate))
+                return candidate;
+        }
+    }
+
+    /**
+     * Communication SPI that, before delegating to the parent implementation, increments a per-class
+     * counter in {@code msgCntMap} for the payload of every sent {@link GridIoMessage}.
+     */
+    private static class MessageCountingCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+            throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                Class<?> payloadCls = ((GridIoMessage)msg).message().getClass();
+
+                AtomicInteger counter = msgCntMap.get(payloadCls);
+
+                // First message of this class: register a new counter unless one is already present.
+                if (counter == null)
+                    counter = F.addIfAbsent(msgCntMap, payloadCls, new AtomicInteger());
+
+                counter.incrementAndGet();
+            }
+
+            super.sendMessage(node, msg, ackClosure);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
index e712cc9..5bc779c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -98,7 +99,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
         startGrid(1);
         startGrid(2);
 
-        PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
+        final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
 
         ignite(2).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
 
@@ -127,7 +128,11 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
 
         assert !cache.containsKey(key);
 
-        assert !lsnr.lostParts.isEmpty();
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return !lsnr.lostParts.isEmpty();
+            }
+        }, getTestTimeout());
     }
 
     /**
@@ -139,7 +144,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
 
         awaitPartitionMapExchange();
 
-        PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
+        final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
 
         ignite(1).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
 
@@ -157,7 +162,11 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
 
         assert !jcache(1).containsKey(key);
 
-        assert !lsnr.lostParts.isEmpty();
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return !lsnr.lostParts.isEmpty();
+            }
+        }, getTestTimeout());
     }
 
     /**
@@ -172,7 +181,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
 
         startGrid(0);
 
-        PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
+        final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
 
         grid(1).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
 
@@ -206,7 +215,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
 
         startGrid(1);
 
-        PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
+        final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
 
         grid(1).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
 
@@ -235,7 +244,11 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
 
         awaitPartitionMapExchange();
 
-        assert !lsnr.lostParts.isEmpty();
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return !lsnr.lostParts.isEmpty();
+            }
+        }, getTestTimeout());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
index 27b201f..806d8b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
@@ -182,7 +182,8 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest
{
 
         startGrids(GRID_CNT);
 
-        awaitPartitionMapExchange();
+        if (cacheMode == REPLICATED)
+            awaitPartitionMapExchange();
 
         ignites = new Ignite[GRID_CNT];
         ids = new UUID[GRID_CNT];
@@ -624,4 +625,4 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest
{
             return null;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
new file mode 100644
index 0000000..1135c40
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import javax.cache.CacheException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests one-phase commit transactions when some of the nodes fail in the middle of the transaction.
+ */
+@SuppressWarnings("unchecked")
+public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
+    /**
+     * @return Number of nodes started for each test.
+     */
+    public int gridCount() {
+        return 4;
+    }
+
+    /**
+     * Extends the default node configuration with the test cache and a communication SPI that can
+     * drop messages of selected classes.
+     *
+     * {@inheritDoc}
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cacheCfg = cacheConfiguration(gridName);
+        igniteCfg.setCacheConfiguration(cacheCfg);
+
+        BanningCommunicationSpi banningSpi = new BanningCommunicationSpi();
+        igniteCfg.setCommunicationSpi(banningSpi);
+
+        return igniteCfg;
+    }
+
+    /**
+     * Creates the configuration of the cache under test: a partitioned transactional cache with one backup.
+     *
+     * @param gridName Grid name.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration(String gridName) {
+        CacheConfiguration cacheCfg = new CacheConfiguration();
+
+        cacheCfg.setBackups(1);
+        cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+
+        return cacheCfg;
+    }
+
+    /**
+     * Pessimistic transaction on a key for which the originating node is neither primary nor backup;
+     * the commit is expected to succeed although the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupCommitPessimistic() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, true);
+    }
+
+    /**
+     * Optimistic transaction on a key for which the originating node is neither primary nor backup;
+     * the commit is expected to succeed although the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupCommitOptimistic() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, true);
+    }
+
+    /**
+     * Pessimistic transaction on a key for which the originating node is a backup;
+     * the commit is expected to succeed although the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupCommitPessimisticOnBackup() throws Exception
{
+        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, true);
+    }
+
+    /**
+     * Optimistic transaction on a key for which the originating node is a backup;
+     * the commit is expected to succeed although the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupCommitOptimisticOnBackup() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, true);
+    }
+
+    /**
+     * Pessimistic transaction on a key for which the originating node is neither primary nor backup;
+     * the transaction is expected to be rolled back when the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackPessimistic() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, false);
+    }
+
+    /**
+     * Optimistic transaction on a key for which the originating node is neither primary nor backup;
+     * the transaction is expected to be rolled back when the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackOptimistic() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, false);
+    }
+
+    /**
+     * Pessimistic transaction on a key for which the originating node is a backup;
+     * the transaction is expected to be rolled back when the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackPessimisticOnBackup() throws Exception
{
+        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, false);
+    }
+
+    /**
+     * Optimistic transaction on a key for which the originating node is a backup;
+     * the transaction is expected to be rolled back when the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackOptimisticOnBackup() throws Exception
{
+        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, false);
+    }
+
+    /**
+     * Update without an explicit transaction on a key for which the originating node is neither primary
+     * nor backup; the update is expected to succeed although the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupCommitImplicit() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(null, false, true);
+    }
+
+    /**
+     * Update without an explicit transaction on a key for which the originating node is a backup;
+     * the update is expected to succeed although the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupCommitImplicitOnBackup() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(null, true, true);
+    }
+
+    /**
+     * Update without an explicit transaction on a key for which the originating node is neither primary
+     * nor backup; the update is expected to be rolled back when the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackImplicit() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(null, false, false);
+    }
+
+    /**
+     * Update without an explicit transaction on a key for which the originating node is a backup;
+     * the update is expected to be rolled back when the primary node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackImplicitOnBackup() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(null, true, false);
+    }
+
+    /**
+     * Updates a single key from node 0 in a background thread while the communication SPI drops
+     * messages of selected classes, stops node 1 once the update has been initiated, and then verifies
+     * that the background thread finished, that no transactions remain registered on nodes 0, 2 and 3,
+     * and that the data on the originating and backup nodes matches the expected outcome.
+     *
+     * @param conc Transaction concurrency, or {@code null} to update the key without an explicit transaction.
+     * @param backup Passed to {@code generateKey}: if {@code true}, the key is chosen so that node 0 is a
+     *      backup for it, otherwise so that node 0 is neither primary nor backup. Node 1 is primary for
+     *      the key in both cases.
+     * @param commit If {@code true}, the update is expected to succeed; if {@code false}, it is expected
+     *      to fail with a transaction rollback.
+     * @throws Exception If failed.
+     */
+    private void checkPrimaryNodeFailureBackupCommit(
+        final TransactionConcurrency conc,
+        boolean backup,
+        final boolean commit
+    ) throws Exception {
+        startGrids(gridCount());
+        awaitPartitionMapExchange();
+
+        for (int i = 0; i < gridCount(); i++)
+            info("Grid " + i + ": " + ignite(i).cluster().localNode().id());
+
+        try {
+            final Ignite ignite = ignite(0);
+
+            final IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            final int key = generateKey(ignite, backup);
+
+            IgniteEx backupNode = (IgniteEx)backupNode(key, null);
+
+            assertNotNull(backupNode);
+
+            final CountDownLatch commitLatch = new CountDownLatch(1);
+
+            if (!commit) {
+                communication(1).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareRequest.class));
+            }
+            else {
+                if (!backup) {
+                    communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+                    communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+                }
+                else
+                    communication(0).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+            }
+
+            IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>()
{
+                @Override public Object call() throws Exception {
+                    if (conc != null) {
+                        try (Transaction tx = ignite.transactions().txStart(conc, REPEATABLE_READ))
{
+                            cache.put(key, key);
+
+                            Transaction asyncTx = (Transaction)tx.withAsync();
+
+                            asyncTx.commit();
+
+                            commitLatch.countDown();
+
+                            try {
+                                IgniteFuture<Object> fut = asyncTx.future();
+
+                                fut.get();
+
+                                if (!commit) {
+                                    error("Transaction has been committed");
+
+                                    fail("Transaction has been committed: " + tx);
+                                }
+                            }
+                            catch (TransactionRollbackException e) {
+                                if (commit) {
+                                    error(e.getMessage(), e);
+
+                                    fail("Failed to commit: " + e);
+                                }
+                                else
+                                    assertTrue(X.hasCause(e, TransactionRollbackException.class));
+                            }
+                        }
+                    }
+                    else {
+                        IgniteCache<Object, Object> cache0 = cache.withAsync();
+
+                        cache0.put(key, key);
+
+                        Thread.sleep(1000);
+
+                        commitLatch.countDown();
+
+                        try {
+                            cache0.future().get();
+
+                            if (!commit) {
+                                error("Transaction has been committed");
+
+                                fail("Transaction has been committed.");
+                            }
+                        }
+                        catch (CacheException e) {
+                            if (commit) {
+                                error(e.getMessage(), e);
+
+                                fail("Failed to commit: " + e);
+                            }
+                            else
+                                assertTrue(X.hasCause(e, TransactionRollbackException.class));
+                        }
+                    }
+
+                    return null;
+                }
+            });
+
+            commitLatch.await();
+
+            stopGrid(1);
+
+            // Check that thread successfully finished.
+            fut.get();
+
+            // Check there are no hanging transactions.
+            assertEquals(0, ((IgniteEx)ignite(0)).context().cache().context().tm().idMapSize());
+            assertEquals(0, ((IgniteEx)ignite(2)).context().cache().context().tm().idMapSize());
+            assertEquals(0, ((IgniteEx)ignite(3)).context().cache().context().tm().idMapSize());
+
+            dataCheck((IgniteKernal)ignite(0), (IgniteKernal)backupNode, key, commit);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Verifies the state of the key on the originating node and on the backup node once the update has
+     * finished: after a commit the backup must hold the value with no lock candidates left, after a
+     * rollback there must be no near entry and no value on the backup.
+     *
+     * @param orig Originating cache.
+     * @param backup Backup cache.
+     * @param key Key being committed and checked.
+     * @param commit Commit or rollback flag.
+     * @throws Exception If check failed.
+     */
+    private void dataCheck(IgniteKernal orig, IgniteKernal backup, int key, boolean commit) throws Exception {
+        GridCacheAdapter origCache = orig.internalCache(null);
+
+        // A near entry is looked up only when the originating node uses a near cache.
+        GridNearCacheEntry nearEntry = origCache.isNear() ? (GridNearCacheEntry)origCache.peekEx(key) : null;
+
+        GridCacheAdapter backupCache = backup.internalCache(null);
+
+        // On the backup node the entry is checked in the underlying DHT cache.
+        if (backupCache.isNear())
+            backupCache = backupCache.context().near().dht();
+
+        GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)backupCache.peekEx(key);
+
+        if (!commit) {
+            assertTrue("near=" + nearEntry + ", hc=" + System.identityHashCode(nearEntry), nearEntry == null);
+            assertTrue("Invalid backup cache entry: " + dhtEntry,
+                dhtEntry == null || dhtEntry.rawGetOrUnmarshal(false) == null);
+
+            return;
+        }
+
+        assertNotNull(dhtEntry);
+        assertTrue("dhtEntry=" + dhtEntry, dhtEntry.remoteMvccSnapshot().isEmpty());
+        assertTrue("dhtEntry=" + dhtEntry, dhtEntry.localCandidates().isEmpty());
+        assertEquals(key, backupCache.localPeek(key, null, null));
+
+        if (nearEntry == null)
+            return;
+
+        assertTrue("near=" + nearEntry, nearEntry.remoteMvccSnapshot().isEmpty());
+        assertTrue("near=" + nearEntry, nearEntry.localCandidates().isEmpty());
+
+        // Near peek will be null since primary node has changed.
+        assertNull("near=" + nearEntry, origCache.localPeek(key, null, null));
+    }
+
+    /**
+     * Returns the communication SPI taken from the configuration of the node with the given index.
+     *
+     * @param idx Index.
+     * @return Communication SPI.
+     */
+    private BanningCommunicationSpi communication(int idx) {
+        IgniteConfiguration nodeCfg = ignite(idx).configuration();
+
+        Object spi = nodeCfg.getCommunicationSpi();
+
+        return (BanningCommunicationSpi)spi;
+    }
+
+    /**
+     * Finds the smallest non-negative key with the required ownership.
+     *
+     * @param ignite Ignite instance to generate key.
+     * @param backup If {@code true}, {@code ignite(0)} must be a backup for the key; otherwise
+     *      {@code ignite(0)} must be neither primary nor backup for it.
+     * @return Generated key that satisfies the {@code backup} condition for {@code ignite(0)} and is
+     *      primary for {@code ignite(1)}.
+     */
+    private int generateKey(Ignite ignite, boolean backup) {
+        Affinity<Object> aff = ignite.affinity(null);
+
+        int key = 0;
+
+        while (true) {
+            ClusterNode origNode = ignite(0).cluster().localNode();
+
+            boolean origNodeFits = backup ? aff.isBackup(origNode, key) : !aff.isPrimaryOrBackup(origNode, key);
+
+            if (origNodeFits && aff.isPrimary(ignite(1).cluster().localNode(), key))
+                return key;
+
+            key++;
+        }
+    }
+
+    /**
+     * Communication SPI that silently drops every outgoing {@link GridIoMessage} whose payload class is
+     * in the banned collection; all other messages are sent by the parent implementation.
+     */
+    private static class BanningCommunicationSpi extends TcpCommunicationSpi {
+        /** Payload classes of the messages that must not be sent. */
+        private volatile Collection<Class> bannedClasses = Collections.emptyList();
+
+        /**
+         * @param bannedClasses Banned classes.
+         */
+        public void bannedClasses(Collection<Class> bannedClasses) {
+            this.bannedClasses = bannedClasses;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+            throws IgniteSpiException {
+            // Only GridIoMessage carries a payload; any other message is passed through instead of failing on a cast.
+            if (msg instanceof GridIoMessage) {
+                GridIoMessage ioMsg = (GridIoMessage)msg;
+
+                // Banned payload: drop the message without notifying the sender.
+                if (bannedClasses.contains(ioMsg.message().getClass()))
+                    return;
+            }
+
+            super.sendMessage(node, msg, ackClosure);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java
new file mode 100644
index 0000000..5735182
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+
+/**
+ *
+ */
+public class GridNearCacheTxNodeFailureSelfTest extends GridCacheTxNodeFailureSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) {
+        return super.cacheConfiguration(gridName).setNearConfiguration(new NearCacheConfiguration());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
new file mode 100644
index 0000000..cee54b8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Tests {@link IgniteAtomicLong} operations while nodes are joining, stopping and restarting.
+ */
+public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 5;
+
+    /** Restart count. */
+    private static final int RESTART_CNT = 15;
+
+    /** Atomic long name. */
+    private static final String ATOMIC_LONG_NAME = "test-atomic-long";
+
+    /** IP finder shared by all nodes started by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Values returned by {@code getAndIncrement()} in the worker threads. */
+    private final Queue<Long> queue = new ConcurrentLinkedQueue<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        // Atomic data structures: PARTITIONED mode with one backup.
+        AtomicConfiguration atomicCfg = new AtomicConfiguration();
+
+        atomicCfg.setCacheMode(CacheMode.PARTITIONED);
+        atomicCfg.setBackups(1);
+
+        cfg.setAtomicConfiguration(atomicCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        queue.clear();
+    }
+
+    /**
+     * Starts all nodes concurrently, each one incrementing the shared atomic long as soon as it is up,
+     * and checks that at least one increment was recorded.
+     *
+     * @throws Exception If failed.
+     */
+    public void testQueueCreateNodesJoin() throws Exception {
+        CountDownLatch startLatch = new CountDownLatch(GRID_CNT);
+        final AtomicBoolean run = new AtomicBoolean(true);
+
+        Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < GRID_CNT; i++)
+                futs.add(startNodeAndCreaterThread(i, startLatch, run));
+
+            startLatch.await();
+
+            info("All nodes started.");
+
+            Thread.sleep(10_000);
+        }
+        finally {
+            // Always release the workers, even if this thread was interrupted or failed.
+            run.set(false);
+        }
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get();
+
+        info("Increments: " + queue.size());
+
+        // JUnit assertion instead of the 'assert' keyword, which is a no-op without -ea.
+        assertFalse("No increments were recorded.", queue.isEmpty());
+    }
+
+    /**
+     * Increments one atomic long from several threads on node 0 while other nodes are restarted,
+     * then checks that the returned values form a sequence without gaps or duplicates.
+     *
+     * @throws Exception If failed.
+     */
+    public void testIncrementConsistency() throws Exception {
+        startGrids(GRID_CNT);
+
+        final AtomicBoolean run = new AtomicBoolean(true);
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            /** {@inheritDoc} */
+            @Override public Void call() throws Exception {
+                IgniteAtomicLong cntr = ignite(0).atomicLong(ATOMIC_LONG_NAME, 0, true);
+
+                while (run.get())
+                    queue.add(cntr.getAndIncrement());
+
+                return null;
+            }
+        }, 4, "increment-runner");
+
+        try {
+            restartRandomNodes();
+        }
+        finally {
+            // Stop the workers even if a restart failed.
+            run.set(false);
+        }
+
+        fut.get();
+
+        info("Increments: " + queue.size());
+
+        checkQueue();
+    }
+
+    /**
+     * Repeatedly creates, increments and closes atomic longs from several threads on node 0 while
+     * other nodes are restarted, and fails if any worker thread threw an exception.
+     *
+     * @throws Exception If failed.
+     */
+    public void testQueueClose() throws Exception {
+        startGrids(GRID_CNT);
+
+        int threads = 4;
+
+        final AtomicBoolean run = new AtomicBoolean(true);
+        final AtomicInteger idx = new AtomicInteger();
+        final AtomicReferenceArray<Exception> arr = new AtomicReferenceArray<>(threads);
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            /** {@inheritDoc} */
+            @Override public Void call() throws Exception {
+                int base = idx.getAndIncrement();
+
+                try {
+                    int delta = 0;
+
+                    while (run.get()) {
+                        // A fresh counter name per thread and per iteration.
+                        IgniteAtomicLong cntr =
+                            ignite(0).atomicLong(ATOMIC_LONG_NAME + "-" + base + "-" + delta, 0, true);
+
+                        for (int i = 0; i < 5; i++)
+                            queue.add(cntr.getAndIncrement());
+
+                        cntr.close();
+
+                        delta++;
+                    }
+                }
+                catch (Exception e) {
+                    arr.set(base, e);
+
+                    throw e;
+                }
+                finally {
+                    info("RUNNER THREAD IS STOPPING");
+                }
+
+                return null;
+            }
+        }, threads, "increment-runner");
+
+        try {
+            restartRandomNodes();
+        }
+        finally {
+            // Stop the workers even if a restart failed.
+            run.set(false);
+        }
+
+        fut.get();
+
+        for (int i = 0; i < threads; i++) {
+            Exception err = arr.get(i);
+
+            if (err != null)
+                throw err;
+        }
+    }
+
+    /**
+     * Stops and starts a randomly chosen node {@link #RESTART_CNT} times.
+     *
+     * @throws Exception If a node could not be stopped or started.
+     */
+    private void restartRandomNodes() throws Exception {
+        for (int i = 0; i < RESTART_CNT; i++) {
+            // Index is in [1, GRID_CNT): node 0 is used by the worker threads and is never restarted.
+            int restartIdx = ThreadLocalRandom.current().nextInt(GRID_CNT - 1) + 1;
+
+            stopGrid(restartIdx);
+
+            U.sleep(500);
+
+            startGrid(restartIdx);
+        }
+    }
+
+    /**
+     * Checks that the collected values, once sorted, are exactly {@code 0, 1, 2, ...} and logs
+     * every position where the sequence is broken.
+     */
+    private void checkQueue() {
+        List<Long> vals = new ArrayList<>(queue);
+
+        Collections.sort(vals);
+
+        int mismatches = 0;
+
+        long exp = 0;
+
+        for (long actual : vals) {
+            if (actual != exp) {
+                mismatches++;
+
+                info(">>> Expected " + exp + ", actual " + actual);
+            }
+
+            // Resynchronize on the observed value so that one gap or duplicate is reported once.
+            exp = actual + 1;
+        }
+
+        assertEquals("Increment sequence has gaps or duplicates [total=" + vals.size() + ']', 0, mismatches);
+    }
+
+    /**
+     * Starts a node in a separate thread and keeps incrementing the shared atomic long from that
+     * node until {@code run} is cleared. Failures are rethrown from the worker instead of being
+     * printed and ignored.
+     *
+     * @param i Node index.
+     * @param startLatch Latch counted down once the node start attempt has finished.
+     * @param run Flag that keeps the increment loop running.
+     * @return Future of the worker thread.
+     * @throws Exception If the worker thread could not be started.
+     */
+    private IgniteInternalFuture<?> startNodeAndCreaterThread(final int i, final CountDownLatch startLatch,
+        final AtomicBoolean run) throws Exception {
+        return multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                boolean started = false;
+
+                try {
+                    Ignite ignite = startGrid(i);
+
+                    started = true;
+
+                    startLatch.countDown();
+
+                    while (run.get()) {
+                        IgniteAtomicLong cntr = ignite.atomicLong(ATOMIC_LONG_NAME, 0, true);
+
+                        queue.add(cntr.getAndIncrement());
+                    }
+                }
+                catch (Exception e) {
+                    throw new IgniteException("Failed to start node or increment atomic long [idx=" + i + ']', e);
+                }
+                finally {
+                    // Do not leave the test thread blocked on the latch if the node failed to start.
+                    if (!started)
+                        startLatch.countDown();
+                }
+            }
+        }, 1, "grunner-" + i);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java
index db55731..1d80ac1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
@@ -90,11 +92,14 @@ public class IgniteCacheNearOnlyTxTest extends IgniteCacheAbstractTest {
         IgniteCache<Integer, Integer> cache0 = ignite(0).cache(null);
         IgniteCache<Integer, Integer> cache1 = ignite1.cache(null);
 
+        Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
         for (int i = 0; i < 5; i++) {
             log.info("Iteration: " + i);
 
-            GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
+            futs.add(GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
                     int val = idx.getAndIncrement();
 
                     IgniteCache<Integer, Integer> cache = ignite1.cache(null);
@@ -104,10 +109,13 @@ public class IgniteCacheNearOnlyTxTest extends IgniteCacheAbstractTest {
 
                     return null;
                 }
-            }, 5, "put-thread");
+            }, 5, "put-thread"));
 
             assertEquals(cache0.localPeek(key), cache1.localPeek(key));
         }
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 28f9a65..eaeb7b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -30,6 +30,9 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxNode
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtAtomicRemoveFailureTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtClientRemoveFailureTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtRemoveFailureTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheTxNodeFailureSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridNearCacheTxNodeFailureSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteAtomicLongChangingTopologySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryTransactionalSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicClientInvalidPartitionHandlingSelfTest;
@@ -97,7 +100,11 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheSizeFailoverTest.class);
 
         suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class);
+        suite.addTestSuite(IgniteAtomicLongChangingTopologySelfTest.class);
+
+        suite.addTestSuite(GridCacheTxNodeFailureSelfTest.class);
+        suite.addTestSuite(GridNearCacheTxNodeFailureSelfTest.class);
 
         return suite;
     }
-}
\ No newline at end of file
+}


Mime
View raw message