ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [26/36] ignite git commit: ignite-5041 NPE in deadlock detection fixed
Date Thu, 27 Apr 2017 07:13:42 GMT
ignite-5041 NPE in deadlock detection fixed


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/746f8ebc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/746f8ebc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/746f8ebc

Branch: refs/heads/ignite-5094
Commit: 746f8ebc2e56720b50873af1b4ee2a320ca58793
Parents: 8f9edeb
Author: agura <agura@apache.org>
Authored: Thu Apr 20 20:45:58 2017 +0300
Committer: agura <agura@apache.org>
Committed: Wed Apr 26 18:48:39 2017 +0300

----------------------------------------------------------------------
 .../cache/DynamicCacheDescriptor.java           |  24 ++
 .../cache/GridCacheSharedContext.java           |  18 +-
 .../cache/transactions/IgniteTxManager.java     |  20 +-
 .../cache/transactions/TxDeadlock.java          |  19 +-
 .../cache/transactions/TxLocksResponse.java     |  37 +--
 ...DeadlockDetectionMessageMarshallingTest.java | 116 ++++++++++
 .../TxDeadlockDetectionUnmasrhalErrorsTest.java | 225 +++++++++++++++++++
 .../TxDeadlockDetectionTestSuite.java           |   4 +
 8 files changed, 435 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/746f8ebc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 92a7af3..09b4c3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
 import org.apache.ignite.internal.processors.query.QuerySchema;
 import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
