ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [01/12] incubator-ignite git commit: IGNITE-674 - Merging 6.6.4 fixes to Ignite.
Date Fri, 10 Apr 2015 06:46:41 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-3 9b03ae109 -> 9e79a2d08


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 10146a4..125e4cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -60,7 +60,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     private static final long serialVersionUID = 0L;
 
     /** Per-transaction read map. */
-    @GridToStringExclude
+    @GridToStringInclude
     protected Map<IgniteTxKey, IgniteTxEntry> txMap;
 
     /** Read view on transaction map. */
@@ -2646,14 +2646,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 if (fut.isDone()) {
                     try {
-                        return plc1.apply(fut.get(), null);
+                        return nonInterruptable(plc1.apply(fut.get(), null));
                     }
                     catch (GridClosureException e) {
                         return new GridFinishedFuture<>(e.unwrap());
                     }
                     catch (IgniteCheckedException e) {
                         try {
-                            return plc1.apply(false, e);
+                            return nonInterruptable(plc1.apply(false, e));
                         }
                         catch (Exception e1) {
                             return new GridFinishedFuture<>(e1);
@@ -2661,10 +2661,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     }
                 }
                 else
-                    return new GridEmbeddedFuture<>(
+                    return nonInterruptable(new GridEmbeddedFuture<>(
                         fut,
                         plc1
-                    );
+                    ));
             }
             else {
                 if (implicit()) {
@@ -2679,22 +2679,24 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         return new GridFinishedFuture<>(e);
                     }
 
-                    return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
-                        @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException {
+                    return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+                        @Override
+                        public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException {
                             txFut.get();
 
                             return implicitRes;
                         }
-                    });
+                    }));
                 }
                 else
-                    return loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
-                        @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException {
+                    return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
+                        @Override
+                        public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException {
                             f.get();
 
                             return ret;
                         }
-                    });
+                    }));
             }
         }
         catch (RuntimeException e) {
@@ -2866,14 +2868,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 if (fut.isDone()) {
                     try {
-                        return plc1.apply(fut.get(), null);
+                        return nonInterruptable(plc1.apply(fut.get(), null));
                     }
                     catch (GridClosureException e) {
                         return new GridFinishedFuture<>(e.unwrap());
                     }
                     catch (IgniteCheckedException e) {
                         try {
-                            return plc1.apply(false, e);
+                            return nonInterruptable(plc1.apply(false, e));
                         }
                         catch (Exception e1) {
                             return new GridFinishedFuture<>(e1);
@@ -2881,10 +2883,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     }
                 }
                 else
-                    return new GridEmbeddedFuture<>(
+                    return nonInterruptable(new GridEmbeddedFuture<>(
                         fut,
                         plc1
-                    );
+                    ));
             }
             else {
                 if (implicit()) {
@@ -2892,26 +2894,26 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     // with prepare response, if required.
                     assert loadFut.isDone();
 
-                    return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
-                        @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
-                            throws IgniteCheckedException
-                        {
+                    return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+                        @Override
+                        public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+                            throws IgniteCheckedException {
                             txFut.get();
 
                             return (GridCacheReturn)implicitRes;
                         }
-                    });
+                    }));
                 }
                 else
-                    return loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
-                        @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f)
-                            throws IgniteCheckedException
-                        {
+                    return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
+                        @Override
+                        public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f)
+                            throws IgniteCheckedException {
                             f.get();
 
                             return ret;
                         }
