ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [49/56] [abbrv] incubator-ignite git commit: # ignite-63
Date Fri, 23 Jan 2015 09:37:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java
new file mode 100644
index 0000000..2fa5e5d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java
@@ -0,0 +1,1328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jdk8.backport.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Test for group locking.
+ */
+public abstract class GridCacheGroupLockAbstractSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Event wait timeout. */
+    private static final int WAIT_TIMEOUT = 3000;
+
+    /** */
+    private TestStore store;
+
+    /** @return Grid count to run in test. Subclasses override to exercise multi-node topologies. */
+    protected int gridCount() {
+        return 1;
+    }
+
+    /** @return Whether near cache is enabled; consulted when building cache configuration. */
+    protected abstract boolean nearEnabled();
+
+    /** @return Cache mode for test; consulted when building cache configuration. */
+    protected abstract GridCacheMode cacheMode();
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        // Cache mode and near-cache behavior are supplied by the concrete subclass.
+        cacheCfg.setCacheMode(cacheMode());
+        cacheCfg.setDistributionMode(nearEnabled() ? NEAR_PARTITIONED : PARTITIONED_ONLY);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        // Factory hands out the per-test store instance created in beforeTest().
+        cacheCfg.setCacheStoreFactory(new Factory<CacheStore<? super Object, ? super Object>>() {
+            @Override public CacheStore<? super Object, ? super Object> create() {
+                return store;
+            }
+        });
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(true);
+        cacheCfg.setLoadPreviousValue(true);
+
+        cfg.setCacheConfiguration(cacheCfg);
+        cfg.setCacheSanityCheckEnabled(sanityCheckEnabled());
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        // Shared static IP finder so all grids in the test discover each other locally.
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        // Disable shared-memory communication to avoid port clashes between concurrently running tests.
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Fresh store per test so state does not leak between test methods.
+        store = new TestStore();
+
+        startGridsMultiThreaded(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // Drop the store reference once the whole suite is done.
+        store = null;
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Tests group-lock put of keys sharing one affinity key with an OPTIMISTIC transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockPutOneKeyOptimistic() throws Exception {
+        checkGroupLockPutOneKey(OPTIMISTIC);
+    }
+
+    /**
+     * Tests group-lock put of keys sharing one affinity key with a PESSIMISTIC transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockPutOneKeyPessimistic() throws Exception {
+        checkGroupLockPutOneKey(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that a group-lock transaction on a single affinity key produces exactly one lock
+     * event (for the affinity key itself) and that the lock is released on commit.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkGroupLockPutOneKey(IgniteTxConcurrency concurrency) throws Exception {
+        CollectingEventListener locks = new CollectingEventListener();
+        CollectingEventListener unlocks = new CollectingEventListener();
+
+        grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED);
+        grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED);
+
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null);
+
+        GridCacheAffinityKey<String> key1;
+        GridCacheAffinityKey<String> key2;
+
+        try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) {
+            // Pessimistic group-lock tx acquires the affinity-key lock eagerly on start;
+            // optimistic defers it, so no lock events are expected yet.
+            if (concurrency == PESSIMISTIC)
+                assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+            else
+                assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size());
+
+            assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size());
+
+            key1 = new GridCacheAffinityKey<>("key1", affinityKey);
+            key2 = new GridCacheAffinityKey<>("key2", affinityKey);
+
+            cache.putAll(F.asMap(
+                key1, "val1",
+                key2, "val2")
+            );
+
+            tx.commit();
+        }
+
+        // Check that there are no further locks after transaction commit.
+        assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size());
+        assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+
+        // Verify the values landed on every primary/backup node.
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val1", gCache.peek(key1));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2))
+                assertEquals("For index: " + i, "val2", gCache.peek(key2));
+        }
+    }
+
+    /**
+     * Tests group-lock remove of keys sharing one affinity key with an OPTIMISTIC transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockRemoveOneKeyOptimistic() throws Exception {
+        checkGroupLockRemoveOneKey(OPTIMISTIC);
+    }
+
+    /**
+     * Tests group-lock remove of keys sharing one affinity key with a PESSIMISTIC transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockRemoveOneKeyPessimistic() throws Exception {
+        checkGroupLockRemoveOneKey(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that removing keys under a group-lock transaction on their shared affinity key
+     * generates exactly one lock event and that the keys are gone on all owning nodes afterwards.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkGroupLockRemoveOneKey(IgniteTxConcurrency concurrency) throws Exception {
+        CollectingEventListener locks = new CollectingEventListener();
+        CollectingEventListener unlocks = new CollectingEventListener();
+
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey);
+        GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey);
+
+        GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null);
+
+        // Populate cache.
+        cache.putAll(F.asMap(
+            key1, "val1",
+            key2, "val2")
+        );
+
+        // Sanity: initial values visible on all primary/backup nodes before listeners attach.
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val1", gCache.peek(key1));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2))
+                assertEquals("For index: " + i, "val2", gCache.peek(key2));
+        }
+
+        // Listeners are registered after population so the initial putAll is not counted.
+        grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED);
+        grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED);
+
+        try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) {
+            if (concurrency == PESSIMISTIC)
+                assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+            else
+                assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size());
+
+            assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size());
+
+
+            cache.removeAll(F.asList(key1, key2));
+
+            tx.commit();
+        }
+
+        // Check that there are no further locks after transaction commit.
+        assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size());
+        assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+
+        // Both keys must be absent from every owning node after the committed remove.
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertNull("For index: " + i, gCache.peek(key1));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2))
+                assertNull("For index: " + i, gCache.peek(key2));
+        }
+    }
+
+    /**
+     * Tests group-lock reads of keys sharing one affinity key with an OPTIMISTIC transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockGetOneKeyOptimistic() throws Exception {
+        checkGroupLockGetOneKey(OPTIMISTIC);
+    }
+
+    /**
+     * Tests group-lock reads of keys sharing one affinity key with a PESSIMISTIC transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockGetOneKeyPessimistic() throws Exception {
+        checkGroupLockGetOneKey(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that reads inside a group-lock transaction lock only the shared affinity key
+     * (one lock event total) and see the previously committed values.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkGroupLockGetOneKey(IgniteTxConcurrency concurrency) throws Exception {
+        CollectingEventListener locks = new CollectingEventListener();
+        CollectingEventListener unlocks = new CollectingEventListener();
+
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey);
+        GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey);
+
+        GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null);
+
+        // Populate cache.
+        cache.putAll(F.asMap(
+            key1, "val1",
+            key2, "val2")
+        );
+
+        // Sanity: initial values visible on all primary/backup nodes before listeners attach.
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val1", gCache.peek(key1));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2))
+                assertEquals("For index: " + i, "val2", gCache.peek(key2));
+        }
+
+        grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED);
+        grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED);
+
+        try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) {
+            if (concurrency == PESSIMISTIC)
+                assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+            else
+                assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size());
+
+            assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size());
+
+            assertEquals("val1", cache.get(key1));
+
+            assertEquals("val2", cache.get(key2));
+
+            tx.commit();
+        }
+
+        // Check that there are no further locks after transaction commit.
+        assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size());
+        assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+    }
+
+    /**
+     * Tests that an OPTIMISTIC group-lock transaction fails when the key is externally locked.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockWithExternalLockOptimistic() throws Exception {
+        checkGroupLockWithExternalLock(OPTIMISTIC);
+    }
+
+    /**
+     * Tests that a PESSIMISTIC group-lock transaction fails when the key is externally locked.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockWithExternalLockPessimistic() throws Exception {
+        checkGroupLockWithExternalLock(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that, with the cache sanity check enabled, committing a group-lock transaction
+     * while another thread holds an explicit lock on a member key fails with
+     * {@link IgniteTxHeuristicException}.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkGroupLockWithExternalLock(final IgniteTxConcurrency concurrency) throws Exception {
+        assert sanityCheckEnabled();
+
+        final UUID affinityKey = primaryKeyForCache(grid(0));
+
+        final GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey);
+
+        final IgniteCache<GridCacheAffinityKey<String>, String> cache = grid(0).jcache(null);
+
+        // Populate cache.
+        cache.put(key1, "val1");
+
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val1", gCache.peek(key1));
+        }
+
+        // lockLatch: signals the external lock is held; unlockLatch: tells the holder to release.
+        final CountDownLatch unlockLatch = new CountDownLatch(1);
+        final CountDownLatch lockLatch = new CountDownLatch(1);
+
+        // Background thread takes an explicit lock on key1 and holds it until told to release.
+        IgniteFuture<?> fut = multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    cache.lock(key1).lock();
+
+                    try {
+                        lockLatch.countDown();
+                        unlockLatch.await();
+                    }
+                    finally {
+                        cache.lock(key1).unlock();
+                    }
+                }
+                catch (CacheException e) {
+                    fail(e.getMessage());
+                }
+                catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }, 1);
+
+        try {
+            lockLatch.await();
+
+            // Group-lock tx touching an externally locked key must fail the sanity check on commit.
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency,
+                        READ_COMMITTED, 0, 1)) {
+                        cache.put(key1, "val01");
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+            }, IgniteTxHeuristicException.class, null);
+        }
+        finally {
+            // Always release the background holder, even if the assertion above fails.
+            unlockLatch.countDown();
+
+            fut.get();
+        }
+    }
+
+    /**
+     * Tests OPTIMISTIC group-lock commit over an externally locked key with sanity check disabled.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSanityCheckDisabledOptimistic() throws Exception {
+        checkSanityCheckDisabled(OPTIMISTIC);
+    }
+
+    /**
+     * Tests PESSIMISTIC group-lock commit over an externally locked key with sanity check disabled.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSanityCheckDisabledPessimistic() throws Exception {
+        checkSanityCheckDisabled(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that, with the cache sanity check disabled, a group-lock transaction commits
+     * successfully even while the current thread holds an explicit lock on a member key.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkSanityCheckDisabled(final IgniteTxConcurrency concurrency) throws Exception {
+        assert !sanityCheckEnabled();
+
+        GridEx grid = grid(0);
+
+        final UUID affinityKey = primaryKeyForCache(grid);
+
+        final GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey);
+
+        final IgniteCache<GridCacheAffinityKey<String>, String> cache = grid.jcache(null);
+
+        // Populate cache.
+        cache.put(key1, "val1");
+
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val1", gCache.peek(key1));
+        }
+
+        // Explicit lock held by this same thread for the whole transaction.
+        cache.lock(key1).lock();
+
+        try {
+            try (IgniteTx tx = grid.transactions().txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 1)) {
+                cache.put(key1, "val01");
+
+                tx.commit();
+            }
+
+            // Commit must have gone through: updated value visible on every owning node.
+            for (int i = 0; i < gridCount(); i++) {
+                Ignite g = grid(i);
+
+                GridCache<Object, Object> gCache = g.cache(null);
+
+                if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                    assertEquals("For index: " + i, "val01", gCache.peek(key1));
+            }
+        }
+        finally {
+            cache.lock(key1).unlock();
+        }
+    }
+
+    /**
+     * Tests partition-wide group lock with an OPTIMISTIC transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupPartitionLockOptimistic() throws Exception {
+        checkGroupPartitionLock(OPTIMISTIC);
+    }
+
+    /**
+     * Tests partition-wide group lock with a PESSIMISTIC transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupPartitionLockPessimistic() throws Exception {
+        checkGroupPartitionLock(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that a partition group-lock transaction locks via an internal partition key
+     * (so no public lock/unlock events fire) and commits puts for keys of that partition.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkGroupPartitionLock(IgniteTxConcurrency concurrency) throws Exception {
+        CollectingEventListener locks = new CollectingEventListener();
+        CollectingEventListener unlocks = new CollectingEventListener();
+
+        grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED);
+        grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED);
+
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        GridCache<UUID, String> cache = grid(0).cache(null);
+
+        UUID key1;
+        UUID key2;
+
+        try (IgniteTx tx = cache.txStartPartition(cache.affinity().partition(affinityKey), concurrency,
+            READ_COMMITTED, 0, 2)) {
+            // Note that events are not generated for internal keys.
+            assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size());
+            assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0,
+                unlocks.affectedKeys().size());
+
+            GridCacheAdapter<Object, Object> cacheAdapter = ((GridKernal)grid(0)).internalCache();
+
+            GridCacheAffinityManager<Object, Object> affMgr = cacheAdapter.context().affinity();
+
+            // Resolve the internal per-partition lock key to verify lock ownership directly.
+            GridPartitionLockKey partAffKey = affMgr.partitionAffinityKey(cache.affinity().partition(affinityKey));
+
+            if (concurrency == PESSIMISTIC)
+                assertTrue(cacheAdapter.entryEx(partAffKey).lockedByThread());
+
+            // Generate random keys that map to the locked partition.
+            do {
+                key1 = UUID.randomUUID();
+            }
+            while (cache.affinity().partition(key1) != cache.affinity().partition(affinityKey));
+
+            do {
+                key2 = UUID.randomUUID();
+            }
+            while (cache.affinity().partition(key2) != cache.affinity().partition(affinityKey));
+
+            cache.putAll(F.asMap(
+                key1, "val1",
+                key2, "val2")
+            );
+
+            tx.commit();
+        }
+
+        // Check that there are no further locks after transaction commit.
+        assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size());
+        assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size());
+
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val1", gCache.peek(key1));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2))
+                assertEquals("For index: " + i, "val2", gCache.peek(key2));
+        }
+    }
+
+    /**
+     * Tests get-then-put in a group-lock tx: OPTIMISTIC / READ_COMMITTED.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetPutOptimisticReadCommitted() throws Exception {
+        checkGetPut(OPTIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Tests get-then-put in a group-lock tx: OPTIMISTIC / REPEATABLE_READ.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetPutOptimisticRepeatableRead() throws Exception {
+        checkGetPut(OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Tests get-then-put in a group-lock tx: PESSIMISTIC / READ_COMMITTED.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetPutPessimisticReadCommitted() throws Exception {
+        checkGetPut(PESSIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Tests get-then-put in a group-lock tx: PESSIMISTIC / REPEATABLE_READ.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetPutPessimisticRepeatableRead() throws Exception {
+        checkGetPut(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Tests get-then-put on an empty cache in a group-lock tx: PESSIMISTIC / READ_COMMITTED.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetPutEmptyCachePessimisticReadCommitted() throws Exception {
+        checkGetPutEmptyCache(PESSIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Tests get-then-put on an empty cache in a group-lock tx: PESSIMISTIC / REPEATABLE_READ.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetPutEmptyCachePessimisticRepeatableRead() throws Exception {
+        checkGetPutEmptyCache(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Tests get-then-put on an empty cache in a group-lock tx: OPTIMISTIC / READ_COMMITTED.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetPutEmptyCacheOptimisticReadCommitted() throws Exception {
+        checkGetPutEmptyCache(OPTIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Tests get-then-put on an empty cache in a group-lock tx: OPTIMISTIC / REPEATABLE_READ.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetPutEmptyCacheOptimisticRepeatableRead() throws Exception {
+        checkGetPutEmptyCache(OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Checks reads followed by writes inside a group-lock transaction on a pre-populated cache:
+     * only the shared affinity key is locked (one lock event) and it is released on commit.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @param isolation Transaction isolation mode.
+     * @throws Exception If failed.
+     */
+    private void checkGetPut(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception {
+        CollectingEventListener locks = new CollectingEventListener();
+        CollectingEventListener unlocks = new CollectingEventListener();
+
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey);
+        GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey);
+
+        GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null);
+
+        // Populate cache.
+        cache.putAll(F.asMap(
+            key1, "val1",
+            key2, "val2")
+        );
+
+        // Sanity: initial values visible on all primary/backup nodes before listeners attach.
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val1", gCache.peek(key1));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2))
+                assertEquals("For index: " + i, "val2", gCache.peek(key2));
+        }
+
+        grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED);
+        grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED);
+
+        try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 2)) {
+            if (concurrency == PESSIMISTIC)
+                assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+            else
+                assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size());
+
+            assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size());
+
+            assertEquals("val1", cache.get(key1));
+
+            assertEquals("val2", cache.get(key2));
+
+            cache.put(key1, "val01");
+
+            cache.put(key2, "val02");
+
+            tx.commit();
+        }
+
+        // Check that there are no further locks after transaction commit.
+        assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size());
+        assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+    }
+
+    /**
+     * Checks reads of absent keys followed by writes inside a group-lock transaction on an
+     * empty cache: only the shared affinity key is locked (one lock event), it is released
+     * on commit, and the written values are visible on every owning node.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @param isolation Transaction isolation mode.
+     * @throws Exception If failed.
+     */
+    private void checkGetPutEmptyCache(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception {
+        CollectingEventListener locks = new CollectingEventListener();
+        CollectingEventListener unlocks = new CollectingEventListener();
+
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey);
+        GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey);
+
+        GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null);
+
+        grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED);
+        grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED);
+
+        try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 2)) {
+            if (concurrency == PESSIMISTIC)
+                assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+            else
+                assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size());
+
+            assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size());
+
+            // Cache was never populated, so both reads must miss. Use assertNull rather than
+            // assertEquals(null, ...) for the idiomatic check and a clearer failure message.
+            assertNull(cache.get(key1));
+
+            assertNull(cache.get(key2));
+
+            cache.put(key1, "val01");
+
+            cache.put(key2, "val02");
+
+            tx.commit();
+        }
+
+        // Check that there are no further locks after transaction commit.
+        assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size());
+        assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val01", gCache.peek(key1));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2))
+                assertEquals("For index: " + i, "val02", gCache.peek(key2));
+        }
+    }
+
+    /**
+     * Tests get-then-remove in a group-lock tx: OPTIMISTIC / READ_COMMITTED.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetRemoveOptimisticReadCommitted() throws Exception {
+        checkGetRemove(OPTIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Tests get-then-remove in a group-lock tx: OPTIMISTIC / REPEATABLE_READ.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetRemoveOptimisticRepeatableRead() throws Exception {
+        checkGetRemove(OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Tests get-then-remove in a group-lock tx: PESSIMISTIC / READ_COMMITTED.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetRemovePessimisticReadCommitted() throws Exception {
+        checkGetRemove(PESSIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Tests get-then-remove in a group-lock tx: PESSIMISTIC / REPEATABLE_READ.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetRemovePessimisticRepeatableRead() throws Exception {
+        checkGetRemove(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Checks reads followed by removes inside a group-lock transaction: only the shared
+     * affinity key is locked (one lock event) and the removed keys are absent on all nodes.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @param isolation Transaction isolation mode.
+     * @throws Exception If failed.
+     */
+    private void checkGetRemove(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception {
+        CollectingEventListener locks = new CollectingEventListener();
+        CollectingEventListener unlocks = new CollectingEventListener();
+
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey);
+        GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey);
+
+        GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null);
+
+        // Populate cache.
+        cache.putAll(F.asMap(
+            key1, "val1",
+            key2, "val2")
+        );
+
+        // Sanity: initial values visible on all primary/backup nodes before listeners attach.
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val1", gCache.peek(key1));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2))
+                assertEquals("For index: " + i, "val2", gCache.peek(key2));
+        }
+
+        grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED);
+        grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED);
+
+        try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 2)) {
+            if (concurrency == PESSIMISTIC)
+                assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+            else
+                assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size());
+
+            assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size());
+
+            assertEquals("val1", cache.get(key1));
+
+            assertEquals("val2", cache.get(key2));
+
+            cache.remove(key1);
+
+            cache.remove(key2);
+
+            tx.commit();
+        }
+
+        // Removed keys must be gone everywhere; at most the affinity key itself may remain.
+        for (int i = 0; i < gridCount(); i++) {
+            assertNull("For cache [i=" + i + ", val=" + cache(i).peek(key1) + ']', cache(i).peek(key1));
+            assertNull("For cache [i=" + i + ", val=" + cache(i).peek(key2) + ']', cache(i).peek(key2));
+
+            assertTrue("For cache [idx=" + i + ", keySet=" + cache(i).keySet() + ']', cache(i).size() <= 1);
+        }
+
+        // Check that there are no further locks after transaction commit.
+        assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size());
+        assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+    }
+
+    /**
+     * Group-lock get-after-put with OPTIMISTIC concurrency.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetAfterPutOptimistic() throws Exception {
+        checkGetAfterPut(OPTIMISTIC);
+    }
+
+    /**
+     * Group-lock get-after-put with PESSIMISTIC concurrency.
+     * NOTE(review): name lacks the "Pessimistic" suffix its sibling uses; kept as-is since
+     * renaming would change the externally visible test name.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetAfterPut() throws Exception {
+        checkGetAfterPut(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that values updated inside a READ_COMMITTED group-lock transaction are visible
+     * to subsequent reads inside the same transaction and after commit, with exactly one
+     * lock taken on the affinity key.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkGetAfterPut(IgniteTxConcurrency concurrency) throws Exception {
+        // Collect lock/unlock events observed on grid(0).
+        CollectingEventListener locks = new CollectingEventListener();
+        CollectingEventListener unlocks = new CollectingEventListener();
+
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        // Both keys collocate on the same node via the shared affinity key.
+        GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey);
+        GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey);
+
+        GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null);
+
+        // Populate cache.
+        cache.putAll(F.asMap(
+            key1, "val1",
+            key2, "val2")
+        );
+
+        // Sanity check: values must be present on every primary/backup node.
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val1", gCache.peek(key1));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2))
+                assertEquals("For index: " + i, "val2", gCache.peek(key2));
+        }
+
+        grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED);
+        grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED);
+
+        try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) {
+            // Pessimistic tx locks the affinity key up front; optimistic locks only at commit.
+            if (concurrency == PESSIMISTIC)
+                assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+            else
+                assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size());
+
+            assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size());
+
+            assertEquals("val1", cache.get(key1));
+
+            assertEquals("val2", cache.get(key2));
+
+            cache.put(key1, "val01");
+
+            cache.put(key2, "val02");
+
+            // Updated values must be visible within the same transaction.
+            assertEquals("val01", cache.get(key1));
+
+            assertEquals("val02", cache.get(key2));
+
+            tx.commit();
+        }
+
+        // Check that there are no further locks after transaction commit.
+        assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size());
+        assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey));
+
+        assertEquals("val01", cache.get(key1));
+
+        assertEquals("val02", cache.get(key2));
+    }
+
+    /**
+     * Repeatable-read get inside an OPTIMISTIC partition group-lock transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetRepeatableReadOptimistic() throws Exception {
+        checkGetRepeatableRead(OPTIMISTIC);
+    }
+
+    /**
+     * Repeatable-read get inside a PESSIMISTIC partition group-lock transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetRepeatableReadPessimistic() throws Exception {
+        checkGetRepeatableRead(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that a value written before a partition group-lock transaction is readable
+     * inside it under REPEATABLE_READ isolation.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkGetRepeatableRead(IgniteTxConcurrency concurrency) throws Exception {
+        UUID key = primaryKeyForCache(grid(0));
+
+        cache(0).put(key, "val");
+
+        // Group-lock tx over the key's whole partition; closed without commit (rollback).
+        try (IgniteTx ignored = cache(0).txStartPartition(cache(0).affinity().partition(key), concurrency,
+            REPEATABLE_READ, 0, 1)) {
+            assertEquals("val", cache(0).get(key));
+        }
+    }
+
+    /**
+     * Putting a key with a foreign affinity key must fail in an OPTIMISTIC group-lock tx.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockPutWrongKeyOptimistic() throws Exception {
+        checkGroupLockPutWrongKey(OPTIMISTIC);
+    }
+
+    /**
+     * Putting a key with a foreign affinity key must fail in a PESSIMISTIC group-lock tx.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockPutWrongKeyPessimistic() throws Exception {
+        checkGroupLockPutWrongKey(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that putting a key whose affinity key differs from the one the group-lock
+     * transaction was started with raises {@code IgniteCheckedException} and leaves no
+     * dangling transaction bound to the thread.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkGroupLockPutWrongKey(IgniteTxConcurrency concurrency) throws Exception {
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        final GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null);
+
+        try (IgniteTx ignored = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 1)) {
+            // Key with affinity key different from enlisted on tx start should raise exception.
+            cache.put(new GridCacheAffinityKey<>("key1", UUID.randomUUID()), "val1");
+
+            fail("Exception should be thrown");
+        }
+        catch (IgniteCheckedException ignored) {
+            // Expected exception.
+        }
+
+        // No transaction should remain associated with the current thread.
+        assertNull(cache.tx());
+    }
+
+    /**
+     * Removing a key with a foreign affinity key must fail in an OPTIMISTIC group-lock tx.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockRemoveWrongKeyOptimistic() throws Exception {
+        checkGroupLockRemoveWrongKey(OPTIMISTIC);
+    }
+
+    /**
+     * Removing a key with a foreign affinity key must fail in a PESSIMISTIC group-lock tx.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockRemoveWrongKeyPessimistic() throws Exception {
+        checkGroupLockRemoveWrongKey(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that removing a key whose affinity key differs from the one the group-lock
+     * transaction was started with raises {@code IgniteCheckedException} and leaves no
+     * dangling transaction bound to the thread.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkGroupLockRemoveWrongKey(IgniteTxConcurrency concurrency) throws Exception {
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        final GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null);
+
+        // Key deliberately mapped to a random (different) affinity key.
+        final GridCacheAffinityKey<String> key = new GridCacheAffinityKey<>("key1", UUID.randomUUID());
+
+        cache.put(key, "val");
+
+        try (IgniteTx ignored = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 1)) {
+            // Key with affinity key different from enlisted on tx start should raise exception.
+            cache.remove(key);
+
+            fail("Exception should be thrown.");
+        }
+        catch (IgniteCheckedException ignored) {
+            // Expected exception.
+        }
+
+        // No transaction should remain associated with the current thread.
+        assertNull(cache.tx());
+    }
+
+    /**
+     * Read/write of the affinity key itself, PESSIMISTIC + REPEATABLE_READ.
+     * NOTE(review): "Pessimitstic" is a typo, kept to preserve the visible test name.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockReadAffinityKeyPessimitsticRepeatableRead() throws Exception {
+        checkGroupLockReadAffinityKey(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Read/write of the affinity key itself, PESSIMISTIC + READ_COMMITTED.
+     * NOTE(review): "Pessimitstic" is a typo, kept to preserve the visible test name.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockReadAffinityKeyPessimitsticReadCommitted() throws Exception {
+        checkGroupLockReadAffinityKey(PESSIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Read/write of the affinity key itself, OPTIMISTIC + REPEATABLE_READ.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockReadAffinityKeyOptimisticRepeatableRead() throws Exception {
+        checkGroupLockReadAffinityKey(OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Read/write of the affinity key itself, OPTIMISTIC + READ_COMMITTED.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockReadAffinityKeyOptimisticReadCommitted() throws Exception {
+        checkGroupLockReadAffinityKey(OPTIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Verifies that the affinity key itself, as well as keys collocated on it, can be read
+     * and updated inside a group-lock transaction, and that the updates survive the commit.
+     *
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If failed.
+     */
+    private void checkGroupLockReadAffinityKey(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation)
+        throws Exception {
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        final GridCache<Object, String> cache = grid(0).cache(null);
+
+        // The affinity key itself plus two keys collocated on it, in enlistment order.
+        Object[] keys = {
+            affinityKey,
+            new GridCacheAffinityKey<>("key1", affinityKey),
+            new GridCacheAffinityKey<>("key2", affinityKey)
+        };
+
+        for (Object key : keys)
+            cache.put(key, "0");
+
+        try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 3)) {
+            // Initial values must be readable inside the transaction.
+            for (Object key : keys)
+                assertEquals("0", cache.get(key));
+
+            for (Object key : keys)
+                cache.put(key, "1");
+
+            // Updates must be visible within the same transaction.
+            for (Object key : keys)
+                assertEquals("1", cache.get(key));
+
+            tx.commit();
+        }
+
+        // Committed values must be visible outside the transaction.
+        for (Object key : keys)
+            assertEquals("1", cache.get(key));
+    }
+
+    /**
+     * Write-through batching for an OPTIMISTIC group-lock transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockWriteThroughBatchUpdateOptimistic() throws Exception {
+        checkGroupLockWriteThrough(OPTIMISTIC);
+    }
+
+    /**
+     * Write-through batching for a PESSIMISTIC group-lock transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGroupLockWriteThroughBatchUpdatePessimistic() throws Exception {
+        checkGroupLockWriteThrough(PESSIMISTIC);
+    }
+
+    /**
+     * Checks that all puts performed in a single group-lock transaction reach the store
+     * as one batched write (store put count of 1) and land on all primary/backup nodes.
+     *
+     * @param concurrency Transaction concurrency mode.
+     * @throws Exception If failed.
+     */
+    private void checkGroupLockWriteThrough(IgniteTxConcurrency concurrency) throws Exception {
+        UUID affinityKey = primaryKeyForCache(grid(0));
+
+        GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null);
+
+        // All keys collocate on the same node via the shared affinity key.
+        GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey);
+        GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey);
+        GridCacheAffinityKey<String> key3 = new GridCacheAffinityKey<>("key3", affinityKey);
+        GridCacheAffinityKey<String> key4 = new GridCacheAffinityKey<>("key4", affinityKey);
+
+        // Expected final content of the write-through store.
+        Map<GridCacheAffinityKey<String>, String> putMap = F.asMap(
+            key1, "val1",
+            key2, "val2",
+            key3, "val3",
+            key4, "val4");
+
+        // Individual puts inside one tx must still be batched into a single store write.
+        try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 4)) {
+            cache.put(key1, "val1");
+            cache.put(key2, "val2");
+            cache.put(key3, "val3");
+            cache.put(key4, "val4");
+
+            tx.commit();
+        }
+
+        // Committed values must be present on every primary/backup node.
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            GridCache<Object, Object> gCache = g.cache(null);
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1))
+                assertEquals("For index: " + i, "val1", gCache.peek(key1));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2))
+                assertEquals("For index: " + i, "val2", gCache.peek(key2));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key3))
+                assertEquals("For index: " + i, "val3", gCache.peek(key3));
+
+            if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key4))
+                assertEquals("For index: " + i, "val4", gCache.peek(key4));
+        }
+
+        // Check the store. assertEquals (not assertTrue on equals()) so a failure
+        // reports the expected and actual map contents.
+        assertEquals(putMap, store.storeMap());
+        assertEquals(1, store.putCount());
+    }
+
+    /**
+     * Sanity checks are on by default and disabled only when the test name opts out.
+     *
+     * @return {@code True} if sanity check should be enabled.
+     */
+    private boolean sanityCheckEnabled() {
+        return !getName().contains("SanityCheckDisabled");
+    }
+
+    /**
+     * Generates a random UUID key that maps to the given node as primary.
+     *
+     * @param primary Primary node for which key should be calculated.
+     * @return Key for which given node is primary.
+     * @throws IgniteCheckedException If affinity can not be calculated.
+     */
+    protected UUID primaryKeyForCache(Ignite primary) throws IgniteCheckedException {
+        UUID primaryId = primary.cluster().localNode().id();
+
+        // Probe random keys until one is mapped to the requested node; bail out
+        // after the same 10000-attempt budget as before to avoid an endless loop.
+        for (int attempt = 1; ; attempt++) {
+            UUID candidate = UUID.randomUUID();
+
+            if (primary.cluster().mapKeyToNode(null, candidate).id().equals(primaryId))
+                return candidate;
+
+            if (attempt >= 10000)
+                throw new IllegalStateException("Cannot find key for primary node: " + primaryId);
+        }
+    }
+
+    /**
+     * Generates {@code cnt} distinct keys for which the given node is primary.
+     *
+     * @param primary Primary node for which keys should be calculated.
+     * @param cnt Key count.
+     * @return Collection of generated keys.
+     * @throws IgniteCheckedException If affinity can not be calculated.
+     */
+    protected UUID[] primaryKeysForCache(Ignite primary, int cnt) throws IgniteCheckedException {
+        // LinkedHashSet deduplicates while keeping generation order.
+        Collection<UUID> found = new LinkedHashSet<>();
+
+        int attempts = 0;
+
+        while (true) {
+            found.add(primaryKeyForCache(primary));
+
+            // Same 10000-attempt budget as the single-key variant.
+            if (++attempts > 10000)
+                throw new IllegalStateException("Cannot find keys for primary node [nodeId=" +
+                    primary.cluster().localNode().id() + ", cnt=" + cnt + ']');
+
+            if (found.size() >= cnt)
+                break;
+        }
+
+        return found.toArray(new UUID[found.size()]);
+    }
+
+    /** Event listener that collects the keys of all incoming cache lock/unlock events. */
+    protected static class CollectingEventListener implements IgnitePredicate<IgniteEvent> {
+        /** Keys extracted from collected events (concurrent, insertion-ordered). */
+        private final Collection<Object> affectedKeys = new GridConcurrentLinkedHashSet<>();
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(IgniteEvent evt) {
+            assert evt.type() == EVT_CACHE_OBJECT_LOCKED || evt.type() == EVT_CACHE_OBJECT_UNLOCKED;
+
+            IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt;
+
+            // Record the key and wake up any thread blocked in awaitKeys().
+            synchronized (this) {
+                affectedKeys.add(cacheEvt.key());
+
+                notifyAll();
+            }
+
+            return true;
+        }
+
+        /** @return Collection of affected keys. */
+        public Collection<Object> affectedKeys() {
+            return affectedKeys;
+        }
+
+        /**
+         * Waits until events received for all supplied keys.
+         *
+         * @param timeout Timeout to wait.
+         * @param keys Keys to wait for.
+         * @return {@code True} if wait was successful, {@code false} if wait timed out.
+         * @throws InterruptedException If thread was interrupted.
+         */
+        public boolean awaitKeys(long timeout, Object... keys) throws InterruptedException {
+            long start = System.currentTimeMillis();
+
+            Collection<Object> keysCol = Arrays.asList(keys);
+
+            synchronized (this) {
+                // Loop guards against spurious wakeups and partial key arrival;
+                // the remaining wait time is recomputed on every iteration.
+                while (true) {
+                    long now = System.currentTimeMillis();
+
+                    if (affectedKeys.containsAll(keysCol))
+                        return true;
+                    else if (start + timeout > now)
+                        wait(start + timeout - now);
+                    else
+                        return false;
+                }
+            }
+        }
+    }
+
+    /** Test store that accumulates every written value into an in-memory map. */
+    private static class TestStore extends CacheStoreAdapter<Object, Object> {
+        /** Accumulated key/value pairs written through the store. */
+        private final ConcurrentMap<Object, Object> storeMap = new ConcurrentHashMap8<>();
+
+        /** Number of write/writeAll invocations. */
+        private final AtomicInteger putCnt = new AtomicInteger();
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) {
+            // The store is write-only in these tests; nothing is ever loaded.
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) {
+            for (Cache.Entry<?, ?> entry : entries)
+                storeMap.put(entry.getKey(), entry.getValue());
+
+            // A whole batch counts as one put so tests can assert write batching.
+            putCnt.incrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) {
+            storeMap.put(entry.getKey(), entry.getValue());
+
+            putCnt.incrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) {
+            storeMap.remove(key);
+        }
+
+        /** @return Stored values map. */
+        public ConcurrentMap<Object, Object> storeMap() {
+            return storeMap;
+        }
+
+        /** @return Number of calls to put(). */
+        public int putCount() {
+            return putCnt.get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java
new file mode 100644
index 0000000..16d58c0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Tests optimistic group lock transactions: runs the full failover suite of the
+ * parent class with {@link #optimisticTx()} switched to {@code true}.
+ */
+public class GridCacheGroupLockFailoverOptimisticTxSelfTest extends GridCacheGroupLockFailoverSelfTest {
+    /** {@inheritDoc} */
+    @Override protected boolean optimisticTx() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverSelfTest.java
new file mode 100644
index 0000000..8758b21
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverSelfTest.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import com.google.common.collect.*;
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.failover.*;
+import org.apache.ignite.spi.failover.always.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Tests group lock transaction failover.
+ */
+public class GridCacheGroupLockFailoverSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Size of the test map. */
+    private static final int TEST_MAP_SIZE = 200000;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "partitioned";
+
+    /** Size of data chunk, sent to a remote node. */
+    private static final int DATA_CHUNK_SIZE = 1000;
+
+    /** Number of chunk on which to fail worker node. */
+    public static final int FAIL_ON_CHUNK_NO = (TEST_MAP_SIZE / DATA_CHUNK_SIZE) / 3;
+
+    /** Number of chunks to push between successive worker shutdowns. */
+    private static final int FAILOVER_PUSH_GAP = 30;
+
+    /** Master node name. */
+    private static final String MASTER = "master";
+
+    /** Near enabled flag (set per test run before grids are started). */
+    private boolean nearEnabled;
+
+    /** Backups count (set per test run before grids are started). */
+    private int backups;
+
+    /** Filter to include only worker nodes. */
+    private static final IgnitePredicate<ClusterNode> workerNodesFilter = new PN() {
+        @SuppressWarnings("unchecked")
+        @Override public boolean apply(ClusterNode n) {
+            return "worker".equals(n.attribute("segment"));
+        }
+    };
+
+    /**
+     * Result future queue (restrict the queue size
+     * to 10 in order to prevent in-memory data grid from over loading).
+     */
+    private final BlockingQueue<ComputeTaskFuture<?>> resQueue = new LinkedBlockingQueue<>(10);
+
+    /**
+     * Overridden by subclasses to switch the whole suite to optimistic transactions.
+     *
+     * @return {@code True} if test should use optimistic transactions.
+     */
+    protected boolean optimisticTx() {
+        return false;
+    }
+
+    /**
+     * Failover with near cache enabled, 3 workers, 1 backup / 1 shutdown.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverGroupLockNearEnabledOneBackup() throws Exception {
+        checkPutAllFailoverGroupLock(true, 3, 1);
+    }
+
+    /**
+     * Failover with near cache disabled, 3 workers, 1 backup / 1 shutdown.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverGroupLockNearDisabledOneBackup() throws Exception {
+        checkPutAllFailoverGroupLock(false, 3, 1);
+    }
+
+    /**
+     * Failover with near cache enabled, 5 workers, 2 backups / 2 shutdowns.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverGroupLockNearEnabledTwoBackups() throws Exception {
+        checkPutAllFailoverGroupLock(true, 5, 2);
+    }
+
+    /**
+     * Failover with near cache disabled, 5 workers, 2 backups / 2 shutdowns.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverGroupLockNearDisabledTwoBackups() throws Exception {
+        checkPutAllFailoverGroupLock(false, 5, 2);
+    }
+
+    /**
+     * Failover with near cache enabled, 7 workers, 3 backups / 3 shutdowns.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverGroupLockNearEnabledThreeBackups() throws Exception {
+        checkPutAllFailoverGroupLock(true, 7, 3);
+    }
+
+    /**
+     * Failover with near cache disabled, 7 workers, 3 backups / 3 shutdowns.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverGroupLockNearDisabledThreeBackups() throws Exception {
+        checkPutAllFailoverGroupLock(false, 7, 3);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        // Failover tests start/stop many grids and push 200k keys; allow extra time.
+        return super.getTestTimeout() * 5;
+    }
+
+    /**
+     * Tests putAll() method along with failover and cache backup.
+     *
+     * Checks that the resulting primary cache size is the same as
+     * expected.
+     *
+     * @param near {@code True} for near enabled.
+     * @param workerCnt Workers count.
+     * @param shutdownCnt Shutdown count.
+     * @throws Exception If failed.
+     */
+    public void checkPutAllFailoverGroupLock(boolean near, int workerCnt, int shutdownCnt) throws Exception {
+        // Parameters are consumed by getConfiguration() when grids start below.
+        nearEnabled = near;
+        backups = shutdownCnt;
+
+        Collection<Integer> testKeys = generateTestKeys();
+
+        Ignite master = startGrid(MASTER);
+
+        List<Ignite> workers = new ArrayList<>(workerCnt);
+
+        for (int i = 1; i <= workerCnt; i++)
+            workers.add(startGrid("worker" + i));
+
+        info("Master: " + master.cluster().localNode().id());
+
+        List<Ignite> runningWorkers = new ArrayList<>(workerCnt);
+
+        for (int i = 1; i <= workerCnt; i++) {
+            UUID id = workers.get(i - 1).cluster().localNode().id();
+
+            info(String.format("Worker%d: %s", i, id));
+
+            runningWorkers.add(workers.get(i - 1));
+        }
+
+        try {
+            // Dummy call to fetch affinity function from remote node
+            master.cluster().mapKeyToNode(CACHE_NAME, "Dummy");
+
+            // Keys grouped by the node they map to; each group is flushed as a chunk.
+            Map<UUID, Collection<Integer>> dataChunks = new HashMap<>();
+
+            int chunkCntr = 0;
+
+            int failoverPushGap = 0;
+
+            for (Integer key : testKeys) {
+                ClusterNode mappedNode = master.cluster().mapKeyToNode(CACHE_NAME, key);
+
+                UUID nodeId = mappedNode.id();
+
+                Collection<Integer> data = dataChunks.get(nodeId);
+
+                if (data == null) {
+                    data = new ArrayList<>(DATA_CHUNK_SIZE);
+
+                    dataChunks.put(nodeId, data);
+                }
+
+                data.add(key);
+
+                if (data.size() == DATA_CHUNK_SIZE) { // time to send data
+                    chunkCntr++;
+
+                    info("Pushing data chunk: " + chunkCntr);
+
+                    submitDataChunk(master, nodeId, data);
+
+                    data = new ArrayList<>(DATA_CHUNK_SIZE);
+
+                    dataChunks.put(nodeId, data);
+
+                    // After FAIL_ON_CHUNK_NO chunks, start killing workers (up to
+                    // shutdownCnt of them), spaced FAILOVER_PUSH_GAP chunks apart.
+                    if (chunkCntr >= FAIL_ON_CHUNK_NO) {
+                        if (workerCnt - runningWorkers.size() < shutdownCnt) {
+                            if (failoverPushGap > 0)
+                                failoverPushGap--;
+                            else {
+                                Ignite victim = runningWorkers.remove(0);
+
+                                info("Shutting down node: " + victim.cluster().localNode().id());
+
+                                stopGrid(victim.name());
+
+                                // Fail next node after some jobs have been pushed.
+                                failoverPushGap = FAILOVER_PUSH_GAP;
+                            }
+                        }
+                    }
+                }
+            }
+
+            // Submit the rest of data.
+            for (Map.Entry<UUID, Collection<Integer>> entry : dataChunks.entrySet())
+                submitDataChunk(master, entry.getKey(), entry.getValue());
+
+            // Wait for queue to empty.
+            info("Waiting for empty queue...");
+
+            long seenSize = resQueue.size();
+
+            // Poll until the result queue drains; give up if its size stops changing.
+            while (true) {
+                U.sleep(10000);
+
+                if (!resQueue.isEmpty()) {
+                    long size = resQueue.size();
+
+                    if (seenSize == size) {
+                        info(">>> Failed to wait for queue to empty.");
+
+                        break;
+                    }
+
+                    seenSize = size;
+                }
+                else
+                    break;
+            }
+
+            Collection<Integer> absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys);
+
+            info(">>> Absent keys: " + absentKeys);
+
+            assertTrue(absentKeys.isEmpty());
+
+            // Actual primary cache size.
+            int primaryCacheSize = 0;
+
+            for (Ignite g : runningWorkers) {
+                info(">>>>> " + g.cache(CACHE_NAME).size());
+
+                primaryCacheSize += g.cache(CACHE_NAME).primarySize();
+            }
+
+            assertTrue(TEST_MAP_SIZE <= primaryCacheSize);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Re-groups the given keys by their (possibly changed) mapped node and resubmits
+     * them as data chunks; used after a put task fails due to worker shutdown.
+     *
+     * @param master Master grid.
+     * @param keys Keys.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void remap(final Ignite master, Iterable<Integer> keys) throws IgniteCheckedException {
+        Map<UUID, Collection<Integer>> chunks = new HashMap<>();
+
+        // Group keys by the node currently responsible for them.
+        for (Integer key : keys) {
+            UUID nodeId = master.cluster().mapKeyToNode(CACHE_NAME, key).id();
+
+            Collection<Integer> chunk = chunks.get(nodeId);
+
+            if (chunk == null)
+                chunks.put(nodeId, chunk = new ArrayList<Integer>(DATA_CHUNK_SIZE));
+
+            chunk.add(key);
+        }
+
+        // Push every regrouped chunk back into the pipeline.
+        for (Map.Entry<UUID, Collection<Integer>> e : chunks.entrySet())
+            submitDataChunk(master, e.getKey(), e.getValue());
+    }
+
+    /**
+     * Submits next data chunk as grid task. Blocks if queue is full.
+     *
+     * On task failure the chunk's keys are remapped and resubmitted, which is how this
+     * test survives worker shutdowns.
+     *
+     * @param master Master node to submit from.
+     * @param preferredNodeId Node id to execute job on.
+     * @param dataChunk Data chunk to put in cache.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void submitDataChunk(final Ignite master, UUID preferredNodeId, final Collection<Integer> dataChunk)
+        throws IgniteCheckedException {
+        // Execute only on worker nodes, never on the master.
+        ClusterGroup prj = master.cluster().forPredicate(workerNodesFilter);
+
+        IgniteCompute comp = master.compute(prj).enableAsync();
+
+        comp.execute(new GridCacheGroupLockPutTask(preferredNodeId, CACHE_NAME, optimisticTx()), dataChunk);
+
+        ComputeTaskFuture<Void> fut = comp.future();
+
+        fut.listenAsync(new CI1<IgniteFuture<Void>>() {
+            @Override public void apply(IgniteFuture<Void> f) {
+                ComputeTaskFuture taskFut = (ComputeTaskFuture)f;
+
+                boolean fail = false;
+
+                try {
+                    f.get(); //if something went wrong - we'll get exception here
+                }
+                catch (IgniteCheckedException ignore) {
+                    info("Put task failed, going to remap keys: " + dataChunk.size());
+
+                    fail = true;
+                }
+                finally {
+                    // Remove complete future from queue to allow other jobs to proceed.
+                    resQueue.remove(taskFut);
+
+                    try {
+                        if (fail)
+                            remap(master, dataChunk);
+                    }
+                    catch (IgniteCheckedException e) {
+                        info("Failed to remap task [data=" + dataChunk.size() + ", e=" + e + ']');
+                    }
+                }
+            }
+        });
+
+        try {
+            resQueue.put(fut);
+
+            // The listener may have fired before the future was enqueued; in that case
+            // remove it here so a completed future never lingers in the queue.
+            if (fut.isDone())
+                resQueue.remove(fut);
+        }
+        catch (InterruptedException ignored) {
+            info(">>>> Failed to wait for future submission: " + fut);
+
+            // Restore the interrupt flag for callers up the stack.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Tries to find keys, that are absent in cache.
+     *
+     * @param workerNode Worker node.
+     * @param keys Keys that are suspected to be absent.
+     * @return List of absent keys. If no keys are absent, the list is empty.
+     * @throws IgniteCheckedException If error occurs.
+     */
+    private Collection<Integer> findAbsentKeys(Ignite workerNode,
+        Collection<Integer> keys) throws IgniteCheckedException {
+
+        GridCache<Object, Object> cache = workerNode.cache(CACHE_NAME);
+
+        Collection<Integer> absent = new ArrayList<>(keys.size());
+
+        for (Integer key : keys)
+            if (cache.get(key) == null) // Key is absent.
+                absent.add(key);
+
+        return absent;
+    }
+
+    /**
+     * Generates a test keys collection.
+     *
+     * @return Sequential keys {@code 0 .. TEST_MAP_SIZE - 1}.
+     */
+    private Collection<Integer> generateTestKeys() {
+        Collection<Integer> keys = new ArrayList<>(TEST_MAP_SIZE);
+
+        for (int key = 0; key < TEST_MAP_SIZE; key++)
+            keys.add(key);
+
+        return keys;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Static test classpath — peer class loading is unnecessary.
+        cfg.setPeerClassLoadingEnabled(false);
+
+        cfg.setDeploymentMode(IgniteDeploymentMode.CONTINUOUS);
+
+        TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
+
+        // Large ack timeout (60s) — NOTE(review): presumably to tolerate slow discovery
+        // while nodes are stopped/started under load; confirm.
+        discoverySpi.setAckTimeout(60000);
+        discoverySpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoverySpi);
+
+        // The topology is split by grid name into two segments, tagged via the "segment"
+        // user attribute: "master*" nodes submit tasks, "worker*" nodes host the cache.
+        if (gridName.startsWith("master")) {
+            cfg.setUserAttributes(ImmutableMap.of("segment", "master"));
+
+            // Master-side SPI records failed-over jobs and only fails over onto worker nodes.
+            GridTestFailoverSpi failoverSpi = new GridTestFailoverSpi(true, (IgnitePredicate)workerNodesFilter);
+
+            // For sure.
+            failoverSpi.setMaximumFailoverAttempts(50);
+
+            cfg.setFailoverSpi(failoverSpi);
+        }
+        else if (gridName.startsWith("worker")) {
+            GridTestFailoverSpi failoverSpi = new GridTestFailoverSpi(false);
+
+            cfg.setFailoverSpi(failoverSpi);
+
+            cfg.setUserAttributes(ImmutableMap.of("segment", "worker"));
+
+            // Only worker nodes carry the partitioned transactional cache under test.
+            CacheConfiguration cacheCfg = defaultCacheConfiguration();
+            cacheCfg.setName("partitioned");
+            cacheCfg.setCacheMode(GridCacheMode.PARTITIONED);
+            cacheCfg.setStartSize(4500000);
+            cacheCfg.setBackups(backups);
+            cacheCfg.setStoreValueBytes(true);
+            cacheCfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
+            cacheCfg.setQueryIndexEnabled(false);
+            cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+            cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+            cfg.setCacheConfiguration(cacheCfg);
+        }
+        else
+            throw new IllegalStateException("Unexpected grid name: " + gridName);
+
+        return cfg;
+    }
+
+    /**
+     * Test failover SPI that remembers the job contexts of failed jobs.
+     */
+    private class GridTestFailoverSpi extends AlwaysFailoverSpi {
+        /** Job-context attribute key for the failover counter maintained by this SPI. */
+        private static final String FAILOVER_NUMBER_ATTR = "failover:number:attr";
+
+        /** Whether this SPI instance is installed on a master node. */
+        private final boolean master;
+
+        /**
+         * Contexts of jobs that were failed over.
+         * NOTE(review): plain HashSet — assumes failover callbacks are not invoked concurrently; confirm.
+         */
+        private Set<ComputeJobContext> failedOverJobs = new HashSet<>();
+
+        /** Node filter. */
+        private IgnitePredicate<? super ClusterNode>[] filter;
+
+        /**
+         * @param master Master flag.
+         * @param filter Filters.
+         */
+        @SafeVarargs
+        GridTestFailoverSpi(boolean master, IgnitePredicate<? super ClusterNode>... filter) {
+            this.master = master;
+            this.filter = filter;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) {
+            // NOTE(review): for worker-side SPIs (master == false) cp stays null and is passed
+            // to super.failover — assumes the superclass tolerates (or never receives) that; confirm.
+            List<ClusterNode> cp = null;
+            if (master) {
+                failedOverJobs.add(ctx.getJobResult().getJobContext());
+
+                // Clear failed nodes list - allow to failover on the same node.
+                ctx.getJobResult().getJobContext().setAttribute(FAILED_NODE_LIST_ATTR, null);
+
+                // Account for maximum number of failover attempts since we clear failed node list.
+                // Clearing FAILED_NODE_LIST_ATTR disables the superclass' own attempt limit,
+                // so the limit is re-implemented here via the custom counter attribute.
+                Integer failoverCnt = ctx.getJobResult().getJobContext().getAttribute(FAILOVER_NUMBER_ATTR);
+
+                if (failoverCnt == null)
+                    ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, 1);
+                else {
+                    if (failoverCnt >= getMaximumFailoverAttempts()) {
+                        info("Job failover failed because number of maximum failover attempts is exceeded " +
+                            "[failedJob=" + ctx.getJobResult().getJob() + ", maxFailoverAttempts=" +
+                            getMaximumFailoverAttempts() + ']');
+
+                        return null;
+                    }
+
+                    ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, failoverCnt + 1);
+                }
+
+                cp = new ArrayList<>(top);
+
+                // Keep collection type.
+                F.retain(cp, false, new IgnitePredicate<ClusterNode>() {
+                    @Override public boolean apply(ClusterNode node) {
+                        return F.isAll(node, filter);
+                    }
+                });
+            }
+
+            return super.failover(ctx, cp); //use cp to ensure we don't failover on failed node
+        }
+
+        /**
+         * @return Job contexts for failed over jobs.
+         */
+        public Set<ComputeJobContext> getFailedOverJobs() {
+            return failedOverJobs;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java
new file mode 100644
index 0000000..f581a0a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Abstract test for multi-node group lock tests: runs the scenarios inherited from
+ * {@link GridCacheGroupLockAbstractSelfTest} on a three-node topology.
+ */
+public abstract class GridCacheGroupLockMultiNodeAbstractSelfTest extends GridCacheGroupLockAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        // Three nodes so primary/backup/near interactions are exercised.
+        return 3;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockPutTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockPutTask.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockPutTask.java
new file mode 100644
index 0000000..4fa788b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockPutTask.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Puts all the passed data into partitioned cache in small chunks,
+ * one group-lock transaction per affected partition.
+ */
+class GridCacheGroupLockPutTask extends ComputeTaskAdapter<Collection<Integer>, Void> {
+    /** Preferred node. */
+    private final UUID preferredNode;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Optimistic transaction flag. */
+    private final boolean optimistic;
+
+    /**
+     *
+     * @param preferredNode A node that we'd prefer to take from grid.
+     * @param cacheName A name of the cache to work with.
+     * @param optimistic Optimistic transaction flag.
+     */
+    GridCacheGroupLockPutTask(UUID preferredNode, String cacheName, boolean optimistic) {
+        this.preferredNode = preferredNode;
+        this.cacheName = cacheName;
+        this.optimistic = optimistic;
+    }
+
+    /**
+     * This method is called to map or split grid task into multiple grid jobs. This is the first method that gets called
+     * when task execution starts. Maps the whole task onto a single job running on the preferred node,
+     * or on the first node of the subgrid when the preferred node is not present.
+     *
+     * @param subgrid Nodes available for this task execution. Note that order of nodes is guaranteed to be randomized by
+     *                container. This ensures that every time you simply iterate through grid nodes, the order of nodes
+     *                will be random which over time should result into all nodes being used equally.
+     * @param data    Task execution argument: keys to put into the cache. Can be {@code null}. This is the same argument
+     *                as the one passed into {@code Grid#execute(...)} methods.
+     * @return Map of grid jobs assigned to subgrid node. Unless {@link org.apache.ignite.compute.ComputeTaskContinuousMapper} is injected into task, if
+     *         {@code null} or empty map is returned, exception will be thrown.
+     * @throws IgniteCheckedException If mapping could not complete successfully. This exception will be thrown out of {@link
+     *                       org.apache.ignite.compute.ComputeTaskFuture#get()} method.
+     */
+    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable final Collection<Integer> data) throws IgniteCheckedException {
+        assert !subgrid.isEmpty();
+
+        // Give preference to wanted node. Otherwise, take the first one.
+        ClusterNode targetNode = F.find(subgrid, subgrid.get(0), new IgnitePredicate<ClusterNode>() {
+            @Override public boolean apply(ClusterNode e) {
+                return preferredNode.equals(e.id());
+            }
+        });
+
+        return Collections.singletonMap(
+            new ComputeJobAdapter() {
+                @IgniteLoggerResource
+                private IgniteLogger log;
+
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                /** Puts each partition's keys inside a single group-lock transaction. */
+                @Override public Object execute() throws IgniteCheckedException {
+                    log.info("Going to put data: " + data.size());
+
+                    GridCache<Object, Object> cache = ignite.cache(cacheName);
+
+                    assert cache != null;
+
+                    Map<Integer, T2<Integer, Collection<Integer>>> putMap = groupData(data);
+
+                    for (Map.Entry<Integer, T2<Integer, Collection<Integer>>> entry : putMap.entrySet()) {
+                        T2<Integer, Collection<Integer>> pair = entry.getValue();
+
+                        // First key seen for the partition serves as the affinity key.
+                        Object affKey = pair.get1();
+
+                        // Group lock partition. Transaction size hint is the exact number of keys.
+                        try (IgniteTx tx = cache.txStartPartition(cache.affinity().partition(affKey),
+                            optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ, 0, pair.get2().size())) {
+                            for (Integer val : pair.get2())
+                                cache.put(val, val);
+
+                            tx.commit();
+                        }
+                    }
+
+                    log.info("Finished put data: " + data.size());
+
+                    return data;
+                }
+
+                /**
+                 * Groups values by partitions.
+                 *
+                 * @param data Data to put.
+                 * @return Map from partition id to (affinity key, keys of that partition).
+                 */
+                private Map<Integer, T2<Integer, Collection<Integer>>> groupData(Iterable<Integer> data) {
+                    GridCache<Object, Object> cache = ignite.cache(cacheName);
+
+                    Map<Integer, T2<Integer, Collection<Integer>>> res = new HashMap<>();
+
+                    for (Integer val : data) {
+                        int part = cache.affinity().partition(val);
+
+                        T2<Integer, Collection<Integer>> tup = res.get(part);
+
+                        if (tup == null) {
+                            tup = new T2<Integer, Collection<Integer>>(val, new LinkedList<Integer>());
+
+                            res.put(part, tup);
+                        }
+
+                        tup.get2().add(val);
+                    }
+
+                    return res;
+                }
+            },
+            targetNode);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
+        // Job return values are not needed; success is signaled by the absence of exceptions.
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java
new file mode 100644
index 0000000..bb7d67f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Tests cache in-place modification logic with iterative value increment.
+ */
+public class GridCacheIncrementTransformTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of nodes to test on. */
+    private static final int GRID_CNT = 4;
+
+    /** Number of increment iterations. */
+    private static final int NUM_ITERS = 5000;
+
+    /**
+     * Helper for excluding stopped node from iteration logic.
+     * A {@code null} slot means that grid index is currently "claimed" by some thread
+     * (either being restarted or used for an increment).
+     */
+    private AtomicReferenceArray<Ignite> grids;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // ATOMIC partitioned cache with one backup and synchronous writes/preloading.
+        CacheConfiguration cache = new CacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setAtomicityMode(ATOMIC);
+        cache.setDistributionMode(PARTITIONED_ONLY);
+        cache.setAtomicWriteOrderMode(PRIMARY);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setBackups(1);
+        cache.setPreloadMode(SYNC);
+
+        cfg.setCacheConfiguration(cache);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(GRID_CNT);
+
+        grids = new AtomicReferenceArray<>(GRID_CNT);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            grids.set(i, grid(i));
+
+        // Seed the counter object that every iteration increments.
+        cache(0).put("key", new TestObject(0));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        grids = null;
+    }
+
+    /**
+     * Increments on a stable topology (no restarts).
+     *
+     * @throws Exception If failed.
+     */
+    public void testIncrement() throws Exception {
+        testIncrement(false);
+    }
+
+    /**
+     * Increments while a background thread continuously restarts random nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testIncrementRestart() throws Exception {
+        final AtomicBoolean stop = new AtomicBoolean();
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+
+        IgniteFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    Random rnd = new Random();
+
+                    while (!stop.get()) {
+                        int idx = -1;
+
+                        Ignite ignite = null;
+
+                        // Claim a random grid slot by swapping it to null, so the
+                        // increment loop never touches a node that is restarting.
+                        while (ignite == null) {
+                            idx = rnd.nextInt(GRID_CNT);
+
+                            ignite = grids.getAndSet(idx, null);
+                        }
+
+                        stopGrid(idx);
+
+                        // Release the slot with the restarted node instance.
+                        assert grids.compareAndSet(idx, null, startGrid(idx));
+                    }
+                }
+                catch (Exception e) {
+                    error.set(e);
+                }
+            }
+        }, 1, "restarter");
+
+        try {
+            testIncrement(true);
+
+            assertNull(error.get());
+        }
+        finally {
+            stop.set(true);
+
+            fut.get(getTestTimeout());
+        }
+    }
+
+    /**
+     * @param restarts Whether test is running with node restarts.
+     * @throws Exception If failed.
+     */
+    private void testIncrement(boolean restarts) throws Exception {
+        Random rnd = new Random();
+
+        for (int i = 0; i < NUM_ITERS; i++) {
+            int idx = -1;
+
+            Ignite ignite = null;
+
+            // Claim a live grid slot; skip slots held by the restarter thread.
+            while (ignite == null) {
+                idx = rnd.nextInt(GRID_CNT);
+
+                ignite = restarts ? grids.getAndSet(idx, null) : grid(idx);
+            }
+
+            IgniteCache<String, TestObject> cache = ignite.jcache(null);
+
+            assertNotNull(cache);
+
+            TestObject obj = cache.get("key");
+
+            // No increment may be lost or applied twice: value equals iteration number.
+            assertNotNull(obj);
+            assertEquals(i, obj.val);
+
+            while (true) {
+                try {
+                    cache.invoke("key", new Processor());
+
+                    break;
+                }
+                catch (CachePartialUpdateException ignored) {
+                    // Need to re-check if update actually succeeded.
+                    TestObject updated = cache.get("key");
+
+                    if (updated != null && updated.val == i + 1)
+                        break;
+                }
+            }
+
+            // Release the claimed slot so other threads can use this grid again.
+            if (restarts)
+                assert grids.compareAndSet(idx, null, ignite);
+        }
+    }
+
+    /** Serializable counter holder stored in the cache under "key". */
+    private static class TestObject implements Serializable {
+        /** Value. */
+        private int val;
+
+        /**
+         * @param val Value.
+         */
+        private TestObject(int val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "TestObject [val=" + val + ']';
+        }
+    }
+
+    /** Entry processor that replaces the stored object with a new one holding {@code val + 1}. */
+    private static class Processor implements EntryProcessor<String, TestObject, Void>, Serializable {
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<String, TestObject> e, Object... args) {
+            TestObject obj = e.getValue();
+
+            assert obj != null;
+
+            e.setValue(new TestObject(obj.val + 1));
+
+            return null;
+        }
+    }
+}


Mime
View raw message