ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [40/50] [abbrv] incubator-ignite git commit: # ignite-63
Date Thu, 22 Jan 2015 22:04:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeAbstractTest.java
new file mode 100644
index 0000000..741636d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeAbstractTest.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Multi-node cache test.
+ */
+public abstract class GridCacheMultiNodeAbstractTest extends GridCommonAbstractTest {
+    /** Grid 1. */
+    private static Ignite ignite1;
+
+    /** Grid 2. */
+    private static Ignite ignite2;
+
+    /** Grid 3. */
+    private static Ignite ignite3;
+
+    /** Cache 1. */
+    private static GridCache<Integer, String> cache1;
+
+    /** Cache 2. */
+    private static GridCache<Integer, String> cache2;
+
+    /** Cache 3. */
+    private static GridCache<Integer, String> cache3;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Listeners. */
+    private static Collection<CacheEventListener> lsnrs = new ArrayList<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        ignite1 = startGrid(1);
+        ignite2 = startGrid(2);
+        ignite3 = startGrid(3);
+
+        cache1 = ignite1.cache(null);
+        cache2 = ignite2.cache(null);
+        cache3 = ignite3.cache(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        cache1 = null;
+        cache2 = null;
+        cache3 = null;
+
+        ignite1 = null;
+        ignite2 = null;
+        ignite3 = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        removeListeners(ignite1);
+        removeListeners(ignite2);
+        removeListeners(ignite3);
+
+        lsnrs.clear();
+    }
+
+    /**
+     * @param ignite Grid to remove listeners from.
+     */
+    private void removeListeners(Ignite ignite) {
+        if (ignite != null)
+            for (CacheEventListener lsnr : lsnrs) {
+                assert lsnr.latch.getCount() == 0;
+
+                ignite.events().stopLocalListen(lsnr);
+            }
+    }
+
+    /**
+     *
+     * @param ignite Grid.
+     * @param lsnr Listener.
+     * @param type Event types.
+     */
+    private void addListener(Ignite ignite, CacheEventListener lsnr, int... type) {
+        if (!lsnrs.contains(lsnr))
+            lsnrs.add(lsnr);
+
+        ignite.events().localListen(lsnr, type.length == 0 ? EVTS_CACHE : type);
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testBasicPut() throws Exception {
+        checkPuts(3, ignite1);
+    }
+
+    /**
+     * @throws Exception If test fails.
+     */
+    public void testMultiNodePut() throws Exception {
+        checkPuts(1, ignite1, ignite2, ignite3);
+        checkPuts(1, ignite2, ignite1, ignite3);
+        checkPuts(1, ignite3, ignite1, ignite2);
+    }
+
+    /**
+     * @throws Exception If test fails.
+     */
+    public void testMultiValuePut() throws Exception {
+        checkPuts(1, ignite1);
+    }
+
+    /**
+     * @throws Exception If test fails.
+     */
+    public void testMultiValueMultiNodePut() throws Exception {
+        checkPuts(3, ignite1, ignite2, ignite3);
+        checkPuts(3, ignite2, ignite1, ignite3);
+        checkPuts(3, ignite3, ignite1, ignite2);
+    }
+
+    /**
+     * Checks cache puts.
+     *
+     * @param cnt Count of puts.
+     * @param ignites Grids.
+     * @throws Exception If check fails.
+     */
+    private void checkPuts(int cnt, Ignite... ignites) throws Exception {
+        CountDownLatch latch = new CountDownLatch(ignites.length * cnt);
+
+        CacheEventListener lsnr = new CacheEventListener(latch, EVT_CACHE_OBJECT_PUT);
+
+        for (Ignite ignite : ignites)
+            addListener(ignite, lsnr);
+
+        GridCache<Integer, String> cache1 = ignites[0].cache(null);
+
+        for (int i = 1; i <= cnt; i++)
+            cache1.put(i, "val" + i);
+
+        for (int i = 1; i <= cnt; i++) {
+            String v = cache1.get(i);
+
+            assert v != null;
+            assert v.equals("val" + i);
+        }
+
+        latch.await(10, SECONDS);
+
+        for (Ignite ignite : ignites) {
+            GridCache<Integer, String> cache = ignite.cache(null);
+
+            if (cache == cache1)
+                continue;
+
+            for (int i = 1; i <= cnt; i++) {
+                String v = cache.get(i);
+
+                assert v != null;
+                assert v.equals("val" + i);
+            }
+        }
+
+        assert !cache1.isLocked(1);
+        assert !cache1.isLocked(2);
+        assert !cache1.isLocked(3);
+
+        for (Ignite ignite : ignites)
+            ignite.events().stopLocalListen(lsnr);
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testLockUnlock() throws Exception {
+        CacheEventListener lockLsnr1 = new CacheEventListener(ignite1, new CountDownLatch(1), EVT_CACHE_OBJECT_LOCKED);
+
+        addListener(ignite1, lockLsnr1, EVT_CACHE_OBJECT_LOCKED);
+
+        CacheEventListener unlockLsnr = new CacheEventListener(new CountDownLatch(3), EVT_CACHE_OBJECT_UNLOCKED);
+
+        addListener(ignite1, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED);
+        addListener(ignite2, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED);
+        addListener(ignite3, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED);
+
+        IgniteFuture<Boolean> f1 = cache1.lockAsync(1, 0L);
+
+        assert f1.get(10000);
+
+        assert cache1.isLocked(1);
+        assert cache2.isLocked(1);
+        assert cache3.isLocked(1);
+
+        assert cache1.isLockedByThread(1);
+        assert !cache2.isLockedByThread(1);
+        assert !cache3.isLockedByThread(1);
+
+        info("Acquired lock for cache1.");
+
+        cache1.unlockAll(F.asList(1));
+
+        Thread.sleep(50);
+
+        unlockLsnr.latch.await(10, SECONDS);
+
+        assert !cache1.isLocked(1);
+        assert !cache2.isLocked(2);
+        assert !cache3.isLocked(3);
+
+        assert !cache1.isLockedByThread(1);
+        assert !cache2.isLockedByThread(1);
+        assert !cache3.isLockedByThread(1);
+    }
+
+    /**
+     * Concurrent test for asynchronous locks.
+     *
+     * @throws Exception If test fails.
+     */
+    @SuppressWarnings({"BusyWait"})
+    public void testConcurrentLockAsync() throws Exception {
+        CacheEventListener unlockLsnr = new CacheEventListener(new CountDownLatch(9), EVT_CACHE_OBJECT_UNLOCKED);
+
+        addListener(ignite1, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED);
+        addListener(ignite2, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED);
+        addListener(ignite3, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED);
+
+        IgniteFuture<Boolean> f1 = cache1.lockAsync(1, 0L);
+        IgniteFuture<Boolean> f2 = cache2.lockAsync(1, 0L);
+        IgniteFuture<Boolean> f3 = cache3.lockAsync(1, 0L);
+
+        boolean l1 = false;
+        boolean l2 = false;
+        boolean l3 = false;
+
+        int cnt = 0;
+
+        while (!l1 || !l2 || !l3) {
+            if (!l1 && f1.isDone()) {
+                assert cache1.isLocked(1);
+                assert cache2.isLocked(1);
+                assert cache3.isLocked(1);
+
+                assert cache1.isLockedByThread(1);
+                assert !cache2.isLockedByThread(1);
+                assert !cache3.isLockedByThread(1);
+
+                info("Acquired lock for cache1.");
+
+                cache1.unlockAll(F.asList(1));
+
+                l1 = true;
+            }
+
+            if (!l2 && f2.isDone()) {
+                assert cache1.isLocked(1);
+                assert cache2.isLocked(1);
+                assert cache3.isLocked(1);
+
+                assert !cache1.isLockedByThread(1);
+                assert cache2.isLockedByThread(1);
+                assert !cache3.isLockedByThread(1);
+
+                info("Acquired lock for cache2.");
+
+                cache2.unlockAll(F.asList(1));
+
+                l2 = true;
+            }
+
+            if (!l3 && f3.isDone()) {
+                assert cache1.isLocked(1);
+                assert cache2.isLocked(1);
+                assert cache3.isLocked(1);
+
+                assert !cache1.isLockedByThread(1);
+                assert !cache2.isLockedByThread(1);
+                assert cache3.isLockedByThread(1);
+
+                info("Acquired lock for cache3.");
+
+                cache3.unlockAll(F.asList(1));
+
+                l3 = true;
+            }
+
+            info("Acquired locks [cnt=" + ++cnt + ", l1=" + l1 + ", l2=" + l2 + ", l3=" + l3 + ']');
+
+            Thread.sleep(50);
+        }
+
+        unlockLsnr.latch.await(10, SECONDS);
+
+        assert !cache1.isLocked(1);
+        assert !cache2.isLocked(2);
+        assert !cache3.isLocked(3);
+
+        assert !cache1.isLockedByThread(1);
+        assert !cache2.isLockedByThread(1);
+        assert !cache3.isLockedByThread(1);
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testConcurrentPutAsync() throws Exception {
+        CountDownLatch latch = new CountDownLatch(9);
+
+        CacheEventListener lsnr = new CacheEventListener(latch, EVT_CACHE_OBJECT_PUT);
+
+        addListener(ignite1, lsnr);
+        addListener(ignite2, lsnr);
+        addListener(ignite3, lsnr);
+
+        IgniteFuture<String> f1 = cache1.putAsync(2, "val1");
+        IgniteFuture<String> f2 = cache2.putAsync(2, "val2");
+        IgniteFuture<String> f3 = cache3.putAsync(2, "val3");
+
+        String v1 = f1.get(20000);
+
+        info("Got v1 from future1: " + v1);
+
+        String v2 = f2.get(20000);
+
+        info("Got v2 from future2: " + v2);
+
+        String v3 = f3.get(20000);
+
+        info("Got v3 from future3: " + v3);
+
+        latch.await(60, SECONDS);
+
+        info("Woke up from latch: " + latch);
+
+        v1 = cache1.get(1);
+        v2 = cache2.get(1);
+        v3 = cache3.get(1);
+
+        info("Cache1 value for key 1: " + v1);
+        info("Cache2 value for key 1: " + v2);
+        info("Cache3 value for key 1: " + v3);
+
+        assert v1 != null;
+        assert v2 != null;
+        assert v3 != null;
+
+        assert v1.equals(v2) : "Mismatch [v1=" + v1 + ", v2=" + v2 + ']';
+        assert v1.equals(v3) : "Mismatch [v1=" + v1 + ", v3=" + v3 + ']';
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testGlobalClearAll() throws Exception {
+        cache1.put(1, "val1");
+        cache2.put(2, "val2");
+        cache3.put(3, "val3");
+
+        assert cache1.size() == 3;
+        assert cache2.size() == 3;
+        assert cache3.size() == 3;
+
+        cache1.globalClearAll();
+
+        assert cache1.isEmpty();
+        assert cache2.isEmpty();
+        assert cache3.isEmpty();
+    }
+
+    /**
+     * Event listener.
+     */
+    private class CacheEventListener implements IgnitePredicate<IgniteEvent> {
+        /** */
+        @GridToStringExclude
+        private final Ignite ignite;
+
+        /** Wait latch. */
+        @GridToStringExclude
+        private CountDownLatch latch;
+
+        /** Events to accept. */
+        private final List<Integer> evts;
+
+        /**
+         * @param latch Wait latch.
+         * @param evts Events.
+         */
+        CacheEventListener(CountDownLatch latch, Integer... evts) {
+            this.latch = latch;
+
+            ignite = null;
+
+            assert evts.length > 0;
+
+            this.evts = Arrays.asList(evts);
+        }
+
+        /**
+         * @param ignite Grid.
+         * @param latch Wait latch.
+         * @param evts Events.
+         */
+        CacheEventListener(Ignite ignite, CountDownLatch latch, Integer... evts) {
+            this.ignite = ignite;
+            this.latch = latch;
+
+            assert evts.length > 0;
+
+            this.evts = Arrays.asList(evts);
+        }
+
+        /**
+         * @param latch New latch.
+         */
+        void setLatch(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(IgniteEvent evt) {
+            info("Grid cache event [type=" + evt.type() + ", latch=" + latch.getCount() + ", evt=" + evt + ']');
+
+            if (evts.contains(evt.type()))
+                if (ignite == null || evt.node().id().equals(ignite.cluster().localNode().id())) {
+                    if (latch.getCount() > 0)
+                        latch.countDown();
+                    else
+                        info("Received unexpected cache event: " + evt);
+                }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheEventListener.class, this, "latchCount", latch.getCount(),
+                "grid", ignite != null ? ignite.name() : "N/A", "evts", evts);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java
new file mode 100644
index 0000000..9964670
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Test cases for multi-threaded tests.
+ */
+public abstract class GridCacheMultiNodeLockAbstractTest extends GridCommonAbstractTest {
+    /** Grid 1. */
+    private static Ignite ignite1;
+
+    /** Grid 2. */
+    private static Ignite ignite2;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Listeners. */
+    private static Collection<IgnitePredicate<IgniteEvent>> lsnrs = new ArrayList<>();
+
+    /**
+     *
+     */
+    protected GridCacheMultiNodeLockAbstractTest() {
+        super(false /*start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setCacheConfiguration(defaultCacheConfiguration());
+
+        return cfg;
+    }
+
+    /**
+     * @return {@code True} for partitioned caches.
+     */
+    protected boolean partitioned() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        ignite1 = startGrid(1);
+        ignite2 = startGrid(2);
+
+        startGrid(3);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        ignite1 = null;
+        ignite2 = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        removeListeners(ignite1);
+        removeListeners(ignite2);
+
+        lsnrs.clear();
+
+        for (int i = 1; i <= 3; i++) {
+            cache(i).clearAll();
+
+            assertTrue(
+                "Cache isn't empty [i=" + i + ", entries=" + ((GridKernal)grid(i)).internalCache().entries() + "]",
+                cache(i).isEmpty());
+        }
+    }
+
+    /**
+     * @param ignite Grid to remove listeners from.
+     */
+    private void removeListeners(Ignite ignite) {
+        for (IgnitePredicate<IgniteEvent> lsnr : lsnrs)
+            ignite.events().stopLocalListen(lsnr);
+    }
+
+    /**
+     * @param ignite Grid
+     * @param lsnr Listener.
+     */
+    void addListener(Ignite ignite, IgnitePredicate<IgniteEvent> lsnr) {
+        if (!lsnrs.contains(lsnr))
+            lsnrs.add(lsnr);
+
+        ignite.events().localListen(lsnr, EVTS_CACHE);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     */
+    private void checkLocked(GridCacheProjection<Integer,String> cache, Integer key) {
+        assert cache.isLocked(key);
+        assert cache.isLockedByThread(key);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     */
+    private void checkLocked(IgniteCache<Integer,String> cache, Integer key) {
+        assert cache.isLocked(key);
+        assert cache.isLockedByThread(key);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     */
+    private void checkRemoteLocked(GridCacheProjection<Integer,String> cache, Integer key) {
+        assert cache.isLocked(key);
+        assert !cache.isLockedByThread(key);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     */
+    private void checkRemoteLocked(IgniteCache<Integer,String> cache, Integer key) {
+        assert cache.isLocked(key);
+        assert !cache.isLockedByThread(key);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     */
+    @SuppressWarnings({"BusyWait"})
+    private void checkUnlocked(GridCacheProjection<Integer,String> cache, Integer key) {
+        assert !cache.isLockedByThread(key);
+
+        if (partitioned()) {
+            for(int i = 0; i < 200; i++)
+                if (cache.isLocked(key)) {
+                    try {
+                        Thread.sleep(10);
+                    }
+                    catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                else
+                    return;
+        }
+
+        assertFalse("Key locked [key=" + key + ", entries=" + entries(key) + "]", cache.isLocked(key));
+    }
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     */
+    @SuppressWarnings({"BusyWait"})
+    private void checkUnlocked(IgniteCache<Integer,String> cache, Integer key) {
+        assert !cache.isLockedByThread(key);
+
+        if (partitioned()) {
+            for(int i = 0; i < 200; i++)
+                if (cache.isLocked(key)) {
+                    try {
+                        Thread.sleep(10);
+                    }
+                    catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                else
+                    return;
+        }
+
+        assertFalse("Key locked [key=" + key + ", entries=" + entries(key) + "]", cache.isLocked(key));
+    }
+
+    /**
+     * @param cache Cache.
+     * @param keys Keys.
+     */
+    private void checkLocked(GridCacheProjection<Integer,String> cache, Iterable<Integer> keys) {
+        for (Integer key : keys) {
+            checkLocked(cache, key);
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param keys Keys.
+     */
+    private void checkLocked(IgniteCache<Integer,String> cache, Iterable<Integer> keys) {
+        for (Integer key : keys)
+            checkLocked(cache, key);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param keys Keys.
+     */
+    private void checkRemoteLocked(GridCacheProjection<Integer,String> cache, Iterable<Integer> keys) {
+        for (Integer key : keys) {
+            checkRemoteLocked(cache, key);
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param keys Keys.
+     */
+    private void checkRemoteLocked(IgniteCache<Integer,String> cache, Iterable<Integer> keys) {
+        for (Integer key : keys)
+            checkRemoteLocked(cache, key);
+    }
+
+    /**
+     *
+     * @param cache Cache.
+     * @param keys Keys.
+     */
+    private void checkUnlocked(GridCacheProjection<Integer,String> cache, Iterable<Integer> keys) {
+        for (Integer key : keys)
+            checkUnlocked(cache, key);
+    }
+
+    /**
+     *
+     * @param cache Cache.
+     * @param keys Keys.
+     */
+    private void checkUnlocked(IgniteCache<Integer,String> cache, Iterable<Integer> keys) {
+        for (Integer key : keys)
+            checkUnlocked(cache, key);
+    }
+
+    /**
+     *
+     * @throws Exception If test failed.
+     */
+    public void testBasicLock() throws Exception {
+        IgniteCache<Integer, String> cache = ignite1.jcache(null);
+
+        cache.lock(1).lock();
+
+        assert cache.isLocked(1);
+        assert cache.isLockedByThread(1);
+
+        cache.lockAll(Collections.singleton(1)).unlock();
+
+        checkUnlocked(cache, 1);
+    }
+
+    /**
+     * Entries for key.
+     *
+     * @param key Key.
+     * @return Entries.
+     */
+    private String entries(int key) {
+        if (partitioned()) {
+            GridNearCacheAdapter<Integer, String> near1 = near(1);
+            GridNearCacheAdapter<Integer, String> near2 = near(2);
+
+            GridDhtCacheAdapter<Integer, String> dht1 = dht(1);
+            GridDhtCacheAdapter<Integer, String> dht2 = dht(2);
+
+            return "Entries [ne1=" + near1.peekEx(key) + ", de1=" + dht1.peekEx(key) + ", ne2=" + near2.peekEx(key) +
+                ", de2=" + dht2.peekEx(key) + ']';
+        }
+
+        return "Entries [e1=" + ignite1.cache(null).entry(key) + ", e2=" + ignite2.cache(null).entry(key) + ']';
+    }
+
+    /**
+     * @throws Exception If test fails.
+     */
+    public void testMultiNodeLock() throws Exception {
+        IgniteCache<Integer, String> cache1 = ignite1.jcache(null);
+        IgniteCache<Integer, String> cache2 = ignite2.jcache(null);
+
+        cache1.lock(1).lock();
+
+        assert cache1.isLocked(1) : entries(1);
+        assert cache1.isLockedByThread(1);
+
+        assert cache2.isLocked(1) : entries(1);
+        assert !cache2.isLockedByThread(1);
+
+        try {
+            assert !cache2.lock(1).tryLock();
+
+            assert cache2.isLocked(1) : entries(1);
+            assert !cache2.isLockedByThread(1);
+        }
+        finally {
+            cache1.lock(1).unlock();
+
+            checkUnlocked(cache1, 1);
+        }
+
+        cache2.lock(1).lock();
+
+        assert cache2.isLocked(1) : entries(1);
+        assert cache2.isLockedByThread(1);
+
+        assert cache1.isLocked(1) : entries(1);
+        assert !cache1.isLockedByThread(1);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        addListener(ignite1, new UnlockListener(latch, 1));
+
+        try {
+            assert !cache1.lock(1).tryLock();
+
+            assert cache1.isLocked(1) : entries(1);
+            assert !cache1.isLockedByThread(1);
+        }
+        finally {
+            cache2.lockAll(Collections.singleton(1)).unlock();
+        }
+
+        latch.await();
+
+        checkUnlocked(cache1, 1);
+        checkUnlocked(cache2, 1);
+    }
+
+    /**
+     * @throws Exception If test fails.
+     */
+    public void testMultiNodeLockWithKeyLists() throws Exception {
+        IgniteCache<Integer, String> cache1 = ignite1.jcache(null);
+        IgniteCache<Integer, String> cache2 = ignite2.jcache(null);
+
+        Set<Integer> keys1 = new HashSet<>();
+        Set<Integer> keys2 = new HashSet<>();
+
+        Collections.addAll(keys1, 1, 2, 3);
+        Collections.addAll(keys2, 2, 3, 4);
+
+        cache1.lockAll(keys1).lock();
+
+        checkLocked(cache1, keys1);
+
+        try {
+            assert !cache2.lockAll(keys2).tryLock();
+
+            assert cache2.isLocked(2);
+            assert cache2.isLocked(3);
+
+            checkUnlocked(cache1, 4);
+            checkUnlocked(cache2, 4);
+
+            assert !cache2.isLockedByThread(2);
+            assert !cache2.isLockedByThread(3);
+            assert !cache2.isLockedByThread(4);
+        }
+        finally {
+            cache1.lockAll(keys1).unlock();
+        }
+
+        checkUnlocked(cache1, keys1);
+
+        checkUnlocked(cache1, keys2);
+        checkUnlocked(cache2, 4);
+
+        cache2.lockAll(keys2).lock();
+
+        CountDownLatch latch1 = new CountDownLatch(keys2.size());
+        CountDownLatch latch2 = new CountDownLatch(1);
+
+        addListener(ignite2, new UnlockListener(latch2, 1));
+        addListener(ignite1, (new UnlockListener(latch1, keys2)));
+
+        try {
+            checkLocked(cache2, keys2);
+
+            checkUnlocked(cache2, 1);
+
+            assert cache1.lock(1).tryLock();
+
+            checkLocked(cache1, 1);
+
+            checkRemoteLocked(cache1, keys2);
+
+            checkRemoteLocked(cache2, 1);
+        }
+        finally {
+            cache2.lockAll(keys2).unlock();
+
+            cache1.lockAll(Collections.singleton(1)).unlock();
+        }
+
+        latch1.await();
+        latch2.await();
+
+        checkUnlocked(cache1, keys1);
+        checkUnlocked(cache2, keys1);
+        checkUnlocked(cache1, keys2);
+        checkUnlocked(cache2, keys2);
+    }
+
+    /**
+     * @throws IgniteCheckedException If test failed.
+     */
+    public void testLockReentry() throws IgniteCheckedException {
+        IgniteCache<Integer, String> cache = ignite1.jcache(null);
+
+        cache.lock(1).lock();
+
+        try {
+            checkLocked(cache, 1);
+
+            cache.lock(1).lock();
+
+            checkLocked(cache, 1);
+
+            cache.lockAll(Collections.singleton(1)).unlock();
+
+            checkLocked(cache, 1);
+        }
+        finally {
+            cache.lockAll(Collections.singleton(1)).unlock();
+        }
+
+        checkUnlocked(cache, 1);
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testLockMultithreaded() throws Exception {
+        final IgniteCache<Integer, String> cache = ignite1.jcache(null);
+
+        final CountDownLatch l1 = new CountDownLatch(1);
+        final CountDownLatch l2 = new CountDownLatch(1);
+
+        GridTestThread t1 = new GridTestThread(new Callable<Object>() {
+            /** {@inheritDoc} */
+            @Nullable @Override public Object call() throws Exception {
+                info("Before lock for.key 1");
+
+                cache.lock(1).lock();
+
+                info("After lock for key 1");
+
+                try {
+                    checkLocked(cache, 1);
+
+                    l1.countDown();
+
+                    info("Let thread2 proceed.");
+
+                    // Reentry.
+                    cache.lock(1).lock();
+
+                    checkLocked(cache, 1);
+
+                    // Nested lock.
+                    assert cache.lock(2).tryLock();
+
+                    checkLocked(cache, 2);
+
+                    // Unlock reentry.
+                    cache.lockAll(Collections.singleton(1)).unlock();
+
+                    // Outer should still be owned.
+                    checkLocked(cache, 1);
+
+                    // Unlock in reverse order.
+                    cache.lockAll(Collections.singleton(2)).unlock();
+
+                    checkUnlocked(cache, 2);
+
+                    l2.await();
+
+                    info("Waited for latch 2");
+                }
+                finally {
+                    cache.lockAll(Collections.singleton(1)).unlock();
+
+                    info("Unlocked entry for key 1.");
+                }
+
+                assert !cache.isLockedByThread(1);
+                assert !cache.isLockedByThread(2);
+
+                return null;
+            }
+        });
+
+        GridTestThread t2 = new GridTestThread(new Callable<Object>() {
+            /** {@inheritDoc} */
+            @Nullable @Override public Object call() throws Exception {
+                info("Waiting for latch1...");
+
+                l1.await();
+
+                info("Latch1 released.");
+
+                assert !cache.lock(1).tryLock();
+
+                info("Tried to lock cache for key1");
+
+                l2.countDown();
+
+                info("Released latch2");
+
+                cache.lock(1).lock();
+
+                try {
+                    info("Locked cache for key 1");
+
+                    checkLocked(cache, 1);
+
+                    info("Checked that cache is locked for key 1");
+                }
+                finally {
+                    cache.lockAll(Collections.singleton(1)).unlock();
+
+                    info("Unlocked cache for key 1");
+                }
+
+                checkUnlocked(cache, 1);
+
+                return null;
+            }
+        });
+
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+
+        t1.checkError();
+        t2.checkError();
+
+        checkUnlocked(cache, 1);
+        checkUnlocked(cache, 2);
+    }
+
+    /**
+     * Cache unlock listener.
+     */
+    private class UnlockListener implements IgnitePredicate<IgniteEvent> {
+        /** Latch. */
+        private final CountDownLatch latch;
+
+        /** */
+        private final Collection<Integer> keys;
+
+        /**
+         * @param latch Latch.
+         * @param keys Keys.
+         */
+        UnlockListener(CountDownLatch latch, Integer... keys) {
+            this(latch, Arrays.asList(keys));
+        }
+
+        /**
+         * @param latch Latch.
+         * @param keys Keys.
+         */
+        UnlockListener(CountDownLatch latch, Collection<Integer> keys) {
+            assert latch != null;
+            assert keys != null;
+
+            this.latch = latch;
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(IgniteEvent evt) {
+            info("Received cache event: " + evt);
+
+            if (evt instanceof IgniteCacheEvent) {
+                IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt;
+
+                Integer key = cacheEvt.key();
+
+                if (keys.contains(key))
+                    if (evt.type() == EVT_CACHE_OBJECT_UNLOCKED)
+                        latch.countDown();
+            }
+
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java
new file mode 100644
index 0000000..8553728
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Base test for all multithreaded cache scenarios w/ and w/o failover.
+ */
+public class GridCacheMultithreadedFailoverAbstractTest extends GridCommonAbstractTest {
+    /** Node name prefix. */
+    private static final String NODE_PREFIX = "node";
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Put condition lock (used to suspend put threads when caches are compared). */
+    private final Lock lock = new ReentrantLock();
+
+    /** Node kill lock (used to prevent killing while cache data is compared). */
+    private final Lock killLock = new ReentrantLock();
+
+    /** Proceed put condition. */
+    private final Condition putCond = lock.newCondition();
+
+    /** Shared IP finder. */
+    private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Caches comparison start latch. */
+    private CountDownLatch cmpLatch;
+
+    /** Caches comparison request flag. */
+    private volatile boolean cmp;
+
+    /**
+     * @return Number of threads executing put.
+     */
+    protected int putThreads() {
+        return 15;
+    }
+
+    /**
+     * @return Test duration in seconds.
+     */
+    protected int duration() {
+        return 3 * 60 * 1000;
+    }
+
+    /**
+     * @return Frequency of cache data comparison.
+     */
+    protected int cacheComparisonFrequency() {
+        return 20 * 1000;
+    }
+
+    /**
+     * @return Put key range.
+     */
+    protected int keyRange() {
+        return 10_000;
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /**
+     * @return Cache atomicity mode.
+     */
+    protected GridCacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /**
+     * @return Atomic write order mode.
+     */
+    protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return null;
+    }
+
+    /**
+     * @return Distribution mode.
+     */
+    protected GridCacheDistributionMode distributionMode() {
+        return NEAR_PARTITIONED;
+    }
+
+    /**
+     * @return Number of data nodes.
+     */
+    protected int dataNodes() {
+        return 3;
+    }
+
+    /**
+     * @return Number of backups.
+     */
+    protected int backups() {
+        return 1;
+    }
+
+    /**
+     * @return Probability of killing data node.
+     */
+    protected int nodeKillProbability() {
+        return 1;
+    }
+
+    /**
+     * @return Min and max value for delay between node killings.
+     */
+    protected T2<Long, Long> killDelay() {
+        return new T2<>(5000L, 10000L);
+    }
+
+    /**
+     * @return Min and max value for delay between node killing and restarting.
+     */
+    protected T2<Long, Long> restartDelay() {
+        return new T2<>(5000L, 10000L);
+    }
+
+    /**
+     * Get node name by index.
+     *
+     * @param idx Node index.
+     * @return Node name.
+     */
+    private String nodeName(int idx) {
+        return NODE_PREFIX + idx;
+    }
+
+    /**
+     * Start up routine.
+     *
+     * @throws Exception If failed.
+     */
+    private void startUp() throws Exception {
+        assert dataNodes() > 0;
+        assert cacheMode() != null;
+        assert atomicityMode() != null;
+
+        for (int i = 0; i < dataNodes(); i++)
+            G.start(configuration(i));
+    }
+
+    /**
+     * Node configuration.
+     *
+     * @param idx Node index.
+     * @return Node configuration.
+     * @throws Exception If failed.
+     */
+    private IgniteConfiguration configuration(int idx) throws Exception {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setCacheMode(cacheMode());
+        ccfg.setAtomicityMode(atomicityMode());
+        ccfg.setPreloadMode(SYNC);
+        ccfg.setSwapEnabled(false);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setEvictionPolicy(null);
+        ccfg.setNearEvictionPolicy(null);
+
+        if (cacheMode() == PARTITIONED)
+            ccfg.setBackups(backups());
+
+        if (atomicityMode() == ATOMIC) {
+            assert atomicWriteOrderMode() != null;
+
+            ccfg.setAtomicWriteOrderMode(atomicWriteOrderMode());
+
+            if (cacheMode() == PARTITIONED)
+                ccfg.setDistributionMode(PARTITIONED_ONLY);
+        }
+        else {
+            if (cacheMode() == PARTITIONED) {
+                assert distributionMode() != null;
+
+                ccfg.setDistributionMode(distributionMode());
+            }
+        }
+
+        IgniteConfiguration cfg = getConfiguration(nodeName(idx));
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setCacheConfiguration(ccfg);
+        cfg.setRestEnabled(false);
+
+        return cfg;
+    }
+
+    /**
+     * Actual test.
+     *
+     * @throws Exception If failed.
+     */
+    public void test() throws Exception {
+        startUp();
+
+        final CyclicBarrier startBarrier = new CyclicBarrier(putThreads());
+
+        final Map<Integer, Integer> expVals = new ConcurrentHashMap<>();
+
+        final int keysPerThread = keyRange() / putThreads();
+
+        final AtomicLong ctr = new AtomicLong();
+        final AtomicLong errCtr = new AtomicLong();
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        assert keysPerThread > 0;
+
+        Thread[] putThreads = new Thread[putThreads()];
+
+        for (int i = 0; i < putThreads(); i++) {
+            final int idx = i;
+
+            Thread thread = new Thread(new Runnable() {
+                @Override public void run() {
+                    try {
+                        startBarrier.await();
+                    }
+                    catch (InterruptedException | BrokenBarrierException ignore) {
+                        return ;
+                    }
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Ignite ignite = G.ignite(nodeName(0));
+
+                    GridCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
+
+                    int startKey = keysPerThread * idx;
+                    int endKey = keysPerThread * (idx + 1);
+
+                    Map<Integer, Integer> putMap = new HashMap<>();
+                    Collection<Integer> rmvSet = new HashSet<>();
+
+                    while (!stop.get()) {
+                        for (int i = 0; i < 100; i++) {
+                            int key = rnd.nextInt(startKey, endKey);
+
+                            if (rnd.nextInt(0, 10) > 0) {
+                                putMap.put(key, i);
+
+                                rmvSet.remove(key);
+                            }
+                            else {
+                                rmvSet.add(key);
+
+                                putMap.remove(key);
+                            }
+                        }
+                        try {
+                            IgniteTx tx = atomicityMode() == TRANSACTIONAL ? cache.txStart() : null;
+
+                            try {
+                                cache.putAll(putMap);
+                                cache.removeAll(rmvSet);
+
+                                if (tx != null)
+                                    tx.commit();
+                            }
+                            finally {
+                                if (tx != null)
+                                    tx.close();
+                            }
+
+                            expVals.putAll(putMap);
+
+                            for (Integer key : rmvSet)
+                                expVals.remove(key);
+                        }
+                        catch (IgniteCheckedException e) {
+                            log.error("Cache update failed [putMap=" + putMap+ ", rmvSet=" + rmvSet + ']', e);
+
+                            errCtr.incrementAndGet();
+                        }
+
+                        ctr.addAndGet(putMap.size() + rmvSet.size());
+
+                        try {
+                            if (cmp) {
+                                cmpLatch.countDown();
+
+                                lock.lock();
+
+                                try {
+                                    while (cmp)
+                                        putCond.await();
+                                }
+                                finally {
+                                    lock.unlock();
+                                }
+                            }
+                        }
+                        catch (InterruptedException ignore) {
+                            return;
+                        }
+                    }
+                }
+            });
+
+            thread.setName("put-thread-" + i);
+
+            thread.start();
+
+            putThreads[i] = thread;
+        }
+
+        IgniteFuture<?> killNodeFut = null;
+
+        if (nodeKillProbability() > 0) {
+            killNodeFut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    while (!stop.get()) {
+                        U.sleep(ThreadLocalRandom.current().nextLong(killDelay().get1(), killDelay().get2()));
+
+                        killLock.lock();
+
+                        try {
+                            int idx = ThreadLocalRandom.current().nextInt(1, dataNodes());
+
+                            String gridName = nodeName(idx);
+
+                            if (stop.get())
+                                return null;
+
+                            log.info("Killing node [gridName=" + gridName + ']');
+
+                            stopGrid(gridName);
+
+                            U.sleep(ThreadLocalRandom.current().nextLong(restartDelay().get1(), restartDelay().get2()));
+
+                            if (stop.get())
+                                return null;
+
+                            log.info("Restarting node [gridName=" + gridName + ']');
+
+                            G.start(configuration(idx));
+                        }
+                        finally {
+                            killLock.unlock();
+                        }
+                    }
+
+                    return null;
+                }
+            });
+        }
+
+        boolean failed = false;
+
+        try {
+            long stopTime = U.currentTimeMillis() + duration();
+
+            long nextCmp = U.currentTimeMillis() + cacheComparisonFrequency();
+
+            while (!failed && U.currentTimeMillis() < stopTime) {
+                long start = System.nanoTime();
+
+                long ops = ctr.longValue();
+
+                U.sleep(1000);
+
+                long diff = ctr.longValue() - ops;
+
+                double time = (System.nanoTime() - start) / 1_000_000_000d;
+
+                long opsPerSecond = (long)(diff / time);
+
+                log.info("Operations/second: " + opsPerSecond);
+
+                if (U.currentTimeMillis() >= nextCmp) {
+                    failed = !compare(expVals);
+
+                    nextCmp = System.currentTimeMillis() + cacheComparisonFrequency();
+                }
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+
+        if (killNodeFut != null)
+            killNodeFut.get();
+
+        for (Thread thread : putThreads)
+            U.join(thread);
+
+        log.info("Test finished. Put errors: " + errCtr.get());
+
+        assertFalse("Test failed", failed);
+    }
+
+    /**
+     * Compare cache content.
+     *
+     * @param expVals Expected values.
+     * @return {@code True} if check passed successfully.
+     * @throws Exception If failed.
+     */
+    private boolean compare(Map<Integer, Integer> expVals) throws Exception {
+        cmpLatch = new CountDownLatch(putThreads());
+
+        cmp = true;
+
+        killLock.lock();
+
+        try {
+            log.info("Comparing cache content.");
+
+            if (!cmpLatch.await(60_000, TimeUnit.MILLISECONDS))
+                throw new IgniteCheckedException("Failed to suspend threads executing put.");
+
+            if (compareCaches(expVals)) {
+                log.info("Cache comparison succeeded.");
+
+                return true;
+            }
+            else {
+                log.error("Cache comparison failed.");
+
+                return false;
+            }
+        }
+        finally {
+            killLock.unlock();
+
+            lock.lock();
+
+            try {
+                cmp = false;
+
+                putCond.signalAll();
+            }
+            finally {
+                lock.unlock();
+            }
+
+            U.sleep(500);
+        }
+    }
+
+    /**
+     * Compare caches.
+     *
+     * @param expVals Expected values.
+     * @return {@code True} if check passed successfully.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope", "ConstantIfStatement"})
+    private boolean compareCaches(Map<Integer, Integer> expVals) throws Exception {
+        List<GridCache<Integer, Integer>> caches = new ArrayList<>(dataNodes());
+        List<GridDhtCacheAdapter<Integer, Integer>> dhtCaches = null;
+
+        for (int i = 0 ; i < dataNodes(); i++) {
+            GridCache<Integer, Integer> cache = G.ignite(nodeName(i)).cache(CACHE_NAME);
+
+            assert cache != null;
+
+            caches.add(cache);
+
+            GridCacheAdapter<Integer, Integer> cache0 =
+                (GridCacheAdapter<Integer, Integer>)cache.<Integer, Integer>cache();
+
+            if (cache0.isNear()) {
+                if (dhtCaches == null)
+                    dhtCaches = new ArrayList<>(dataNodes());
+
+                dhtCaches.add(((GridNearCacheAdapter<Integer, Integer>)cache0).dht());
+            }
+        }
+
+        // Compare key sets on each cache.
+        Collection<Integer> cacheKeys = new HashSet<>();
+        Collection<Integer> dhtCacheKeys = new HashSet<>();
+
+        for (int i = 0; i < dataNodes(); i++) {
+            cacheKeys.addAll(caches.get(i).keySet());
+
+            if (dhtCaches != null)
+                dhtCacheKeys.addAll(dhtCaches.get(i).keySet());
+        }
+
+        boolean failed = false;
+
+        if (!F.eq(expVals.keySet(), cacheKeys)) {
+            Collection<Integer> expOnly = new HashSet<>();
+            Collection<Integer> cacheOnly = new HashSet<>();
+
+            expOnly.addAll(expVals.keySet());
+            expOnly.removeAll(cacheKeys);
+
+            cacheOnly.addAll(cacheKeys);
+            cacheOnly.removeAll(expVals.keySet());
+
+            if (!expOnly.isEmpty())
+                log.error("Cache does not contain expected keys: " + expOnly);
+
+            if (!cacheOnly.isEmpty())
+                log.error("Cache does contain unexpected keys: " + cacheOnly);
+
+            failed = true;
+        }
+
+        if (dhtCaches != null && !F.eq(expVals.keySet(), dhtCacheKeys)) {
+            Collection<Integer> expOnly = new HashSet<>();
+            Collection<Integer> cacheOnly = new HashSet<>();
+
+            expOnly.addAll(expVals.keySet());
+            expOnly.removeAll(dhtCacheKeys);
+
+            cacheOnly.addAll(dhtCacheKeys);
+            cacheOnly.removeAll(expVals.keySet());
+
+            if (!expOnly.isEmpty())
+                log.error("DHT cache does not contain expected keys: " + expOnly);
+
+            if (!cacheOnly.isEmpty())
+                log.error("DHT cache does contain unexpected keys: " + cacheOnly);
+
+            failed = true;
+        }
+
+        // Compare values.
+        Collection<Integer> failedKeys = new HashSet<>();
+
+        for (Map.Entry<Integer, Integer> entry : expVals.entrySet()) {
+            for (int i = 0; i < dataNodes(); i++) {
+                if (!F.eq(caches.get(i).get(entry.getKey()), entry.getValue()))
+                    failedKeys.add(entry.getKey());
+            }
+        }
+
+        if (!failedKeys.isEmpty()) {
+            log.error("Cache content is incorrect for " + failedKeys.size() + " keys:");
+
+            for (Integer key : failedKeys) {
+                for (int i = 0; i < dataNodes(); i++) {
+                    GridCacheEntry<Integer, Integer> cacheEntry = caches.get(i).entry(key);
+
+                    UUID nodeId = G.ignite(nodeName(i)).cluster().localNode().id();
+
+                    if (!F.eq(cacheEntry.get(), expVals.get(key)))
+                        log.error("key=" + key + ", expVal=" + expVals.get(key) + ", cacheVal=" + cacheEntry.get() +
+                            ", primary=" + cacheEntry.primary() + ", backup=" + cacheEntry.backup() +
+                            ", nodeId=" + nodeId);
+                }
+            }
+
+            failed = true;
+        }
+
+        return !failed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java
new file mode 100644
index 0000000..09f3ce9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.IgniteState.*;
+import static org.apache.ignite.IgniteSystemProperties.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Tests for node failure in transactions.
+ */
+public abstract class GridCacheNodeFailureAbstractTest extends GridCommonAbstractTest {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Grid count. */
+    private static final int GRID_CNT = 2;
+
+    /** */
+    private static final Integer KEY = 1;
+
+    /** */
+    private static final String VALUE = "test";
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Grid instances. */
+    private static final List<Ignite> IGNITEs = new ArrayList<>();
+
+    /**
+     * Start grid by default.
+     */
+    protected GridCacheNodeFailureAbstractTest() {
+        super(false /*start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setMaxMissedHeartbeats(Integer.MAX_VALUE);
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        c.setDeploymentMode(IgniteDeploymentMode.SHARED);
+
+        return c;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            IGNITEs.add(startGrid(i));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        IGNITEs.clear();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Override protected void beforeTest() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            if (Ignition.state(IGNITEs.get(i).name()) == STOPPED) {
+                info("Restarting grid: " + i);
+
+                IGNITEs.set(i, startGrid(i));
+            }
+
+            GridCacheEntry e = cache(i).entry(KEY);
+
+            assert !e.isLocked() : "Entry is locked for grid [idx=" + i + ", entry=" + e + ']';
+        }
+    }
+
+    /**
+     * @param i Grid index.
+     * @return Cache.
+     */
+    @Override protected <K, V> GridCache<K, V> cache(int i) {
+        return IGNITEs.get(i).cache(null);
+    }
+
+    /**
+     * @throws IgniteCheckedException If test failed.
+     */
+    public void testPessimisticReadCommitted() throws Throwable {
+        // TODO:  GG-7437.
+        if (cache(0).configuration().getCacheMode() == GridCacheMode.REPLICATED)
+            return;
+
+        checkTransaction(PESSIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * @throws IgniteCheckedException If test failed.
+     */
+    public void testPessimisticRepeatableRead() throws Throwable {
+        checkTransaction(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws IgniteCheckedException If test failed.
+     */
+    public void testPessimisticSerializable() throws Throwable {
+        checkTransaction(PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    private void checkTransaction(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Throwable {
+        int idx = RAND.nextInt(GRID_CNT);
+
+        info("Grid will be stopped: " + idx);
+
+        Ignite g = grid(idx);
+
+        GridCache<Integer, String> cache = cache(idx);
+
+        IgniteTx tx = cache.txStart(concurrency, isolation);
+
+        try {
+            cache.put(KEY, VALUE);
+
+            int checkIdx = (idx + 1) % G.allGrids().size();
+
+            info("Check grid index: " + checkIdx);
+
+            IgniteFuture<?> f = waitForLocalEvent(grid(checkIdx).events(), new P1<IgniteEvent>() {
+                @Override public boolean apply(IgniteEvent e) {
+                    info("Received grid event: " + e);
+
+                    return true;
+                }
+            }, EVT_NODE_LEFT);
+
+            stopGrid(idx);
+
+            f.get();
+
+            U.sleep(getInteger(GG_TX_SALVAGE_TIMEOUT, 3000));
+
+            GridCache<Integer, String> checkCache = cache(checkIdx);
+
+            boolean locked = false;
+
+            for (int i = 0; !locked && i < 3; i++) {
+                locked = checkCache.lock(KEY, -1);
+
+                if (!locked)
+                    U.sleep(500);
+                else
+                    break;
+            }
+
+            try {
+                assert locked : "Failed to lock key on cache [idx=" + checkIdx + ", key=" + KEY + ']';
+            }
+            finally {
+                checkCache.unlockAll(F.asList(KEY));
+            }
+        }
+        catch (IgniteTxOptimisticException e) {
+            U.warn(log, "Optimistic transaction failure (will rollback) [msg=" + e.getMessage() + ", tx=" + tx + ']');
+
+            if (G.state(g.name()) == IgniteState.STARTED)
+                tx.rollback();
+
+            assert concurrency == OPTIMISTIC && isolation == SERIALIZABLE;
+        }
+        catch (Throwable e) {
+            error("Transaction failed (will rollback): " + tx, e);
+
+            if (G.state(g.name()) == IgniteState.STARTED)
+                tx.rollback();
+
+            throw e;
+        }
+    }
+
+    /**
+     * @throws Exception If check failed.
+     */
+    public void testLock() throws Exception {
+        int idx = 0;
+
+        info("Grid will be stopped: " + idx);
+
+        info("Nodes for key [id=" + grid(idx).cache(null).affinity().mapKeyToPrimaryAndBackups(KEY) +
+            ", key=" + KEY + ']');
+
+        GridCache<Integer, String> cache = cache(idx);
+
+        // TODO:  GG-7437.
+        if (cache.configuration().getCacheMode() == GridCacheMode.REPLICATED)
+            return;
+
+        cache.put(KEY, VALUE);
+
+        assert cache.lock(KEY, -1);
+
+        int checkIdx = 1;
+
+        info("Check grid index: " + checkIdx);
+
+        GridCache<Integer, String> checkCache = cache(checkIdx);
+
+        assert !checkCache.lock(KEY, -1);
+
+        GridCacheEntry e = checkCache.entry(KEY);
+
+        assert e.isLocked() : "Entry is not locked for grid [idx=" + checkIdx + ", entry=" + e + ']';
+
+        IgniteFuture<?> f = waitForLocalEvent(grid(checkIdx).events(), new P1<IgniteEvent>() {
+            @Override public boolean apply(IgniteEvent e) {
+                info("Received grid event: " + e);
+
+                return true;
+            }
+        }, EVT_NODE_LEFT);
+
+        stopGrid(idx);
+
+        f.get();
+
+        boolean locked = false;
+
+        for (int i = 0; !locked && i < 3; i++) {
+            locked = checkCache.lock(KEY, -1);
+
+            if (!locked) {
+                info("Still not locked...");
+
+                U.sleep(1500);
+            }
+            else
+                break;
+        }
+
+        assert locked : "Failed to lock entry: " + e;
+
+        checkCache.unlockAll(F.asList(KEY));
+
+        e = checkCache.entry(KEY);
+
+        assert !e.isLocked() : "Entry is locked for grid [idx=" + checkIdx + ", entry=" + e + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedAffinityFilterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedAffinityFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedAffinityFilterSelfTest.java
new file mode 100644
index 0000000..8e84354
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedAffinityFilterSelfTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+
+/**
+ * Partitioned affinity test.
+ */
+@SuppressWarnings({"PointlessArithmeticExpression"})
+public class GridCachePartitionedAffinityFilterSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Backup count. */
+    private static final int BACKUPS = 1;
+
+    /** Split attribute name. */
+    private static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";
+
+    /** Split attribute value. */
+    private String splitAttrVal;
+
+    /** Test backup filter. */
+    private static final IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter =
+        new IgniteBiPredicate<ClusterNode, ClusterNode>() {
+            @Override public boolean apply(ClusterNode primary, ClusterNode backup) {
+                assert primary != null : "primary is null";
+                assert backup != null : "backup is null";
+
+                return !F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME));
+            }
+        };
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        GridCacheConsistentHashAffinityFunction aff = new GridCacheConsistentHashAffinityFunction();
+
+        aff.setBackupFilter(backupFilter);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setBackups(BACKUPS);
+        cacheCfg.setAffinity(aff);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setPreloadMode(SYNC);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(IP_FINDER);
+
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(cacheCfg);
+        cfg.setDiscoverySpi(spi);
+
+        cfg.setUserAttributes(F.asMap(SPLIT_ATTRIBUTE_NAME, splitAttrVal));
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionDistribution() throws Exception {
+        try {
+            for (int i = 0; i < 3; i++) {
+                splitAttrVal = "A";
+
+                startGrid(2 * i);
+
+                splitAttrVal = "B";
+
+                startGrid(2 * i + 1);
+
+                awaitPartitionMapExchange();
+
+                checkPartitions();
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkPartitions() throws Exception {
+        int partCnt = GridCacheConsistentHashAffinityFunction.DFLT_PARTITION_COUNT;
+
+        GridCacheAffinityFunction aff = cacheConfiguration(grid(0).configuration(), null).getAffinity();
+
+        GridCache<Object, Object> cache = grid(0).cache(null);
+
+        for (int i = 0; i < partCnt; i++) {
+            assertEquals(i, aff.partition(i));
+
+            Collection<ClusterNode> nodes = cache.affinity().mapKeyToPrimaryAndBackups(i);
+
+            assertEquals(2, nodes.size());
+
+            ClusterNode primary = F.first(nodes);
+            ClusterNode backup = F.last(nodes);
+
+            assertFalse(F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME)));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java
new file mode 100644
index 0000000..11a8777
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jdk8.backport.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Check reloadAll() on partitioned cache.
+ */
+public abstract class GridCachePartitionedReloadAllAbstractSelfTest extends GridCommonAbstractTest {
+    /** Amount of nodes in the grid. */
+    private static final int GRID_CNT = 4;
+
+    /** Amount of backups in partitioned cache. */
+    private static final int BACKUP_CNT = 1;
+
+    /** IP finder. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Map where dummy cache store values are stored. */
+    private final Map<Integer, String> map = new ConcurrentHashMap8<>();
+
+    /** Collection of caches, one per grid node. */
+    private List<GridCache<Integer, String>> caches;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setDistributionMode(nearEnabled() ? NEAR_PARTITIONED : PARTITIONED_ONLY);
+
+        cc.setCacheMode(cacheMode());
+
+        cc.setAtomicityMode(atomicityMode());
+
+        cc.setBackups(BACKUP_CNT);
+
+        cc.setWriteSynchronizationMode(FULL_SYNC);
+
+        CacheStore store = cacheStore();
+
+        if (store != null) {
+            cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+            cc.setReadThrough(true);
+            cc.setWriteThrough(true);
+            cc.setLoadPreviousValue(true);
+        }
+        else
+            cc.setCacheStoreFactory(null);
+
+        cc.setAtomicWriteOrderMode(atomicWriteOrderMode());
+
+        c.setCacheConfiguration(cc);
+
+        return c;
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /**
+     * @return Atomicity mode.
+     */
+    protected GridCacheAtomicityMode atomicityMode() {
+        return GridCacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /**
+     * @return Write order mode for atomic cache.
+     */
+    protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return CLOCK;
+    }
+
+    /**
+     * @return {@code True} if near cache is enabled.
+     */
+    protected abstract boolean nearEnabled();
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        caches = new ArrayList<>(GRID_CNT);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            caches.add(startGrid(i).<Integer, String>cache(null));
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        map.clear();
+
+        caches = null;
+    }
+
+    /**
+     * Create new cache store.
+     *
+     * @return Write through storage emulator.
+     */
+    protected CacheStore<?, ?> cacheStore() {
+        return new CacheStoreAdapter<Integer, String>() {
+            @IgniteInstanceResource
+            private Ignite g;
+
+            @Override public void loadCache(IgniteBiInClosure<Integer, String> c,
+                Object... args) {
+                X.println("Loading all on: " + caches.indexOf(g.<Integer, String>cache(null)));
+
+                for (Map.Entry<Integer, String> e : map.entrySet())
+                    c.apply(e.getKey(), e.getValue());
+            }
+
+            @Override public String load(Integer key) {
+                X.println("Loading on: " + caches.indexOf(g.<Integer, String>cache(null)) + " key=" + key);
+
+                return map.get(key);
+            }
+
+            @Override public void write(Cache.Entry<? extends Integer, ? extends String> e) {
+                fail("Should not be called within the test.");
+            }
+
+            @Override public void delete(Object key) {
+                fail("Should not be called within the test.");
+            }
+        };
+    }
+
+    /**
+     * Ensure that reloadAll() with disabled near cache reloads data only on a node
+     * on which reloadAll() has been called.
+     *
+     * @throws Exception If test failed.
+     */
+    public void testReloadAll() throws Exception {
+        // Fill caches with values.
+        for (GridCache<Integer, String> cache : caches) {
+            Iterable<Integer> keys = primaryKeysForCache(cache, 100);
+
+            info("Values [cache=" + caches.indexOf(cache) + ", size=" + F.size(keys.iterator()) +  ", keys=" + keys + "]");
+
+            for (Integer key : keys)
+                map.put(key, "val" + key);
+        }
+
+        Collection<GridCache<Integer, String>> emptyCaches = new ArrayList<>(caches);
+
+        for (GridCache<Integer, String> cache : caches) {
+            info("Reloading cache: " + caches.indexOf(cache));
+
+            // Check data is reloaded only on the nodes on which reloadAll() has been called.
+            if (!nearEnabled()) {
+                for (GridCache<Integer, String> eCache : emptyCaches)
+                    assertEquals("Non-null values found in cache [cache=" + caches.indexOf(eCache) +
+                        ", size=" + eCache.size() + ", size=" + eCache.size() +
+                        ", entrySetSize=" + eCache.entrySet().size() + "]",
+                        0, eCache.size());
+            }
+
+            cache.reloadAll(map.keySet());
+
+            for (Integer key : map.keySet()) {
+                GridCacheEntry entry = cache.entry(key);
+
+                if (entry.primary() || entry.backup() || nearEnabled())
+                    assertEquals(map.get(key), cache.peek(key));
+                else
+                    assertNull(cache.peek(key));
+            }
+
+            emptyCaches.remove(cache);
+        }
+    }
+
+    /**
+     * Create list of keys for which the given cache is primary.
+     *
+     * @param cache Cache.
+     * @param cnt Keys count.
+     * @return Collection of keys for which given cache is primary.
+     */
+    private Iterable<Integer> primaryKeysForCache(GridCacheProjection<Integer,String> cache, int cnt) {
+        Collection<Integer> found = new ArrayList<>(cnt);
+
+        for (int i = 0; i < 10000; i++) {
+            if (cache.entry(i).primary()) {
+                found.add(i);
+
+                if (found.size() == cnt)
+                    return found;
+            }
+        }
+
+        throw new IllegalStateException("Unable to find " + cnt + " keys as primary for cache.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadEventsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadEventsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadEventsAbstractSelfTest.java
new file mode 100644
index 0000000..a89d794
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadEventsAbstractSelfTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.eventstorage.memory.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+
+/**
+ *
+ */
+public abstract class GridCachePreloadEventsAbstractSelfTest extends GridCommonAbstractTest {
+    /** */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+        disco.setIpFinder(ipFinder);
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        MemoryEventStorageSpi evtStorageSpi = new MemoryEventStorageSpi();
+
+        evtStorageSpi.setExpireCount(50_000);
+
+        cfg.setEventStorageSpi(evtStorageSpi);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    protected abstract GridCacheMode getCacheMode();
+
+    /**
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(getCacheMode());
+        cacheCfg.setPreloadMode(SYNC);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        if (getCacheMode() == PARTITIONED)
+            cacheCfg.setBackups(1);
+
+        return cacheCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testPreloadEvents() throws Exception {
+        Ignite g1 = startGrid("g1");
+
+        GridCache<Integer, String> cache = g1.cache(null);
+
+        cache.put(1, "val1");
+        cache.put(2, "val2");
+        cache.put(3, "val3");
+
+        Ignite g2 = startGrid("g2");
+
+        Collection<IgniteEvent> evts = g2.events().localQuery(F.<IgniteEvent>alwaysTrue(), EVT_CACHE_PRELOAD_OBJECT_LOADED);
+
+        checkPreloadEvents(evts, g2, U.toIntList(new int[]{1, 2, 3}));
+    }
+
+    /**
+     * @param evts Events.
+     * @param g Grid.
+     * @param keys Keys.
+     */
+    protected void checkPreloadEvents(Collection<IgniteEvent> evts, Ignite g, Collection<? extends Object> keys) {
+        assertEquals(keys.size(), evts.size());
+
+        for (IgniteEvent evt : evts) {
+            IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt;
+            assertEquals(EVT_CACHE_PRELOAD_OBJECT_LOADED, cacheEvt.type());
+            assertEquals(g.cache(null).name(), cacheEvt.cacheName());
+            assertEquals(g.cluster().localNode().id(), cacheEvt.node().id());
+            assertEquals(g.cluster().localNode().id(), cacheEvt.eventNode().id());
+            assertTrue(cacheEvt.hasNewValue());
+            assertNotNull(cacheEvt.newValue());
+            assertTrue("Unexpected key: " + cacheEvt.key(), keys.contains(cacheEvt.key()));
+        }
+    }
+}


Mime
View raw message