-                    });
+                    }));
             }
         }
         catch (IgniteCheckedException e) {
@@ -2922,6 +2924,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param fut Future.
+     * @return Future ignoring interrupts on {@code get()}.
+     */
+    private <T> IgniteInternalFuture<T> nonInterruptable(IgniteInternalFuture<T> fut) {
+        // Safety.
+        if (fut instanceof GridFutureAdapter)
+            ((GridFutureAdapter)fut).ignoreInterrupts(true);
+
+        return fut;
+    }
+
+    /**
      * Checks if portable values should be deserialized.
      *
      * @param cacheCtx Cache context.
@@ -3264,7 +3278,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             if (explicitCand != null) {
                 GridCacheVersion explicitVer = explicitCand.version();
 
-                if (!explicitVer.equals(xidVer) && explicitCand.threadId() == threadId && !explicitCand.tx()) {
+                boolean locCand = false;
+
+                if (explicitCand.nearLocal())
+                    locCand = cctx.localNodeId().equals(explicitCand.nodeId());
+                else if (explicitCand.dhtLocal())
+                    locCand = cctx.localNodeId().equals(explicitCand.otherNodeId());
+
+                if (!explicitVer.equals(xidVer) && explicitCand.threadId() == threadId && !explicitCand.tx() && locCand) {
                     txEntry.explicitVersion(explicitVer);
 
                     if (explicitVer.isLess(minVer))
@@ -3538,7 +3559,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 if (!locked)
                     throw new GridClosureException(new IgniteTxTimeoutCheckedException("Failed to acquire lock " +
-                        "within provided timeout for transaction [timeout=" + timeout() + ", tx=" + this + ']'));
+                        "within provided timeout for transaction [timeout=" + timeout() +
+                        ", tx=" + IgniteTxLocalAdapter.this + ']'));
 
                 IgniteInternalFuture<T> fut = postLock();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a8ff280..153c010 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1016,6 +1016,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Collection of active transactions.
+     */
+    public Collection<IgniteInternalTx> activeTransactions() {
+        return F.concat(false, idMap.values(), nearIdMap.values());
+    }
+
+    /**
      * @param xidVer Completed transaction version.
      * @param nearXidVer Optional near transaction ID.
      * @return If transaction was not already present in completed set.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index dcd6b9b..a8a545d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -42,6 +42,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
     private final ConcurrentLinkedDeque8<IgniteInternalFuture<T>> futs = new ConcurrentLinkedDeque8<>();
 
     /** Pending futures. */
+    @GridToStringInclude
     private final Collection<IgniteInternalFuture<T>> pending = new ConcurrentLinkedDeque8<>();
 
     /** Listener call count. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index f671a77..09cf63a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -64,6 +64,9 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
     private volatile long endTime;
 
     /** */
+    private boolean ignoreInterrupts;
+
+    /** */
     private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
 
     /** {@inheritDoc} */
@@ -79,6 +82,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
     }
 
     /**
+     * @param ignoreInterrupts Ignore interrupts flag.
+     */
+    public void ignoreInterrupts(boolean ignoreInterrupts) {
+        this.ignoreInterrupts = ignoreInterrupts;
+    }
+
+    /**
      * @return Future end time.
      */
     public long endTime() {
@@ -98,8 +108,12 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
     /** {@inheritDoc} */
     @Override public R get() throws IgniteCheckedException {
         try {
-            if (endTime == 0)
-                acquireSharedInterruptibly(0);
+            if (endTime == 0) {
+                if (ignoreInterrupts)
+                    acquireShared(0);
+                else
+                    acquireSharedInterruptibly(0);
+            }
 
             if (getState() == CANCELLED)
                 throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
index bed0c8e..1dc682b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.worker;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
@@ -119,7 +120,10 @@ public abstract class GridWorker implements Runnable {
         // Catch everything to make sure that it gets logged properly and
         // not to kill any threads from the underlying thread pool.
         catch (Throwable e) {
-            U.error(log, "Runtime error caught during grid runnable execution: " + this, e);
+            if (!X.hasCause(e, InterruptedException.class) && !X.hasCause(e, IgniteInterruptedCheckedException.class) && !X.hasCause(e, IgniteInterruptedException.class))
+                U.error(log, "Runtime error caught during grid runnable execution: " + this, e);
+            else
+                U.warn(log, "Runtime exception occurred during grid runnable execution caused by thread interruption: " + e.getMessage());
         }
         finally {
             synchronized (mux) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheLockFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheLockFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheLockFailoverSelfTest.java
new file mode 100644
index 0000000..9f9ec60
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheLockFailoverSelfTest.java
@@ -0,0 +1,148 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCacheLockFailoverSelfTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGridsMultiThreaded(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setNearConfiguration(null);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 2 * 60_000;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLockFailover() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(null);
+
+        Integer key = backupKey(cache);
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                while (!stop.get()) {
+                    stopGrid(1);
+
+                    U.sleep(500);
+
+                    startGrid(1);
+                }
+                return null;
+            }
+        });
+
+        try {
+            long end = System.currentTimeMillis() + 60_000;
+
+            long iter = 0;
+
+            while (System.currentTimeMillis() < end) {
+                if (iter % 100 == 0)
+                    log.info("Iteration: " + iter);
+
+                iter++;
+
+                GridCacheAdapter<Object, Object> adapter = ((IgniteKernal)grid(0)).internalCache(null);
+
+                IgniteInternalFuture<Boolean> fut = adapter.lockAsync(key, 0);
+
+                try {
+                    fut.get(30_000);
+
+                    U.sleep(1);
+                }
+                catch (IgniteFutureTimeoutException e) {
+                    info("Entry: " + adapter.peekEx(key));
+
+                    fail("Lock timeout [fut=" + fut + ", err=" + e + ']');
+                }
+                catch (Exception e) {
+                    log.error("Error: " + e);
+                }
+                finally {
+                    adapter.unlock(key);
+                }
+            }
+        }
+        finally {
+            stop.set(true);
+
+            restartFut.get();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUnlockPrimaryLeft() throws Exception {
+        GridCacheAdapter<Integer, Integer> cache = ((IgniteKernal)grid(0)).internalCache(null);
+
+        Integer key = backupKey(grid(0).cache(null));
+
+        cache.lock(key, 0);
+
+        stopGrid(1);
+
+        cache.unlock(key);
+
+        GridCacheEntryEx entry = cache.peekEx(key);
+
+        assertTrue("Remote MVCC is not empty: " + entry, entry == null || entry.remoteMvccSnapshot().isEmpty());
+
+        startGrid(1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
new file mode 100644
index 0000000..9aa89be
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
@@ -0,0 +1,217 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.eviction.lru.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Tests explicit lock.
+ */
+public class IgniteCacheMultiTxLockSelfTest extends GridCommonAbstractTest {
+    /** */
+    public static final String CACHE_NAME = "part_cache";
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private volatile boolean run = true;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        assertEquals(0, G.allGrids().size());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setDefaultTimeToLive(120000);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+        ccfg.setBackups(2);
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setStartSize(100000);
+        ccfg.setEvictionPolicy(new LruEvictionPolicy(100000));
+        ccfg.setEvictSynchronized(true);
+
+        c.setCacheConfiguration(ccfg);
+
+        return c;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExplicitLockOneKey() throws Exception {
+        checkExplicitLock(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExplicitLockManyKeys() throws Exception {
+        checkExplicitLock(4);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void checkExplicitLock(int keys) throws Exception {
+        Collection<Thread> threads = new ArrayList<>();
+
+        try {
+            // Start grid 1.
+            IgniteEx grid1 = startGrid(1);
+
+            threads.add(runCacheOperations(grid1.cachex(CACHE_NAME), keys));
+
+            TimeUnit.SECONDS.sleep(3L);
+
+            // Start grid 2.
+            IgniteEx grid2 = startGrid(2);
+
+            threads.add(runCacheOperations(grid2.cachex(CACHE_NAME), keys));
+
+            TimeUnit.SECONDS.sleep(3L);
+
+            // Start grid 3.
+            IgniteEx grid3 = startGrid(3);
+
+            threads.add(runCacheOperations(grid3.cachex(CACHE_NAME), keys));
+
+            TimeUnit.SECONDS.sleep(3L);
+
+            // Start grid 4.
+            IgniteEx grid4 = startGrid(4);
+
+            threads.add(runCacheOperations(grid4.cachex(CACHE_NAME), keys));
+
+            TimeUnit.SECONDS.sleep(3L);
+
+            stopThreads(threads);
+
+            for (int i = 1; i <= 4; i++) {
+                IgniteTxManager tm = ((IgniteKernal)grid(i)).internalCache(CACHE_NAME).context().tm();
+
+                assertEquals("txMap is not empty:" + i, 0, tm.idMapSize());
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param threads Thread which will be stopped.
+     */
+    private void stopThreads(Iterable<Thread> threads) {
+        try {
+            run = false;
+
+            for (Thread thread : threads)
+                thread.join();
+        }
+        catch (Exception e) {
+            U.error(log(), "Couldn't stop threads.", e);
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @return Running thread.
+     */
+    @SuppressWarnings("TypeMayBeWeakened")
+    private Thread runCacheOperations(final GridCache<Object,Object> cache, final int keys) {
+        Thread t = new Thread() {
+            @Override public void run() {
+                while (run) {
+                    TreeMap<Integer, String> vals = generateValues(keys);
+
+                    try {
+                        // Explicit lock.
+                        U.debug(log, "Will lock key: " + vals.firstKey());
+
+                        cache.lock(vals.firstKey(), 0);
+
+                        try {
+                            U.debug(log, "Will run cache op: " + vals);
+
+                            // Put or remove.
+                            if (ThreadLocalRandom.current().nextDouble(1) < 0.65)
+                                cache.putAll(vals);
+                            else
+                                cache.removeAll(vals.keySet());
+                        }
+                        catch (Exception e) {
+                            U.error(log(), "Failed cache operation.", e);
+                        }
+                        finally {
+                            U.debug(log, "Will unlock key: " + vals.firstKey());
+                            cache.unlock(vals.firstKey());
+                        }
+
+                        U.sleep(100);
+                    }
+                    catch (Exception e){
+                        U.error(log(), "Failed unlock.", e);
+                    }
+                }
+            }
+        };
+
+        t.start();
+
+        return t;
+    }
+
+    /**
+     * @param cnt Number of keys to generate.
+     * @return Map.
+     */
+    private TreeMap<Integer, String> generateValues(int cnt) {
+        TreeMap<Integer, String> res = new TreeMap<>();
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        while (res.size() < cnt) {
+            int key = rnd.nextInt(0, 100);
+
+            res.put(key, String.valueOf(key));
+        }
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index da316fd..5fe49ed 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -621,8 +621,8 @@ public abstract class GridAbstractTest extends TestCase {
      * @return Started grid.
      * @throws Exception If anything failed.
      */
-    protected Ignite startGrid(int idx) throws Exception {
-        return startGrid(getTestGridName(idx));
+    protected IgniteEx startGrid(int idx) throws Exception {
+        return (IgniteEx)startGrid(getTestGridName(idx));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index a4553c4..61fc898 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -426,6 +426,9 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheTxStoreValueTest.class);
         suite.addTestSuite(IgniteCacheTxNearEnabledStoreValueTest.class);
 
+        suite.addTestSuite(IgniteCacheLockFailoverSelfTest.class);
+        suite.addTestSuite(IgniteCacheMultiTxLockSelfTest.class);
+
         return suite;
     }
 }


Mime
View raw message