@@ -83,6 +85,12 @@ public class DynamicCacheDescriptor {
     /** */
     private AffinityTopologyVersion rcvdFromVer;
 
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Cached object context for marshalling issues when cache isn't started. */
+    private volatile CacheObjectContext objCtx;
+
     /** */
     private transient AffinityTopologyVersion clientCacheStartVer;
 
@@ -228,6 +236,22 @@ public class DynamicCacheDescriptor {
     }
 
     /**
+     * Creates and caches cache object context if needed.
+     *
+     * @param proc Object processor.
+     */
+    public CacheObjectContext cacheObjectContext(IgniteCacheObjectProcessor proc) throws
IgniteCheckedException {
+        if (objCtx == null) {
+            synchronized (mux) {
+                if (objCtx == null)
+                    objCtx = proc.contextForCache(cacheCfg);
+            }
+        }
+
+        return objCtx;
+    }
+
+    /**
      * @return Cache plugin manager.
      */
     public CachePluginManager pluginManager() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/746f8ebc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 79083e0..55f3c42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -22,8 +22,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 import org.apache.ignite.IgniteCheckedException;
@@ -448,6 +446,22 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * Returns cache object context if created or creates new and caches it until cache started.
+     *
+     * @param cacheId Cache id.
+     */
+    public @Nullable CacheObjectContext cacheObjectContext(int cacheId) throws IgniteCheckedException
{
+        GridCacheContext<K, V> ctx = ctxMap.get(cacheId);
+
+        if (ctx != null)
+            return ctx.cacheObjectContext();
+
+        DynamicCacheDescriptor desc = cache().cacheDescriptor(cacheId);
+
+        return desc != null ? desc.cacheObjectContext(kernalContext().cacheObjects()) : null;
+    }
+
+    /**
      * @return Ignite instance name.
      */
     public String igniteInstanceName() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/746f8ebc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 2da8dee..db0395f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2409,11 +2409,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter
{
         @Override public void onMessage(UUID nodeId, Object msg) {
             GridCacheMessage cacheMsg = (GridCacheMessage)msg;
 
-            unmarshall(nodeId, cacheMsg);
+            Throwable err = null;
 
-            if (cacheMsg.classError() != null) {
+            try {
+                unmarshall(nodeId, cacheMsg);
+            }
+            catch (Exception e) {
+                err = e;
+            }
+
+            if (err != null || cacheMsg.classError() != null) {
                 try {
-                    processFailedMessage(nodeId, cacheMsg);
+                    processFailedMessage(nodeId, cacheMsg, err);
                 }
                 catch(Throwable e){
                     U.error(log, "Failed to process message [senderId=" + nodeId +
@@ -2466,7 +2473,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
          * @param nodeId Node ID.
          * @param msg Message.
          */
-        private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException
{
+        private void processFailedMessage(UUID nodeId, GridCacheMessage msg, Throwable err)
throws IgniteCheckedException {
             switch (msg.directType()) {
                 case -24: {
                     TxLocksRequest req = (TxLocksRequest)msg;
@@ -2498,7 +2505,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter
{
                         return;
                     }
 
-                    fut.onResult(nodeId, res);
+                    if (err == null)
+                        fut.onResult(nodeId, res);
+                    else
+                        fut.onDone(null, err);
                 }
 
                 break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/746f8ebc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
index f8130e1..97db698 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
@@ -21,11 +21,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -133,11 +132,21 @@ public class TxDeadlock {
         for (Map.Entry<IgniteTxKey, String> e : keyLabels.entrySet()) {
             IgniteTxKey txKey = e.getKey();
 
-            GridCacheContext cctx = ctx.cacheContext(txKey.cacheId());
+            try {
+                CacheObjectContext objCtx = ctx.cacheObjectContext(txKey.cacheId());
 
-            Object val = CU.value(txKey.key(), cctx, true);
+                Object val = txKey.key().value(objCtx, true);
 
-            sb.append(e.getValue()).append(" [key=").append(val).append(", cache=").append(cctx.namexx()).append("]\n");
+                sb.append(e.getValue())
+                    .append(" [key=")
+                    .append(val)
+                    .append(", cache=")
+                    .append(objCtx.cacheName())
+                    .append("]\n");
+            }
+            catch (Exception ex) {
+                sb.append("Unable to unmarshall deadlock information for key [key=").append(e.getValue()).append("]\n");
+            }
         }
 
         return sb.toString();

http://git-wip-us.apache.org/repos/asf/ignite/blob/746f8ebc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
index b7ca832..7856eaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
@@ -181,31 +181,36 @@ public class TxLocksResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws
IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
+        try {
+            super.finishUnmarshal(ctx, ldr);
 
-        if (nearTxKeysArr != null) {
-            for (int i = 0; i < nearTxKeysArr.length; i++) {
-                IgniteTxKey key = nearTxKeysArr[i];
+            if (nearTxKeysArr != null) {
+                for (int i = 0; i < nearTxKeysArr.length; i++) {
+                    IgniteTxKey txKey = nearTxKeysArr[i];
 
-                key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr);
+                    txKey.key().finishUnmarshal(ctx.cacheObjectContext(txKey.cacheId()),
ldr);
 
-                txLocks().put(key, locksArr[i]);
+                    txLocks().put(txKey, locksArr[i]);
+                }
+
+                nearTxKeysArr = null;
+                locksArr = null;
             }
 
-            nearTxKeysArr = null;
-            locksArr = null;
-        }
+            if (txKeysArr != null) {
+                txKeys = U.newHashSet(txKeysArr.length);
 
-        if (txKeysArr != null) {
-            txKeys = U.newHashSet(txKeysArr.length);
+                for (IgniteTxKey txKey : txKeysArr) {
+                    txKey.key().finishUnmarshal(ctx.cacheObjectContext(txKey.cacheId()),
ldr);
 
-            for (IgniteTxKey key : txKeysArr) {
-                key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr);
+                    txKeys.add(txKey);
+                }
 
-                txKeys.add(key);
+                txKeysArr = null;
             }
-
-            txKeysArr = null;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/746f8ebc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
new file mode 100644
index 0000000..dd7c3b3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class TxDeadlockDetectionMessageMarshallingTest extends GridCommonAbstractTest {
+    /** Topic. */
+    private static final String TOPIC = "mytopic";
+
+    /** Client mode. */
+    private static boolean clientMode;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(clientMode);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMessageUnmarshallWithoutCacheContext() throws Exception {
+        try {
+            Ignite ignite = startGrid(0);
+
+            CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+            IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(ccfg);
+
+            clientMode = true;
+
+            Ignite client = startGrid(1);
+
+            final GridCacheSharedContext<Object, Object> clientCtx = ((IgniteKernal)client).context().cache().context();
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            final AtomicBoolean res = new AtomicBoolean();
+
+            clientCtx.gridIO().addMessageListener(TOPIC, new GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object msg) {
+                    if (msg instanceof TxLocksResponse) {
+                        try {
+                            ((TxLocksResponse)msg).finishUnmarshal(clientCtx, clientCtx.deploy().globalLoader());
+
+                            res.set(true);
+                        }
+                        catch (Exception e) {
+                            log.error("Message unmarshal failed", e);
+                        }
+                        finally {
+                            latch.countDown();
+                        }
+                    }
+                }
+            });
+
+            GridCacheContext cctx = ((IgniteCacheProxy)cache).context();
+
+            KeyCacheObject key = cctx.toCacheKeyObject(1);
+
+            TxLocksResponse msg = new TxLocksResponse();
+            msg.addKey(cctx.txKey(key));
+
+            msg.prepareMarshal(cctx.shared());
+
+            ((IgniteKernal)ignite).context().cache().context().gridIO().sendToCustomTopic(
+                ((IgniteKernal)client).localNode(), TOPIC, msg, GridIoPolicy.PUBLIC_POOL);
+
+            boolean await = latch.await(1, TimeUnit.SECONDS);
+
+            assertTrue(await && res.get());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/746f8ebc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionUnmasrhalErrorsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionUnmasrhalErrorsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionUnmasrhalErrorsTest.java
new file mode 100644
index 0000000..598725e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionUnmasrhalErrorsTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.internal.util.typedef.X.hasCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class TxDeadlockDetectionUnmasrhalErrorsTest extends GridCommonAbstractTest {
+    /** Nodes count. */
+    private static final int NODES_CNT = 2;
+
+    /** Client. */
+    private static boolean client;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(client);
+
+        if (isDebug()) {
+            TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+            discoSpi.failureDetectionTimeoutEnabled(false);
+
+            cfg.setDiscoverySpi(discoSpi);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0);
+
+        client = true;
+
+        startGrid(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadlockCacheObjectContext() throws Exception {
+        IgniteCache<Integer, Integer> cache0 = null;
+        IgniteCache<Integer, Integer> cache1 = null;
+        try {
+            cache0 = getCache(ignite(0), "cache0");
+            cache1 = getCache(ignite(0), "cache1");
+
+            IgniteCache<Integer, Integer> clientCache0 = grid(1).cache("cache0");
+
+            awaitPartitionMapExchange();
+
+            final CyclicBarrier barrier = new CyclicBarrier(2);
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            final AtomicInteger threadCnt = new AtomicInteger();
+
+            final AtomicBoolean deadlock = new AtomicBoolean();
+
+            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new
Runnable() {
+                @Override public void run() {
+                    int threadNum = threadCnt.getAndIncrement();
+
+                    Ignite ignite = ignite(0);
+
+                    IgniteCache<Integer, Integer> cache1 = ignite.cache("cache" + (threadNum
== 0 ? 0 : 1));
+
+                    IgniteCache<Integer, Integer> cache2 = ignite.cache("cache" + (threadNum
== 0 ? 1 : 0));
+
+                    try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ,
1000, 0)) {
+                        int key1 = threadNum == 0 ? 0 : 1;
+
+                        log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                            ", tx=" + tx + ", key=" + key1 + ", cache=" + cache1.getName()
+ ']');
+
+                        cache1.put(key1, 0);
+
+                        barrier.await();
+
+                        int key2 = threadNum == 0 ? 1 : 0;
+
+                        log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode()
+
+                            ", tx=" + tx + ", key=" + key2 + ", cache=" + cache2.getName()
+ ']');
+
+                        latch.countDown();
+
+                        cache2.put(key2, 1);
+
+                        tx.commit();
+
+                        log.info(">>> Commit done");
+                    }
+                    catch (Throwable e) {
+                        // At least one stack trace should contain TransactionDeadlockException.
+                        if (hasCause(e, TransactionTimeoutException.class) &&
+                            hasCause(e, TransactionDeadlockException.class)
+                            ) {
+                            if (deadlock.compareAndSet(false, true))
+                                U.error(log, "At least one stack trace should contain " +
+                                    TransactionDeadlockException.class.getSimpleName(), e);
+                        }
+                    }
+                }
+            }, 2, "tx-thread");
+
+            latch.await();
+
+            Ignite client = grid(1);
+
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED,
500, 0)) {
+                clientCache0.put(0, 3);
+                clientCache0.put(1, 3);
+
+                tx.commit();
+
+                log.info(">>> Commit done");
+            }
+            catch (CacheException e) {
+                assertTrue(X.hasCause(e, TransactionTimeoutException.class));
+            }
+            catch (Throwable e) {
+                log.error("Unexpected exception occurred", e);
+
+                fail();
+            }
+
+            fut.get();
+
+            assertTrue(deadlock.get());
+
+            for (int i = 0; i < NODES_CNT ; i++) {
+                Ignite ignite = ignite(i);
+
+                IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
+
+                Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
+
+                assertTrue(futs.isEmpty());
+            }
+
+            //assertNotNull(grid(1).context().cache().context().cacheContext(cacheId));
+        }
+        finally {
+            if (cache0 != null)
+                cache0.destroy();
+
+            if (cache1 != null)
+                cache1.destroy();
+        }
+    }
+
+
+
+    /**
+     * @param ignite Ignite.
+     * @param name Name.
+     */
+    private IgniteCache<Integer, Integer> getCache(Ignite ignite, String name) {
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setBackups(0);
+        ccfg.setNearConfiguration(null);
+
+        return ignite.getOrCreateCache(ccfg);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/746f8ebc/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java
index 5a1b1ad..337d4d4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/TxDeadlockDetectionTestSuite.java
@@ -19,8 +19,10 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.transactions.DepthFirstSearchTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetectionMessageMarshallingTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetectionNoHangsTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetectionTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetectionUnmasrhalErrorsTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticDeadlockDetectionCrossCacheTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticDeadlockDetectionTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxPessimisticDeadlockDetectionCrossCacheTest;
@@ -44,6 +46,8 @@ public class TxDeadlockDetectionTestSuite extends TestSuite {
         suite.addTestSuite(TxPessimisticDeadlockDetectionCrossCacheTest.class);
         suite.addTestSuite(TxDeadlockDetectionTest.class);
         suite.addTestSuite(TxDeadlockDetectionNoHangsTest.class);
+        suite.addTestSuite(TxDeadlockDetectionUnmasrhalErrorsTest.class);
+        suite.addTestSuite(TxDeadlockDetectionMessageMarshallingTest.class);
 
         return suite;
     }


Mime
View raw message