ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [07/10] ignite git commit: ignite-2854 Deadlock detection for pessimistic transactions
Date Wed, 04 May 2016 14:18:12 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f2b7532/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
new file mode 100644
index 0000000..87539b2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+import org.jsr166.ThreadLocalRandom8;
+
+import static org.apache.ignite.internal.util.typedef.X.hasCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class TxDeadlockDetectionTest extends GridCommonAbstractTest {
+    /** Nodes count. */
+    private static final int NODES_CNT = 3;
+
+    /** Cache. */
+    private static final String CACHE = "cache";
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (isDebug()) {
+            TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+            discoSpi.failureDetectionTimeoutEnabled(false);
+
+            cfg.setDiscoverySpi(discoSpi);
+        }
+
+        TcpCommunicationSpi commSpi = new TestCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setName(CACHE);
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setNearConfiguration(null);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoHangs() throws Exception {
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<Long> restartFut = null;
+
+        try {
+            restartFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    while (!stop.get()) {
+                        try {
+                            U.sleep(500);
+
+                            startGrid(NODES_CNT);
+
+                            awaitPartitionMapExchange();
+
+                            U.sleep(500);
+
+                            stopGrid(NODES_CNT);
+                        }
+                        catch (Exception e) {
+                            // No-op.
+                        }
+                    }
+                }
+            }, 1, "restart-thread");
+
+            long stopTime = System.currentTimeMillis() + 2 * 60_000L;
+
+            for (int i = 0; System.currentTimeMillis() < stopTime; i++) {
+                log.info(">>> Iteration " + i);
+
+                final AtomicInteger threadCnt = new AtomicInteger();
+
+                IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new
Runnable() {
+                    @Override public void run() {
+                        int threadNum = threadCnt.getAndIncrement();
+
+                        Ignite ignite = ignite(threadNum % NODES_CNT);
+
+                        IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
+
+                        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC,
REPEATABLE_READ, 500, 0)) {
+                            ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
+
+                            for (int i = 0; i < 50; i++) {
+                                int key = rnd.nextInt(50);
+
+                                if (log.isDebugEnabled()) {
+                                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                                        ", tx=" + tx + ", key=" + key + ']');
+                                }
+
+                                cache.put(key, 0);
+                            }
+
+                            tx.commit();
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }, NODES_CNT * 3, "tx-thread");
+
+                fut.get();
+            }
+        }
+        finally {
+            stop.set(true);
+
+            if (restartFut != null)
+                restartFut.get();
+
+            checkDetectionFuts();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoDeadlockSimple() throws Exception {
+        final AtomicInteger threadCnt = new AtomicInteger();
+
+        final AtomicBoolean deadlock = new AtomicBoolean();
+
+        final AtomicBoolean timedOut = new AtomicBoolean();
+
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        final int timeout = 500;
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable()
{
+            @Override public void run() {
+                int threadNum = threadCnt.getAndIncrement();
+
+                Ignite ignite = ignite(threadNum);
+
+                IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
+
+                try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ,
timeout, 0)) {
+                    int key = 42;
+
+                    if (log.isDebugEnabled())
+                        log.debug(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                            ", tx=" + tx + ", key=" + key + ']');
+
+                    cache.put(key, 0);
+
+                    barrier.await(timeout + 100, TimeUnit.MILLISECONDS);
+
+                    tx.commit();
+                }
+                catch (Exception e) {
+                    if (hasCause(e, TransactionTimeoutException.class))
+                        timedOut.set(true);
+
+                    if (hasCause(e, TransactionDeadlockException.class))
+                        deadlock.set(true);
+                }
+            }
+        }, 2, "tx-thread");
+
+        fut.get();
+
+        assertTrue(timedOut.get());
+
+        assertFalse(deadlock.get());
+
+        checkDetectionFuts();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoDeadlock() throws Exception {
+        for (int i = 2; i <= 10; i++) {
+            final int threads = i;
+
+            log.info(">>> Test with " + threads + " transactions.");
+
+            final AtomicInteger threadCnt = new AtomicInteger();
+
+            final AtomicBoolean deadlock = new AtomicBoolean();
+
+            final AtomicBoolean timedOut = new AtomicBoolean();
+
+            final CyclicBarrier barrier = new CyclicBarrier(threads);
+
+            final int timeout = 500;
+
+            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new
Runnable() {
+                @Override public void run() {
+                    int threadNum = threadCnt.incrementAndGet();
+
+                    Ignite ignite = ignite(threadNum % NODES_CNT);
+
+                    IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
+
+                    try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ,
timeout, 0)) {
+                        int key1 = threadNum;
+
+                        log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                            ", tx=" + tx + ", key=" + key1 + ']');
+
+                        cache.put(key1, 0);
+
+                        barrier.await();
+
+                        if (threadNum == threads) {
+                            log.info(">>> Performs sleep. [node=" + ((IgniteKernal)ignite).localNode()
+
+                                ", tx=" + tx + ']');
+
+                            U.sleep(timeout * 2);
+                        }
+                        else {
+                            int key2 = threadNum + 1;
+
+                            log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                                ", tx=" + tx + ", key2=" + key2 + ']');
+
+                            cache.put(key2, 1);
+                        }
+
+                        tx.commit();
+                    }
+                    catch (Exception e) {
+                        if (hasCause(e, TransactionTimeoutException.class))
+                            timedOut.set(true);
+
+                        if (hasCause(e, TransactionDeadlockException.class))
+                            deadlock.set(true);
+                    }
+                }
+            }, threads, "tx-thread");
+
+            fut.get();
+
+            assertTrue(timedOut.get());
+
+            assertFalse(deadlock.get());
+
+            checkDetectionFuts();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailedTxLocksRequest() throws Exception {
+        doTestFailedMessage(TxLocksRequest.class);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailedTxLocksResponse() throws Exception {
+        doTestFailedMessage(TxLocksResponse.class);
+    }
+
+    /**
+     * @param failCls Failing message class.
+     * @throws Exception If failed.
+     */
+    private void doTestFailedMessage(Class failCls) throws Exception {
+        try {
+            final int txCnt = 2;
+
+            final CyclicBarrier barrier = new CyclicBarrier(2);
+
+            final AtomicInteger threadCnt = new AtomicInteger();
+
+            final AtomicBoolean deadlock = new AtomicBoolean();
+
+            final AtomicBoolean timeout = new AtomicBoolean();
+
+            TestCommunicationSpi.failCls = failCls;
+
+            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new
Runnable() {
+                @Override public void run() {
+                    int num = threadCnt.getAndIncrement();
+
+                    Ignite ignite = ignite(num);
+
+                    IgniteCache<Object, Integer> cache = ignite.cache(CACHE);
+
+                    try (Transaction tx =
+                             ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ,
num == 0 ? 500 : 1500, 0)
+                    ) {
+                        int key1 = primaryKey(ignite((num + 1) % txCnt).cache(CACHE));
+
+                        log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                            ", tx=" + tx + ", key=" + key1 + ']');
+
+                        cache.put(new TestKey(key1), 1);
+
+                        barrier.await();
+
+                        int key2 = primaryKey(cache);
+
+                        log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                            ", tx=" + tx + ", key=" + key2 + ']');
+
+                        cache.put(new TestKey(key2), 2);
+
+                        tx.commit();
+                    }
+                    catch (Exception e) {
+                        timeout.compareAndSet(false, hasCause(e, TransactionTimeoutException.class));
+
+                        deadlock.compareAndSet(false, hasCause(e, TransactionDeadlockException.class));
+                    }
+                }
+            }, 2, "tx-thread");
+
+            fut.get();
+
+            assertFalse(deadlock.get());
+
+            assertTrue(timeout.get());
+
+            checkDetectionFuts();
+        }
+        finally {
+            TestCommunicationSpi.failCls = null;
+            TestKey.failSer = false;
+        }
+    }
+
+    /**
+     *
+     */
+    private void checkDetectionFuts() {
+        for (int i = 0; i < NODES_CNT ; i++) {
+            Ignite ignite = ignite(i);
+
+            IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
+
+            ConcurrentMap<Long, TxDeadlockDetection.TxDeadlockFuture> futs =
+                GridTestUtils.getFieldValue(txMgr, IgniteTxManager.class, "deadlockDetectFuts");
+
+            assertTrue(futs.isEmpty());
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestKey implements Externalizable {
+        /** Fail request. */
+        private static volatile boolean failSer = false;
+
+        /** Id. */
+        private int id;
+
+        /**
+         * Default constructor (required by Externalizable).
+         */
+        public TestKey() {
+            // No-op.
+        }
+
+        /**
+         * @param id Id.
+         */
+        public TestKey(int id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+            if (failSer) {
+                TestCommunicationSpi.failCls = null;
+                failSer = false;
+
+                throw new IOException();
+            }
+
+            id = in.readInt();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey key = (TestKey)o;
+
+            return id == key.id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Fail response. */
+        private static volatile Class failCls;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(
+            ClusterNode node,
+            Message msg,
+            IgniteInClosure<IgniteException> ackC
+        ) throws IgniteSpiException {
+            if (failCls != null && msg instanceof GridIoMessage &&
+                ((GridIoMessage)msg).message().getClass() == failCls)
+                TestKey.failSer = true;
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f2b7532/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionCrossCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionCrossCacheTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionCrossCacheTest.java
new file mode 100644
index 0000000..abbefd0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionCrossCacheTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.internal.util.typedef.X.hasCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class TxPessimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstractTest
{
+    /** Nodes count. */
+    private static final int NODES_CNT = 2;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (isDebug()) {
+            TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+            discoSpi.failureDetectionTimeoutEnabled(false);
+
+            cfg.setDiscoverySpi(discoSpi);
+        }
+
+        CacheConfiguration ccfg0 = defaultCacheConfiguration();
+
+        ccfg0.setName("cache0");
+        ccfg0.setCacheMode(CacheMode.PARTITIONED);
+        ccfg0.setBackups(1);
+        ccfg0.setNearConfiguration(null);
+
+        CacheConfiguration ccfg1 = defaultCacheConfiguration();
+
+        ccfg1.setName("cache1");
+        ccfg1.setCacheMode(CacheMode.PARTITIONED);
+        ccfg1.setBackups(1);
+        ccfg1.setNearConfiguration(null);
+
+        cfg.setCacheConfiguration(ccfg0, ccfg1);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadlock() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        final AtomicInteger threadCnt = new AtomicInteger();
+
+        final AtomicBoolean deadlock = new AtomicBoolean();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable()
{
+            @Override public void run() {
+                int threadNum = threadCnt.getAndIncrement();
+
+                Ignite ignite = ignite(0);
+
+                IgniteCache<Integer, Integer> cache1 = ignite.cache("cache" + (threadNum
== 0 ? 0 : 1));
+
+                IgniteCache<Integer, Integer> cache2 = ignite.cache("cache" + (threadNum
== 0 ? 1 : 0));
+
+                try (Transaction tx =
+                         ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 500,
0)
+                ) {
+                    int key1 = primaryKey(cache1);
+
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                        ", tx=" + tx + ", key=" + key1 + ", cache=" + cache1.getName() +
']');
+
+                    cache1.put(key1, 0);
+
+                    barrier.await();
+
+                    int key2 = primaryKey(cache2);
+
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                        ", tx=" + tx + ", key=" + key2 + ", cache=" + cache2.getName() +
']');
+
+                    cache2.put(key2, 1);
+
+                    tx.commit();
+                }
+                catch (Throwable e) {
+                    // At least one stack trace should contain TransactionDeadlockException.
+                    if (hasCause(e, TransactionTimeoutException.class) &&
+                        hasCause(e, TransactionDeadlockException.class)
+                        ) {
+                        if (deadlock.compareAndSet(false, true))
+                            U.error(log, "At least one stack trace should contain " +
+                                TransactionDeadlockException.class.getSimpleName(), e);
+                    }
+                }
+            }
+        }, 2, "tx-thread");
+
+        fut.get();
+
+        assertTrue(deadlock.get());
+
+        for (int i = 0; i < NODES_CNT ; i++) {
+            Ignite ignite = ignite(i);
+
+            IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
+
+            ConcurrentMap<Long, TxDeadlockDetection.TxDeadlockFuture> futs =
+                GridTestUtils.getFieldValue(txMgr, IgniteTxManager.class, "deadlockDetectFuts");
+
+            assertTrue(futs.isEmpty());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f2b7532/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
new file mode 100644
index 0000000..ee1a989
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.util.typedef.X.cause;
+import static org.apache.ignite.internal.util.typedef.X.hasCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests deadlock detection for pessimistic transactions.
+ */
+public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Nodes count (actually two times more nodes will started: server + client). */
+    private static final int NODES_CNT = 4;
+
+    /** No op transformer. */
+    private static final NoOpTransformer NO_OP_TRANSFORMER = new NoOpTransformer();
+
+    /** Wrapping transformer. */
+    private static final WrappingTransformer WRAPPING_TRANSFORMER = new WrappingTransformer();
+
+    /** Client mode flag. */
+    private static boolean client;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (isDebug()) {
+            TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+            discoSpi.failureDetectionTimeoutEnabled(false);
+
+            cfg.setDiscoverySpi(discoSpi);
+        }
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        client = false;
+
+        startGrids(NODES_CNT);
+
+        client = true;
+
+        for (int i = 0; i < NODES_CNT; i++)
+            startGrid(i + NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadlocksPartitioned() throws Exception {
+        for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values())
{
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, false), NO_OP_TRANSFORMER);
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, false), WRAPPING_TRANSFORMER);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testDeadlocksPartitionedNear() throws Exception {
+        for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values())
{
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, true), NO_OP_TRANSFORMER);
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, true), WRAPPING_TRANSFORMER);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadlocksReplicated() throws Exception {
+        for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values())
{
+            doTestDeadlocks(createCache(REPLICATED, syncMode, false), NO_OP_TRANSFORMER);
+            doTestDeadlocks(createCache(REPLICATED, syncMode, false), WRAPPING_TRANSFORMER);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadlocksLocal() throws Exception {
+        for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values())
{
+            IgniteCache cache = null;
+
+            try {
+                cache = createCache(LOCAL, syncMode, false);
+
+                awaitPartitionMapExchange();
+
+                doTestDeadlock(2, true, true, false, NO_OP_TRANSFORMER);
+                doTestDeadlock(2, true, true, false, WRAPPING_TRANSFORMER);
+            }
+            finally {
+                if (cache != null)
+                    cache.destroy();
+            }
+        }
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param syncMode Write sync mode.
+     * @param near Near.
+     */
+    @SuppressWarnings("unchecked")
+    private IgniteCache createCache(CacheMode cacheMode, CacheWriteSynchronizationMode syncMode,
boolean near) {
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setBackups(1);
+        ccfg.setNearConfiguration(near ? new NearCacheConfiguration() : null);
+        ccfg.setWriteSynchronizationMode(syncMode);
+
+        return ignite(0).getOrCreateCache(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestDeadlocks(IgniteCache cache, IgniteClosure<Integer, Object>
transformer) throws Exception {
+        try {
+            awaitPartitionMapExchange();
+
+            doTestDeadlock(2, false, true, true, transformer);
+            doTestDeadlock(2, false, false, false, transformer);
+            doTestDeadlock(2, false, false, true, transformer);
+
+            doTestDeadlock(3, false, true, true, transformer);
+            doTestDeadlock(3, false, false, false, transformer);
+            doTestDeadlock(3, false, false, true, transformer);
+
+            doTestDeadlock(4, false, true, true, transformer);
+            doTestDeadlock(4, false, false, false, transformer);
+            doTestDeadlock(4, false, false, true, transformer);
+        }
+        catch (Exception e) {
+            U.error(log, "Unexpected exception: ", e);
+
+            fail();
+        }
+        finally {
+            if (cache != null)
+                cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestDeadlock(
+        final int txCnt,
+        final boolean loc,
+        boolean lockPrimaryFirst,
+        final boolean clientTx,
+        final IgniteClosure<Integer, Object> transformer
+    ) throws Exception {
+        log.info(">>> Test deadlock [txCnt=" + txCnt + ", loc=" + loc + ", lockPrimaryFirst="
+ lockPrimaryFirst +
+            ", clientTx=" + clientTx + ", transformer=" + transformer.getClass().getName()
+ ']');
+
+        final AtomicInteger threadCnt = new AtomicInteger();
+
+        final CyclicBarrier barrier = new CyclicBarrier(txCnt);
+
+        final AtomicReference<TransactionDeadlockException> deadlockErr = new AtomicReference<>();
+
+        final List<List<Integer>> keySets = generateKeys(txCnt, loc, !lockPrimaryFirst);
+
+        final Set<Integer> involvedKeys = new GridConcurrentHashSet<>();
+        final Set<Integer> involvedLockedKeys = new GridConcurrentHashSet<>();
+        final Set<IgniteInternalTx> involvedTxs = new GridConcurrentHashSet<>();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable()
{
+            @Override public void run() {
+                int threadNum = threadCnt.incrementAndGet();
+
+                Ignite ignite = loc ? ignite(0) : ignite(clientTx ? threadNum - 1 + txCnt
: threadNum - 1);
+
+                IgniteCache<Object, Integer> cache = ignite.cache(CACHE_NAME);
+
+                List<Integer> keys = keySets.get(threadNum - 1);
+
+                int txTimeout = 500 + txCnt * 100;
+
+                try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ,
txTimeout, 0)) {
+                    involvedTxs.add(((TransactionProxyImpl)tx).tx());
+
+                    Integer key = keys.get(0);
+
+                    involvedKeys.add(key);
+
+                    Object k;
+
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                        ", tx=" + tx + ", key=" + transformer.apply(key) + ']');
+
+                    cache.put(transformer.apply(key), 0);
+
+                    involvedLockedKeys.add(key);
+
+                    barrier.await();
+
+                    key = keys.get(1);
+
+                    ClusterNode primaryNode =
+                        ((IgniteCacheProxy)cache).context().affinity().primary(key, NONE);
+
+                    List<Integer> primaryKeys =
+                        primaryKeys(grid(primaryNode).cache(CACHE_NAME), 5, key + (100 *
threadNum));
+
+                    Map<Object, Integer> entries = new HashMap<>();
+
+                    involvedKeys.add(key);
+
+                    entries.put(transformer.apply(key), 0);
+
+                    for (Integer i : primaryKeys) {
+                        involvedKeys.add(i);
+
+                        entries.put(transformer.apply(i), 1);
+
+                        k = transformer.apply(i + 13);
+
+                        involvedKeys.add(i + 13);
+
+                        entries.put(k, 2);
+                    }
+
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                        ", tx=" + tx + ", entries=" + entries + ']');
+
+                    cache.putAll(entries);
+
+                    tx.commit();
+                }
+                catch (Throwable e) {
+                    // At least one stack trace should contain TransactionDeadlockException.
+                    if (hasCause(e, TransactionTimeoutException.class) &&
+                        hasCause(e, TransactionDeadlockException.class)
+                        ) {
+                        if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class)))
+                            U.error(log, "At least one stack trace should contain " +
+                                TransactionDeadlockException.class.getSimpleName(), e);
+                    }
+                }
+            }
+        }, loc ? 2 : txCnt, "tx-thread");
+
+        fut.get();
+
+        U.sleep(1000);
+
+        TransactionDeadlockException deadlockE = deadlockErr.get();
+
+        assertNotNull(deadlockE);
+
+        // Check transactions, futures and entry locks state.
+        for (int i = 0; i < NODES_CNT * 2; i++) {
+            Ignite ignite = ignite(i);
+
+            int cacheId = ((IgniteCacheProxy)ignite.cache(CACHE_NAME)).context().cacheId();
+
+            IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
+
+            Collection<IgniteInternalTx> activeTxs = txMgr.activeTransactions();
+
+            for (IgniteInternalTx tx : activeTxs) {
+                Collection<IgniteTxEntry> entries = tx.allEntries();
+
+                for (IgniteTxEntry entry : entries) {
+                    if (entry.cacheId() == cacheId)
+                        fail("Transaction still exists: " + tx);
+                }
+            }
+
+            ConcurrentMap<Long, TxDeadlockDetection.TxDeadlockFuture> futs =
+                GridTestUtils.getFieldValue(txMgr, IgniteTxManager.class, "deadlockDetectFuts");
+
+            assertTrue(futs.isEmpty());
+
+            GridCacheAdapter<Object, Integer> intCache = internalCache(i, CACHE_NAME);
+
+            GridCacheConcurrentMap map = intCache.map();
+
+            for (Integer key : involvedKeys) {
+                Object key0 = transformer.apply(key);
+
+                KeyCacheObject keyCacheObj = intCache.context().toCacheKeyObject(key0);
+
+                GridCacheMapEntry entry = map.getEntry(keyCacheObj);
+
+                if (entry != null)
+                    assertNull("Entry still has locks " + entry, entry.mvccAllLocal());
+            }
+        }
+
+        // Check deadlock report
+        String msg = deadlockE.getMessage();
+
+        for (IgniteInternalTx tx : involvedTxs)
+            assertTrue(msg.contains(
+                "[txId=" + tx.xidVersion() + ", nodeId=" + tx.nodeId() + ", threadId=" +
tx.threadId() + ']'));
+
+        for (Integer key : involvedKeys) {
+            if (involvedLockedKeys.contains(key))
+                assertTrue(msg.contains("[key=" + transformer.apply(key) + ", cache=" + CACHE_NAME
+ ']'));
+            else
+                assertFalse(msg.contains("[key=" + transformer.apply(key)));
+        }
+    }
+
+    /**
+     * @param nodesCnt Nodes count.
+     * @param loc Local cache.
+     */
+    private List<List<Integer>> generateKeys(int nodesCnt, boolean loc, boolean
reverse) throws IgniteCheckedException {
+        List<List<Integer>> keySets = new ArrayList<>();
+
+        if (loc) {
+            List<Integer> keys = primaryKeys(ignite(0).cache(CACHE_NAME), 2);
+
+            keySets.add(new ArrayList<>(keys));
+
+            Collections.reverse(keys);
+
+            keySets.add(keys);
+        }
+        else {
+            for (int i = 0; i < nodesCnt; i++) {
+                List<Integer> keys = new ArrayList<>(2);
+
+                keys.add(primaryKey(ignite(i).cache(CACHE_NAME)));
+                keys.add(primaryKey(ignite(i == nodesCnt - 1 ? 0 : i + 1).cache(CACHE_NAME)));
+
+                if (reverse)
+                    Collections.reverse(keys);
+
+                keySets.add(keys);
+            }
+        }
+
+        return keySets;
+    }
+
+    /**
+     *
+     */
+    private static class NoOpTransformer implements IgniteClosure<Integer, Object>
{
+        /** {@inheritDoc} */
+        @Override public Object apply(Integer val) {
+            return val;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class WrappingTransformer implements IgniteClosure<Integer, Object>
{
+        /** {@inheritDoc} */
+        @Override public Object apply(Integer val) {
+            return new KeyObject(val);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class KeyObject implements Serializable {
+        /** Id. */
+        private int id;
+
+        /** Name. */
+        private String name;
+
+        /**
+         * @param id Id.
+         */
+        public KeyObject(int id) {
+            this.id = id;
+            this.name = "KeyObject" + id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "KeyObject{" +
+                "id=" + id +
+                ", name='" + name + '\'' +
+                '}';
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            KeyObject obj = (KeyObject)o;
+
+            return id == obj.id && name.equals(obj.name);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f2b7532/modules/core/src/test/java/org/apache/ignite/testsuites/BinaryObjectsTxDeadlockDetectionTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/BinaryObjectsTxDeadlockDetectionTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/BinaryObjectsTxDeadlockDetectionTestSuite.java
new file mode 100644
index 0000000..45afba2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/BinaryObjectsTxDeadlockDetectionTestSuite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.testframework.config.GridTestProperties;
+
+/**
+ *
+ */
+public class BinaryObjectsTxDeadlockDetectionTestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception If failed.
+     */
+    public static TestSuite suite() throws Exception {
+        GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
+
+        return TxDeadlockDetectionTestSuite.suite();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f2b7532/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java
new file mode 100644
index 0000000..c057e55
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.transactions.DepthFirstSearchTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetectionTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxPessimisticDeadlockDetectionCrossCacheTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxPessimisticDeadlockDetectionTest;
+
+/**
+ * Deadlock detection related tests.
+ */
+public class TxDeadlockDetectionTestSuite extends TestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception If failed.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Ignite Deadlock Detection Test Suite");
+
+        suite.addTestSuite(DepthFirstSearchTest.class);
+        suite.addTestSuite(TxPessimisticDeadlockDetectionTest.class);
+        suite.addTestSuite(TxPessimisticDeadlockDetectionCrossCacheTest.class);
+        suite.addTestSuite(TxDeadlockDetectionTest.class);
+
+        return suite;
+    }
+}


Mime
View raw message