ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [07/50] [abbrv] incubator-ignite git commit: # ignite-63
Date Thu, 22 Jan 2015 22:04:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
new file mode 100644
index 0000000..37689f8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Test for partitioned cache {@link GridDhtPreloader preloader}: puts keys on one node, starts two more nodes,
+ * checks the keys every node owns and logs how many {@link GridDhtPartitionsSingleMessage}s each node sent.
+ * Forum example <a
+ * href="http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449">
+ * http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449</a>
+ */
+public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest {
+    /** Number of keys put into the cache before the additional nodes are started. */
+    private static final int KEY_CNT = 1000;
+
+    /** Preload mode applied to the cache configuration; initialized to SYNC and never reassigned in this class. */
+    private GridCachePreloadMode preloadMode = GridCachePreloadMode.SYNC;
+
+    /** IP finder passed to the discovery SPI of every node configured by this test instance. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} Adds a PARTITIONED, FULL_SYNC cache with one backup, TCP discovery and a recording SPI. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        assert preloadMode != null;
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(PARTITIONED);
+        cc.setWriteSynchronizationMode(FULL_SYNC);
+        cc.setPreloadMode(preloadMode);
+        cc.setAffinity(new GridCacheConsistentHashAffinityFunction(false, 521)); // Args not visible here - confirm.
+        cc.setBackups(1);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+        disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); // Presumably stops heartbeat-based node drops - confirm.
+
+        c.setDiscoverySpi(disco);
+        c.setCacheConfiguration(cc);
+
+        c.setCommunicationSpi(new TestCommunicationSpi());
+
+        return c;
+    }
+
+    /** {@inheritDoc} Stops all grids after each test. */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** Puts {@link #KEY_CNT} keys on node 0, starts nodes 1 and 2, then checks the owned keys on all three nodes.
+     * @throws Exception If failed.
+     */
+    public void testAutomaticPreload() throws Exception {
+        Ignite g0 = startGrid(0);
+
+        int cnt = KEY_CNT;
+
+        GridCache<String, Integer> c0 = g0.cache(null);
+
+        for (int i = 0; i < cnt; i++)
+            c0.put(Integer.toString(i), i);
+
+        Ignite g1 = startGrid(1);
+        Ignite g2 = startGrid(2);
+
+        U.sleep(1000); // NOTE(review): fixed pause before the checks; presumably lets preloading settle - confirm.
+
+        GridCache<String, Integer> c1 = g1.cache(null);
+        GridCache<String, Integer> c2 = g2.cache(null);
+
+        TestCommunicationSpi spi0 = (TestCommunicationSpi)g0.configuration().getCommunicationSpi();
+        TestCommunicationSpi spi1 = (TestCommunicationSpi)g1.configuration().getCommunicationSpi();
+        TestCommunicationSpi spi2 = (TestCommunicationSpi)g2.configuration().getCommunicationSpi();
+
+        info(spi0.sentMessages().size() + " " + spi1.sentMessages().size() + " " + spi2.sentMessages().size()); // Logged only, never asserted.
+
+        checkCache(c0, cnt);
+        checkCache(c1, cnt);
+        checkCache(c2, cnt);
+    }
+
+    /**
+     * Asserts that every key in {@code [0, keyCnt)} for which the cache's local node is primary or backup
+     * peeks to its expected value; keys the local node does not own are skipped.
+     * @param c Cache to check.
+     * @param keyCnt Number of sequential keys to check.
+     */
+    private void checkCache(GridCache<String, Integer> c, int keyCnt) {
+        Ignite g = c.gridProjection().ignite();
+
+        for (int i = 0; i < keyCnt; i++) {
+            String key = Integer.toString(i);
+
+            if (c.affinity().isPrimaryOrBackup(g.cluster().localNode(), key))
+                assertEquals(Integer.valueOf(i), c.peek(key));
+        }
+    }
+
+    /**
+     * Communication SPI that records every sent {@link GridDhtPartitionsSingleMessage} before delegating the send.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Recorded messages (kept in a {@link ConcurrentLinkedQueue}). */
+        private Collection<GridDhtPartitionsSingleMessage> sentMsgs = new ConcurrentLinkedQueue<>();
+
+        /** {@inheritDoc} Records the message first, then forwards it to the parent SPI. */
+        @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg)
+            throws IgniteSpiException {
+            recordMessage((GridIoMessage)msg); // NOTE(review): unchecked cast; assumes only GridIoMessage is sent.
+
+            super.sendMessage(node, msg);
+        }
+
+        /**
+         * @return Live (not copied) collection of the recorded messages.
+         */
+        public Collection<GridDhtPartitionsSingleMessage> sentMessages() {
+            return sentMsgs;
+        }
+
+        /**
+         * Records the payload of {@code msg} if it is a {@link GridDhtPartitionsSingleMessage};
+         * any other payload is ignored.
+         * @param msg I/O message whose payload is inspected.
+         */
+        private void recordMessage(GridIoMessage msg) {
+            if (msg.message() instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage partSingleMsg = (GridDhtPartitionsSingleMessage)msg.message();
+
+                sentMsgs.add(partSingleMsg);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMultiThreadedSelfTest.java
new file mode 100644
index 0000000..2cb3e6f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMultiThreadedSelfTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Multithreaded load test for the DHT preloader: nodes are started and stopped from concurrent worker threads.
+ */
+public class GridCacheDhtPreloadMultiThreadedSelfTest extends GridCommonAbstractTest {
+    /** IP finder shared by all nodes; set on the discovery SPI in {@link #getConfiguration(String)}. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Creates new test; passes {@code false} to the parent (presumably "do not start a grid up front" - confirm).
+     */
+    public GridCacheDhtPreloadMultiThreadedSelfTest() {
+        super(false);
+    }
+
+    /** Starts node "first", which stops itself once it sees an EVT_NODE_JOINED event, while node "second" joins.
+     * @throws Exception If failed.
+     */
+    public void testNodeLeaveBeforePreloadingComplete() throws Exception {
+        try {
+            final CountDownLatch startLatch = new CountDownLatch(1);
+
+            final CountDownLatch stopLatch = new CountDownLatch(1);
+
+            GridTestUtils.runMultiThreadedAsync(
+                new Callable<Object>() {
+                    @Nullable @Override public Object call() throws Exception {
+                        Ignite g = startGrid("first");
+
+                        g.events().localListen(
+                            new IgnitePredicate<IgniteEvent>() {
+                                @Override public boolean apply(IgniteEvent evt) {
+                                    stopLatch.countDown(); // A join was observed: release the "first" worker.
+
+                                    return true;
+                                }
+                            },
+                            IgniteEventType.EVT_NODE_JOINED);
+
+                        startLatch.countDown(); // Node "first" is up and listening: let "second" start.
+
+                        stopLatch.await(); // Wait until a node join has been observed.
+
+                        G.stop(g.name(), false);
+
+                        return null;
+                    }
+                },
+                1,
+                "first"
+            ); // NOTE(review): the returned future is discarded; a failure of this worker may go unnoticed - confirm.
+
+            GridTestUtils.runMultiThreaded(
+                new Callable<Object>() {
+                    @Nullable @Override public Object call() throws Exception {
+                        startLatch.await(); // Do not join before node "first" has registered its listener.
+
+                        startGrid("second");
+
+                        return null;
+                    }
+                },
+                1,
+                "second"
+            );
+        }
+        finally {
+            // Intentionally used this method. See startGrid(String, String).
+            G.stopAll(false);
+        }
+    }
+
+    /** Concurrently (worker count 4) starts nodes from "spring-multicache.xml", each named after its worker thread.
+     * @throws Exception If failed.
+     */
+    public void testConcurrentNodesStart() throws Exception {
+        try {
+            multithreadedAsync(
+                new Callable<Object>() {
+                    @Nullable @Override public Object call() throws Exception {
+                        IgniteConfiguration cfg = loadConfiguration("modules/core/src/test/config/spring-multicache.xml");
+
+                        startGrid(Thread.currentThread().getName(), cfg);
+
+                        return null;
+                    }
+                },
+                4,
+                "starter"
+            ).get();
+        }
+        finally {
+            G.stopAll(true);
+        }
+    }
+
+    /** Concurrently (worker count 6) starts a node from "example-cache.xml" and stops it again right away.
+     * @throws Exception If failed.
+     */
+    public void testConcurrentNodesStartStop() throws Exception {
+        try {
+            multithreadedAsync(
+                new Callable<Object>() {
+                    @Nullable @Override public Object call() throws Exception {
+                        String gridName = "grid-" + Thread.currentThread().getName();
+
+                        startGrid(gridName, "modules/core/src/test/config/example-cache.xml");
+
+                        // Immediately stop the grid.
+                        stopGrid(gridName);
+
+                        return null;
+                    }
+                },
+                6,
+                "tester"
+            ).get();
+        }
+        finally {
+            G.stopAll(true);
+        }
+    }
+
+    /** {@inheritDoc} Loads "spring-multicache.xml", adjusts its PARTITIONED caches and sets the shared IP finder. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = loadConfiguration("modules/core/src/test/config/spring-multicache.xml");
+
+        cfg.setGridName(gridName);
+
+        for (CacheConfiguration cCfg : cfg.getCacheConfiguration()) {
+            if (cCfg.getCacheMode() == GridCacheMode.PARTITIONED) {
+                cCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(2048, null)); // Args not visible - confirm.
+                cCfg.setBackups(1);
+            }
+        }
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); // Assumes the XML configures TcpDiscoverySpi.
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadOffHeapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadOffHeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadOffHeapSelfTest.java
new file mode 100644
index 0000000..c1a9663
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadOffHeapSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+
+import static org.apache.ignite.cache.GridCacheMemoryMode.*;
+
+/**
+ * Runs the {@link GridCacheDhtPreloadSelfTest} cases for {@link GridDhtPreloader preloader} with off-heap values.
+ */
+public class GridCacheDhtPreloadOffHeapSelfTest extends GridCacheDhtPreloadSelfTest {
+    /** {@inheritDoc} Takes the parent's configuration, disables query indexing and selects OFFHEAP_VALUES mode. */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) {
+        CacheConfiguration cacheCfg = super.cacheConfiguration(gridName);
+
+        cacheCfg.setQueryIndexEnabled(false);
+        cacheCfg.setMemoryMode(OFFHEAP_VALUES);
+        cacheCfg.setOffHeapMaxMemory(0); // NOTE(review): 0 presumably means "no off-heap size limit" - confirm.
+
+        return cacheCfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java
new file mode 100644
index 0000000..fab89af
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Put/get tests for partitioned cache {@link GridDhtPreloader preloader}: a reader node and a writer node run
+ * concurrently, once for each preload mode (ASYNC, SYNC, NONE) combined with 0, 1 and 2 backups.
+ * Forum example <a
+ * href="http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449">
+ * http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449</a>
+ */
+public class GridCacheDhtPreloadPutGetSelfTest extends GridCommonAbstractTest {
+    /** Number of keys written by the writer and read by both workers. */
+    private static final int KEY_CNT = 1000;
+
+    /** Number of read passes performed by the reader. */
+    private static final int ITER_CNT = 10;
+
+    /** Logging frequency: every {@code FREQUENCY}-th key is logged. */
+    private static final int FREQUENCY = 100;
+
+    /** Number of key backups. Each test method can set this value as required. */
+    private int backups;
+
+    /** Preload mode. Each test method sets this before calling {@link #performTest()}. */
+    private GridCachePreloadMode preloadMode;
+
+    /** IP finder passed to the discovery SPI of every node configured by this test instance. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} Adds a PARTITIONED, FULL_SYNC cache using the current preload mode and backup count. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        assert preloadMode != null; // Must have been set by the test method.
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setPreloadMode(preloadMode);
+        cacheCfg.setBackups(backups);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** Runs {@link #performTest()} with ASYNC preload mode and no backups.
+     * @throws Exception If failed.
+     */
+    public void testPutGetAsync0() throws Exception {
+        preloadMode = ASYNC;
+        backups = 0;
+
+        performTest();
+    }
+
+    /** Runs {@link #performTest()} with ASYNC preload mode and one backup.
+     * @throws Exception If failed.
+     */
+    public void testPutGetAsync1() throws Exception {
+        preloadMode = ASYNC;
+        backups = 1;
+
+        performTest();
+    }
+
+    /** Runs {@link #performTest()} with ASYNC preload mode and two backups.
+     * @throws Exception If failed.
+     */
+    public void testPutGetAsync2() throws Exception {
+        preloadMode = ASYNC;
+        backups = 2;
+
+        performTest();
+    }
+
+    /** Runs {@link #performTest()} with SYNC preload mode and no backups.
+     * @throws Exception If failed.
+     */
+    public void testPutGetSync0() throws Exception {
+        preloadMode = SYNC;
+        backups = 0;
+
+        performTest();
+    }
+
+    /** Runs {@link #performTest()} with SYNC preload mode and one backup.
+     * @throws Exception If failed.
+     */
+    public void testPutGetSync1() throws Exception {
+        preloadMode = SYNC;
+        backups = 1;
+
+        performTest();
+    }
+
+    /** Runs {@link #performTest()} with SYNC preload mode and two backups.
+     * @throws Exception If failed.
+     */
+    public void testPutGetSync2() throws Exception {
+        preloadMode = SYNC;
+        backups = 2;
+
+        performTest();
+    }
+
+    /** Runs {@link #performTest()} with NONE preload mode and no backups.
+     * @throws Exception If failed.
+     */
+    public void testPutGetNone0() throws Exception {
+        preloadMode = NONE;
+        backups = 0;
+
+        performTest();
+    }
+
+    /** Runs {@link #performTest()} with NONE preload mode and one backup.
+     * @throws Exception If failed.
+     */
+    public void testPutGetNone1() throws Exception {
+        preloadMode = NONE;
+        backups = 1;
+
+        performTest();
+    }
+
+    /** Runs {@link #performTest()} with NONE preload mode and two backups.
+     * @throws Exception If failed.
+     */
+    public void testPutGetNone2() throws Exception {
+        preloadMode = NONE;
+        backups = 2;
+
+        performTest();
+    }
+
+    /** Starts a reader node (grid 2) and, after its first read pass, a writer node (grid 1) that puts all keys.
+     * @throws Exception If test fails.
+     */
+    private void performTest() throws Exception {
+        try {
+            final CountDownLatch writeLatch = new CountDownLatch(1);
+
+            final CountDownLatch readLatch = new CountDownLatch(1);
+
+            final AtomicBoolean done = new AtomicBoolean();
+
+            IgniteFuture fut1 = GridTestUtils.runMultiThreadedAsync(
+                new Callable<Object>() {
+                    @Nullable @Override public Object call() throws Exception {
+                        Ignite g2 = startGrid(2);
+
+                        for (int i = 0; i < ITER_CNT; i++) {
+                            info("Iteration # " + i);
+
+                            GridCache<Integer, Integer> cache = g2.cache(null);
+
+                            for (int j = 0; j < KEY_CNT; j++) {
+                                GridCacheEntry<Integer, Integer> entry = cache.entry(j);
+
+                                assert entry != null;
+
+                                Integer val = entry.getValue();
+
+                                if (j % FREQUENCY == 0)
+                                    info("Read entry: " + entry.getKey() + " -> " + val);
+
+                                if (done.get()) // Values are only checked once the writer has stored all keys.
+                                    assert val != null && val == j;
+                            }
+
+                            writeLatch.countDown(); // First pass done: lets the writer start (no-op afterwards).
+
+                            readLatch.await(); // NOTE(review): released only on the writer's success path; may hang.
+                        }
+
+                        return null;
+                    }
+                },
+                1,
+                "reader"
+            );
+
+            IgniteFuture fut2 = GridTestUtils.runMultiThreadedAsync(
+                new Callable<Object>() {
+                    @Nullable @Override public Object call() throws Exception {
+                        writeLatch.await(); // Wait for the reader to finish its first pass.
+
+                        Ignite g1 = startGrid(1);
+
+                        GridCache<Integer, Integer> cache = g1.cache(null);
+
+                        for (int j = 0; j < KEY_CNT; j++) {
+                            cache.put(j, j);
+
+                            if (j % FREQUENCY == 0)
+                                info("Stored value in cache: " + j);
+                        }
+
+                        done.set(true); // From here on the reader asserts the values it reads.
+
+                        for (int j = 0; j < KEY_CNT; j++) {
+                            GridCacheEntry<Integer, Integer> entry = cache.entry(j);
+
+                            assert entry != null;
+
+                            Integer val = entry.getValue();
+
+                            if (j % FREQUENCY == 0)
+                                info("Read entry: " + entry.getKey() + " -> " + val);
+
+                            assert val != null && val == j;
+                        }
+
+                        if (backups > 0) // NOTE(review): presumably backups keep the data readable afterwards - confirm.
+                            stopGrid(1);
+
+                        readLatch.countDown(); // Lets the reader run its remaining passes.
+
+                        return null;
+                    }
+                },
+                1,
+                "writer"
+            );
+
+            fut1.get();
+            fut2.get();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
new file mode 100644
index 0000000..157a827
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.configuration.IgniteDeploymentMode.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.cache.CacheConfiguration.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Test cases for partitioned cache {@link GridDhtPreloader preloader}.
+ */
+public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest {
+    /** Flag to print preloading events; also selects the full partition-map dump in {@code top2string}. */
+    private static final boolean DEBUG = false;
+
+    /** Value returned by {@link #getTestTimeout()} (5 * 60 * 1000; presumably milliseconds - confirm). */
+    private static final long TEST_TIMEOUT = 5 * 60 * 1000;
+
+    /** Default number of backups; restored before each test. */
+    private static final int DFLT_BACKUPS = 1;
+
+    /** Default number of partitions; restored before each test. */
+    private static final int DFLT_PARTITIONS = 521;
+
+    /** Default preload batch size, taken from the statically imported {@code DFLT_PRELOAD_BATCH_SIZE}. */
+    private static final int DFLT_BATCH_SIZE = DFLT_PRELOAD_BATCH_SIZE;
+
+    /** Number of key backups. Each test method can set this value as required. */
+    private int backups = DFLT_BACKUPS;
+
+    /** Preload mode applied to the cache configuration; a test may switch it to SYNC. */
+    private GridCachePreloadMode preloadMode = ASYNC;
+
+    /** Preload batch size applied to the cache configuration. */
+    private int preloadBatchSize = DFLT_BATCH_SIZE;
+
+    /** Number of partitions passed to the affinity function. */
+    private int partitions = DFLT_PARTITIONS;
+
+    /** IP finder handed to the discovery SPI of every grid configured by this test instance. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Creates the test, passing {@code false} to the superclass for the argument the inline comment labels "start grid".
+     */
+    public GridCacheDhtPreloadSelfTest() {
+        super(false /*start grid. */);
+    }
+
+    /** {@inheritDoc} Adds a TCP discovery SPI, this test's cache configuration and CONTINUOUS deployment mode. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder); // Same finder instance for every grid configured by this test instance.
+
+        cfg.setDiscoverySpi(disco);
+        cfg.setCacheConfiguration(cacheConfiguration(gridName));
+        cfg.setDeploymentMode(CONTINUOUS);
+
+        return cfg;
+    }
+
+    /**
+     * Builds a PARTITIONED, FULL_SYNC cache configuration from the current per-test settings.
+     *
+     * @param gridName Grid name (not read by this implementation).
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration(String gridName) {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setPreloadBatchSize(preloadBatchSize);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setPreloadMode(preloadMode);
+        cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions));
+        cacheCfg.setBackups(backups);
+
+        return cacheCfg;
+    }
+
+    /** {@inheritDoc} Currently a no-op: the only statement, a log4j DEBUG reset for DHT classes, is commented out. */
+    @Override protected void beforeTestsStarted() throws Exception {
+//        resetLog4j(Level.DEBUG, true,
+//            // Categories.
+//            GridDhtPreloader.class.getPackage().getName(),
+//            GridDhtPartitionTopologyImpl.class.getName(),
+//            GridDhtLocalPartition.class.getName());
+    }
+
+    /** {@inheritDoc} Restores backups, partitions, preload mode and batch size to their defaults. */
+    @Override protected void beforeTest() throws Exception {
+        backups = DFLT_BACKUPS;
+        partitions = DFLT_PARTITIONS;
+        preloadMode = ASYNC;
+        preloadBatchSize = DFLT_BATCH_SIZE;
+    }
+
+    /** {@inheritDoc} Returns {@link #TEST_TIMEOUT}. */
+    @Override protected long getTestTimeout() {
+        return TEST_TIMEOUT;
+    }
+
+    /**
+     * @param cache Cache with integer keys.
+     * @return Affinity of the given cache, exactly as returned by {@code cache.affinity()}.
+     */
+    @SuppressWarnings({"unchecked"})
+    private GridCacheAffinity<Integer> affinity(GridCache<Integer, ?> cache) {
+        return cache.affinity();
+    }
+
+    /**
+     * @param c Cache.
+     * @return {@code True} if the cache configuration reports the SYNC preload mode.
+     */
+    private boolean isSync(GridCache<?, ?> c) {
+        return c.configuration().getPreloadMode() == SYNC;
+    }
+
+    /** Active partition transfer with SYNC preloading; grid 0 is placed last in the stop order and kept running.
+     * @throws Exception If failed.
+     */
+    public void testActivePartitionTransferSyncSameCoordinator() throws Exception {
+        preloadMode = SYNC;
+
+        checkActivePartitionTransfer(1000, 4, true, false);
+    }
+
+    /** Active partition transfer with the default (ASYNC) preloading; grid 0 is placed last and kept running.
+     * @throws Exception If failed.
+     */
+    public void testActivePartitionTransferAsyncSameCoordinator() throws Exception {
+        checkActivePartitionTransfer(1000, 4, true, false);
+    }
+
+    /** Active partition transfer with SYNC preloading; grid 0 is placed first in the stop order.
+     * @throws Exception If failed.
+     */
+    public void testActivePartitionTransferSyncChangingCoordinator() throws Exception {
+        preloadMode = SYNC;
+
+        checkActivePartitionTransfer(1000, 4, false, false);
+    }
+
+    /** Active partition transfer with the default (ASYNC) preloading; grid 0 is placed first in the stop order.
+     * @throws Exception If failed.
+     */
+    public void testActivePartitionTransferAsyncChangingCoordinator() throws Exception {
+        checkActivePartitionTransfer(1000, 4, false, false);
+    }
+
+    /** Active partition transfer with SYNC preloading; the stop order of all grids, including grid 0, is shuffled.
+     * @throws Exception If failed.
+     */
+    public void testActivePartitionTransferSyncRandomCoordinator() throws Exception {
+        preloadMode = SYNC;
+
+        checkActivePartitionTransfer(1000, 4, false, true);
+    }
+
+    /** Active partition transfer with the default (ASYNC) preloading; the stop order of all grids is shuffled.
+     * @throws Exception If failed.
+     */
+    public void testActivePartitionTransferAsyncRandomCoordinator() throws Exception {
+        checkActivePartitionTransfer(1000, 4, false, true);
+    }
+
+    /** Loads keys on grid 0, starts more grids, then stops all but one grid while checking partition states.
+     * @param keyCnt Number of keys put into the cache of grid 0.
+     * @param nodeCnt Number of additional grids started after the keys are loaded.
+     * @param sameCoord If {@code true} grid 0 is appended last (and kept running), otherwise it is placed first.
+     * @param shuffle If {@code true} the order in which grids are stopped is shuffled.
+     * @throws Exception If failed.
+     */
+    private void checkActivePartitionTransfer(int keyCnt, int nodeCnt, boolean sameCoord, boolean shuffle)
+        throws Exception {
+//        resetLog4j(Level.DEBUG, true,
+//            // Categories.
+//            GridDhtPreloader.class.getPackage().getName(),
+//            GridDhtPartitionTopologyImpl.class.getName(),
+//            GridDhtLocalPartition.class.getName());
+
+        try {
+            Ignite ignite1 = startGrid(0);
+
+            GridCache<Integer, String> cache1 = ignite1.cache(null);
+
+            putKeys(cache1, keyCnt);
+            checkKeys(cache1, keyCnt, F.asList(ignite1));
+
+            List<Ignite> ignites = new ArrayList<>(nodeCnt + 1);
+
+            startGrids(nodeCnt, 1, ignites);
+
+            // Check all nodes.
+            for (Ignite g : ignites) {
+                GridCache<Integer, String> c = g.cache(null);
+
+                checkKeys(c, keyCnt, ignites);
+            }
+
+            if (shuffle)
+                Collections.shuffle(ignites);
+
+            if (sameCoord)
+                // Add last.
+                ignites.add(ignite1);
+            else
+                // Add first.
+                ignites.add(0, ignite1);
+
+            if (!sameCoord && shuffle)
+                Collections.shuffle(ignites);
+
+            checkActiveState(ignites);
+
+            info(">>> Finished checking nodes [keyCnt=" + keyCnt + ", nodeCnt=" + nodeCnt + ", grids=" +
+                U.grids2names(ignites) + ']');
+
+            Collection<IgniteFuture<?>> futs = new LinkedList<>();
+
+            Ignite last = F.last(ignites);
+
+            for (Iterator<Ignite> it = ignites.iterator(); it.hasNext(); ) {
+                Ignite g = it.next();
+
+                if (!it.hasNext()) { // The final grid is never stopped here; it is inspected after the loop.
+                    assert last == g;
+
+                    break;
+                }
+
+                checkActiveState(ignites);
+
+                final UUID nodeId = g.cluster().localNode().id();
+
+                it.remove(); // From here on 'ignites' holds only the grids that stay running.
+
+                futs.add(waitForLocalEvent(last.events(), new P1<IgniteEvent>() {
+                    @Override public boolean apply(IgniteEvent e) {
+                        IgniteCachePreloadingEvent evt = (IgniteCachePreloadingEvent)e;
+
+                        ClusterNode node = evt.discoveryNode();
+
+                        return evt.type() == EVT_CACHE_PRELOAD_STOPPED && node.id().equals(nodeId) &&
+                            evt.discoveryEventType() == EVT_NODE_LEFT;
+                    }
+                }, EVT_CACHE_PRELOAD_STOPPED));
+
+                info("Before grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites) + ']');
+
+                stopGrid(g.name());
+
+                info("After grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites) + ']');
+
+                // Check all left nodes.
+                checkActiveState(ignites);
+            }
+
+            info("Waiting for preload futures: " + F.view(futs, F.unfinishedFutures()));
+
+            X.waitAll(futs);
+
+            info("Finished waiting for preload futures.");
+
+            assert last != null;
+
+            GridCache<Integer, String> lastCache = last.cache(null);
+
+            GridDhtCacheAdapter<Integer, String> dht = dht(lastCache);
+
+            GridCacheAffinity<Integer> aff = affinity(lastCache);
+
+            info("Finished waiting for all exchange futures...");
+
+            for (int i = 0; i < keyCnt; i++) { // NOTE(review): the partition check below does not depend on i.
+                if (aff.mapPartitionToPrimaryAndBackups(aff.partition(i)).contains(last.cluster().localNode())) {
+                    GridDhtPartitionTopology<Integer, String> top = dht.topology();
+
+                    for (GridDhtLocalPartition<Integer, String> p : top.localPartitions()) {
+                        Collection<ClusterNode> moving = top.moving(p.id());
+
+                        assert moving.isEmpty() : "Nodes with partition in moving state [part=" + p +
+                            ", moving=" + moving + ']';
+
+                        assert OWNING == p.state() : "Invalid partition state for partition [part=" + p + ", map=" +
+                            top.partitionMap(false) + ']';
+                    }
+                }
+            }
+
+            checkActiveState(ignites);
+        }
+        catch (Error | Exception e) {
+            error("Test failed.", e);
+
+            throw e;
+        } finally {
+            stopAllGrids();
+        }
+    }
+
+    /** Asserts that every partition state a grid holds for other nodes is OWNING, MOVING or RENTING and is active.
+     * @param grids Grids whose DHT partition maps are inspected.
+     */
+    private void checkActiveState(Iterable<Ignite> grids) {
+        // Check that nodes don't have non-active information about other nodes.
+        for (Ignite g : grids) {
+            GridCache<Integer, String> c = g.cache(null);
+
+            GridDhtCacheAdapter<Integer, String> dht = dht(c);
+
+            GridDhtPartitionFullMap allParts = dht.topology().partitionMap(false);
+
+            for (GridDhtPartitionMap parts : allParts.values()) {
+                if (!parts.nodeId().equals(g.cluster().localNode().id())) { // Skip the grid's own partition map.
+                    for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
+                        int p = e.getKey();
+
+                        GridDhtPartitionState state = e.getValue();
+
+                        assert state == OWNING || state == MOVING || state == RENTING :
+                            "Invalid state [grid=" + g.name() + ", part=" + p + ", state=" + state +
+                                ", parts=" + parts + ']';
+
+                        assert state.active();
+                    }
+                }
+            }
+        }
+    }
+
+    /** Node check with SYNC preloading, batch size 100 and 2 partitions: 1000 keys, one extra grid.
+     * @throws Exception If failed.
+     */
+    public void testMultiplePartitionBatchesSyncPreload() throws Exception {
+        preloadMode = SYNC;
+        preloadBatchSize = 100;
+        partitions = 2;
+
+        checkNodes(1000, 1, true, false);
+    }
+
+    /** Node check with default (ASYNC) preloading, batch size 100 and 2 partitions: 1000 keys, one extra grid.
+     * @throws Exception If failed.
+     */
+    public void testMultiplePartitionBatchesAsyncPreload() throws Exception {
+        preloadBatchSize = 100;
+        partitions = 2;
+
+        checkNodes(1000, 1, true, false);
+    }
+
+    /** Node check with SYNC preloading and 4 extra grids; grid 0 is placed last in the stop order and kept running.
+     * @throws Exception If failed.
+     */
+    public void testMultipleNodesSyncPreloadSameCoordinator() throws Exception {
+        preloadMode = SYNC;
+
+        checkNodes(1000, 4, true, false);
+    }
+
+    /** Node check with default (ASYNC) preloading and 4 extra grids; grid 0 is placed last and kept running.
+     * @throws Exception If failed.
+     */
+    public void testMultipleNodesAsyncPreloadSameCoordinator() throws Exception {
+        checkNodes(1000, 4, true, false);
+    }
+
+    /** Node check with SYNC preloading and 4 extra grids; grid 0 is placed first in the stop order.
+     * @throws Exception If failed.
+     */
+    public void testMultipleNodesSyncPreloadChangingCoordinator() throws Exception {
+        preloadMode = SYNC;
+
+        checkNodes(1000, 4, false, false);
+    }
+
+    /** Node check with default (ASYNC) preloading and 4 extra grids; grid 0 is placed first in the stop order.
+     * @throws Exception If failed.
+     */
+    public void testMultipleNodesAsyncPreloadChangingCoordinator() throws Exception {
+        checkNodes(1000, 4, false, false);
+    }
+
+
+    /** Node check with SYNC preloading and 4 extra grids; the stop order of all grids is shuffled.
+     * @throws Exception If failed.
+     */
+    public void testMultipleNodesSyncPreloadRandomCoordinator() throws Exception {
+        preloadMode = SYNC;
+
+        checkNodes(1000, 4, false, true);
+    }
+
+    /** Node check with default (ASYNC) preloading and 4 extra grids; the stop order of all grids is shuffled.
+     * @throws Exception If failed.
+     */
+    public void testMultipleNodesAsyncPreloadRandomCoordinator() throws Exception {
+        checkNodes(1000, 4, false, true);
+    }
+
+    /** Starts {@code cnt} grids with consecutive indexes and adds each one to {@code list}.
+     * @param cnt Number of grids.
+     * @param startIdx Start node index.
+     * @param list Collection that receives the started grids.
+     * @throws Exception If failed.
+     */
+    private void startGrids(int cnt, int startIdx, Collection<Ignite> list) throws Exception {
+        for (int i = 0; i < cnt; i++) {
+            final Ignite g = startGrid(startIdx++);
+
+            if (DEBUG) // Log every preload event of the new grid when debugging is switched on.
+                g.events().localListen(new IgnitePredicate<IgniteEvent>() {
+                    @Override public boolean apply(IgniteEvent evt) {
+                        info("\n>>> Preload event [grid=" + g.name() + ", evt=" + evt + ']');
+
+                        return true;
+                    }
+                }, EVTS_CACHE_PRELOAD);
+
+            list.add(g);
+        }
+    }
+
+    /** Stops each given grid by name. No caller is visible in this class.
+     * @param grids Grids to stop.
+     */
+    private void stopGrids(Iterable<Ignite> grids) {
+        for (Ignite g : grids)
+            stopGrid(g.name());
+    }
+
+    /** Loads keys on grid 0, starts more grids, then stops all but one grid, re-checking keys after each stop.
+     * @param keyCnt Number of keys put into the cache of grid 0.
+     * @param nodeCnt Number of additional grids started after the keys are loaded.
+     * @param sameCoord If {@code true} grid 0 is appended last (and kept running), otherwise it is placed first.
+     * @param shuffle If {@code true} the order in which grids are stopped is shuffled.
+     * @throws Exception If failed.
+     */
+    private void checkNodes(int keyCnt, int nodeCnt, boolean sameCoord, boolean shuffle)
+        throws Exception {
+//        resetLog4j(Level.DEBUG, true,
+//            // Categories.
+//            GridDhtPreloader.class.getPackage().getName(),
+//            GridDhtPartitionTopologyImpl.class.getName(),
+//            GridDhtLocalPartition.class.getName());
+
+        try {
+            Ignite ignite1 = startGrid(0);
+
+            GridCache<Integer, String> cache1 = ignite1.cache(null);
+
+            putKeys(cache1, keyCnt);
+            checkKeys(cache1, keyCnt, F.asList(ignite1));
+
+            List<Ignite> ignites = new ArrayList<>(nodeCnt + 1);
+
+            startGrids(nodeCnt, 1, ignites);
+
+            // Check all nodes.
+            for (Ignite g : ignites) {
+                GridCache<Integer, String> c = g.cache(null);
+
+                checkKeys(c, keyCnt, ignites);
+            }
+
+            if (shuffle)
+                Collections.shuffle(ignites);
+
+            if (sameCoord)
+                // Add last.
+                ignites.add(ignite1);
+            else
+                // Add first.
+                ignites.add(0, ignite1);
+
+            if (!sameCoord && shuffle)
+                Collections.shuffle(ignites);
+
+            info(">>> Finished checking nodes [keyCnt=" + keyCnt + ", nodeCnt=" + nodeCnt + ", grids=" +
+                U.grids2names(ignites) + ']');
+
+            Ignite last = null;
+
+            for (Iterator<Ignite> it = ignites.iterator(); it.hasNext(); ) {
+                Ignite g = it.next();
+
+                if (!it.hasNext()) { // The final grid is never stopped here; it is inspected after the loop.
+                    last = g;
+
+                    break;
+                }
+
+                final UUID nodeId = g.cluster().localNode().id();
+
+                it.remove(); // From here on 'ignites' holds only the grids that stay running.
+
+                Collection<IgniteFuture<?>> futs = new LinkedList<>();
+
+                for (Ignite gg : ignites)
+                    futs.add(waitForLocalEvent(gg.events(), new P1<IgniteEvent>() {
+                            @Override public boolean apply(IgniteEvent e) {
+                                IgniteCachePreloadingEvent evt = (IgniteCachePreloadingEvent)e;
+
+                                ClusterNode node = evt.discoveryNode();
+
+                                return evt.type() == EVT_CACHE_PRELOAD_STOPPED && node.id().equals(nodeId) &&
+                                    evt.discoveryEventType() == EVT_NODE_LEFT;
+                            }
+                        }, EVT_CACHE_PRELOAD_STOPPED));
+
+
+                info("Before grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites) + ']');
+
+                stopGrid(g.name());
+
+                info(">>> Waiting for preload futures [leftNode=" + g.name() + ", remaining=" + U.grids2names(ignites) + ']');
+
+                X.waitAll(futs);
+
+                info("After grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites) + ']');
+
+                // Check all left nodes.
+                for (Ignite gg : ignites) {
+                    GridCache<Integer, String> c = gg.cache(null);
+
+                    checkKeys(c, keyCnt, ignites);
+                }
+            }
+
+            assert last != null;
+
+            GridCache<Integer, String> lastCache = last.cache(null);
+
+            GridDhtCacheAdapter<Integer, String> dht = dht(lastCache);
+
+            GridCacheAffinity<Integer> aff = affinity(lastCache);
+
+            for (int i = 0; i < keyCnt; i++) { // NOTE(review): the partition check below does not depend on i.
+                if (aff.mapPartitionToPrimaryAndBackups(aff.partition(i)).contains(last.cluster().localNode())) {
+                    GridDhtPartitionTopology<Integer, String> top = dht.topology();
+
+                    for (GridDhtLocalPartition<Integer, String> p : top.localPartitions()) {
+                        Collection<ClusterNode> moving = top.moving(p.id());
+
+                        assert moving.isEmpty() : "Nodes with partition in moving state [part=" + p +
+                            ", moving=" + moving + ']';
+
+                        assert OWNING == p.state() : "Invalid partition state for partition [part=" + p + ", map=" +
+                            top.partitionMap(false) + ']';
+                    }
+                }
+            }
+        }
+        catch (Error | Exception e) {
+            error("Test failed.", e);
+
+            throw e;
+        } finally {
+            stopAllGrids();
+        }
+    }
+
+    /** Puts keys {@code 0..cnt-1} into the cache, each mapped to its decimal string.
+     * @param c Cache.
+     * @param cnt Key count.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void putKeys(GridCache<Integer, String> c, int cnt) throws IgniteCheckedException {
+        for (int i = 0; i < cnt; i++)
+            c.put(i, Integer.toString(i));
+    }
+
+    /** Asserts that every key whose primary or backup nodes include the cache's local node holds its decimal string.
+     * @param cache Cache to read from ({@code peek} for SYNC preloading, {@code get} otherwise).
+     * @param cnt Key count; keys {@code 0..cnt-1} are checked.
+     * @param grids Grids whose partition maps are dumped into the failure message.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void checkKeys(GridCache<Integer, String> cache, int cnt, Iterable<Ignite> grids) throws IgniteCheckedException {
+        GridCacheAffinity<Integer> aff = affinity(cache);
+
+        Ignite ignite = cache.gridProjection().ignite();
+
+        ClusterNode loc = ignite.cluster().localNode();
+
+        boolean sync = isSync(cache);
+
+        for (int i = 0; i < cnt; i++) {
+            Collection<ClusterNode> nodes = ignite.cluster().nodes();
+
+            Collection<ClusterNode> affNodes = aff.mapPartitionToPrimaryAndBackups(aff.partition(i));
+
+            assert !affNodes.isEmpty();
+
+            if (affNodes.contains(loc)) { // Keys mapped only to other nodes are not checked on this cache.
+                String val = sync ? cache.peek(i) : cache.get(i);
+
+                ClusterNode primaryNode = F.first(affNodes);
+
+                assert primaryNode != null;
+
+                boolean primary = primaryNode.equals(loc);
+
+                assert Integer.toString(i).equals(val) : "Key check failed [grid=" + ignite.name() +
+                    ", cache=" + cache.name() + ", key=" + i + ", expected=" + i + ", actual=" + val +
+                    ", part=" + aff.partition(i) + ", primary=" + primary + ", affNodes=" + U.nodeIds(affNodes) +
+                    ", locId=" + loc.id() + ", allNodes=" + U.nodeIds(nodes) + ", allParts=" + top2string(grids) + ']';
+            }
+        }
+    }
+
+    /** Collects the partition map reported by each grid's DHT topology, keyed by grid name.
+     * @param grids Grids
+     * @return String representation of all partitions and their state.
+     */
+    @SuppressWarnings( {"ConstantConditions"})
+    private String top2string(Iterable<Ignite> grids) {
+        Map<String, String> map = new HashMap<>();
+
+        for (Ignite g : grids) {
+            GridCache<Integer, String> c = g.cache(null);
+
+            GridDhtCacheAdapter<Integer, String> dht = dht(c);
+
+            GridDhtPartitionFullMap fullMap = dht.topology().partitionMap(false);
+
+            map.put(g.name(), DEBUG ? fullMap.toFullString() : fullMap.toString()); // Full dump only when DEBUG is on.
+        }
+
+        return "Grid partition maps [" + map.toString() + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadStartStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadStartStopSelfTest.java
new file mode 100644
index 0000000..15b0145
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadStartStopSelfTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.configuration.IgniteDeploymentMode.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheConfiguration.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Test cases for partitioned cache {@link GridDhtPreloader preloader}.
+ */
+public class GridCacheDhtPreloadStartStopSelfTest extends GridCommonAbstractTest {
+    /** Value returned by {@link #getTestTimeout()} (5 * 60 * 1000; presumably milliseconds - confirm). */
+    private static final long TEST_TIMEOUT = 5 * 60 * 1000;
+
+    /** Default number of backups; restored before each test. */
+    private static final int DFLT_BACKUPS = 1;
+
+    /** Default number of partitions; restored before each test. */
+    private static final int DFLT_PARTITIONS = 521;
+
+    /** Default preload batch size, taken from the statically imported {@code DFLT_PRELOAD_BATCH_SIZE}. */
+    private static final int DFLT_BATCH_SIZE = DFLT_PRELOAD_BATCH_SIZE;
+
+    /** Default cache count. */
+    private static final int DFLT_CACHE_CNT = 10;
+
+    /** Number of key backups. Each test method can set this value as required. */
+    private int backups = DFLT_BACKUPS;
+
+    /** Preload mode applied to every configured cache. */
+    private GridCachePreloadMode preloadMode = ASYNC;
+
+    /** Preload batch size applied to every configured cache. */
+    private int preloadBatchSize = DFLT_BATCH_SIZE;
+
+    /** Number of partitions passed to the affinity function of every configured cache. */
+    private int partitions = DFLT_PARTITIONS;
+
+    /** Number of caches configured on each grid. */
+    private int cacheCnt = DFLT_CACHE_CNT;
+
+    /** IP finder handed to the discovery SPI of every grid configured by this test instance. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Creates the test, passing {@code false} to the superclass for the argument the inline comment labels "start grid".
+     */
+    public GridCacheDhtPreloadStartStopSelfTest() {
+        super(false /*start grid. */);
+    }
+
+    /** {@inheritDoc} Configures {@code cacheCnt} transactional partitioned caches named "partitioned-N" plus TCP discovery. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration[] cacheCfgs = new CacheConfiguration[cacheCnt];
+
+        for (int i = 0; i < cacheCnt; i++) {
+            CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+            cacheCfg.setName("partitioned-" + i); // Every cache gets an explicit name; none is left unnamed here.
+
+            cacheCfg.setCacheMode(PARTITIONED);
+            cacheCfg.setPreloadBatchSize(preloadBatchSize);
+            cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+            cacheCfg.setPreloadMode(preloadMode);
+            cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions));
+            cacheCfg.setBackups(backups);
+            cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+            cacheCfgs[i] = cacheCfg;
+        }
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+        cfg.setCacheConfiguration(cacheCfgs);
+        cfg.setDeploymentMode(CONTINUOUS);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} Restores backups, partitions, preload mode, batch size and cache count to their defaults. */
+    @Override protected void beforeTest() throws Exception {
+        backups = DFLT_BACKUPS;
+        partitions = DFLT_PARTITIONS;
+        preloadMode = ASYNC;
+        preloadBatchSize = DFLT_BATCH_SIZE;
+        cacheCnt = DFLT_CACHE_CNT;
+    }
+
+    /** {@inheritDoc} Returns {@link #TEST_TIMEOUT}. */
+    @Override protected long getTestTimeout() {
+        return TEST_TIMEOUT;
+    }
+
+    /**
+     * @param cache Cache with integer keys.
+     * @return Affinity of the given cache, exactly as returned by {@code cache.affinity()}.
+     */
+    private GridCacheAffinity<Integer> affinity(GridCache<Integer, ?> cache) {
+        return cache.affinity();
+    }
+
+    /**
+     * @param c Cache.
+     * @return {@code True} if the cache configuration reports the SYNC (synchronous) preload mode.
+     */
+    private boolean isSync(GridCache<?, ?> c) {
+        return c.configuration().getPreloadMode() == SYNC;
+    }
+
+    /** Starts {@code cnt} grids with consecutive indexes and adds each one to {@code list}.
+     * @param cnt Number of grids.
+     * @param startIdx Start node index.
+     * @param list Collection that receives the started grids.
+     * @throws Exception If failed.
+     */
+    private void startGrids(int cnt, int startIdx, Collection<Ignite> list) throws Exception {
+        for (int i = 0; i < cnt; i++)
+            list.add(startGrid(startIdx++));
+    }
+
+    /** @param grids Grids to stop; each one is stopped by name, in iteration order. */
+    private void stopGrids(Iterable<Ignite> grids) {
+        for (Ignite g : grids)
+            stopGrid(g.name());
+    }
+
+    /** @throws Exception If failed. */
+    public void testDeadlock() throws Exception {
+        info("Testing deadlock...");
+        Collection<Ignite> ignites = new LinkedList<>();
+        int gridCnt = 3;
+        // Start three grids (indexes 1..3) and stop them again; the test passes if both steps return.
+        try {
+            startGrids(gridCnt, 1, ignites);
+            info("Grids started: " + gridCnt);
+            stopGrids(ignites);
+        } finally {
+            stopAllGrids(); // Stops whatever is still running if start or stop failed part-way.
+        }
+    }
+
+    /** Loads keys on grid 0, starts and stops extra grids, then asserts grid 0 owns all its local partitions.
+     * @param keyCnt Number of keys put into the cache of grid 0.
+     * @param nodeCnt Number of additional grids started and then stopped. No caller is visible in this class.
+     * @throws Exception If failed.
+     */
+    private void checkNodes(int keyCnt, int nodeCnt) throws Exception {
+        try {
+            Ignite g1 = startGrid(0);
+
+            GridCache<Integer, String> c1 = g1.cache(null); // NOTE(review): all caches here are named; confirm.
+
+            putKeys(c1, keyCnt);
+            checkKeys(c1, keyCnt);
+
+            Collection<Ignite> ignites = new LinkedList<>();
+
+            startGrids(nodeCnt, 1, ignites);
+
+            // Check all nodes.
+            for (Ignite g : ignites) {
+                GridCache<Integer, String> c = g.cache(null);
+
+                checkKeys(c, keyCnt);
+            }
+
+            info(">>> Finished checking nodes [keyCnt=" + keyCnt + ", nodeCnt=" + nodeCnt + ']');
+
+            stopGrids(ignites);
+
+            GridDhtCacheAdapter<Integer, String> dht = dht(c1);
+
+            info(">>> Waiting for preload futures...");
+
+            GridCachePartitionExchangeManager<Object, Object> exchMgr
+                = ((GridKernal)g1).context().cache().context().exchange();
+
+            // Wait for exchanges to complete.
+            for (IgniteFuture<?> fut : exchMgr.exchangeFutures())
+                fut.get();
+
+            GridCacheAffinity<Integer> aff = affinity(c1);
+
+            for (int i = 0; i < keyCnt; i++) { // NOTE(review): the partition check below does not depend on i.
+                if (aff.mapPartitionToPrimaryAndBackups(aff.partition(i)).contains(g1.cluster().localNode())) {
+                    GridDhtPartitionTopology<Integer, String> top = dht.topology();
+
+                    for (GridDhtLocalPartition<Integer, String> p : top.localPartitions())
+                        assertEquals("Invalid partition state for partition: " + p, OWNING, p.state());
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** Puts keys {@code 0..cnt-1} into the cache, each mapped to its decimal string.
+     * @param c Cache.
+     * @param cnt Key count.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void putKeys(GridCache<Integer, String> c, int cnt) throws IgniteCheckedException {
+        for (int i = 0; i < cnt; i++)
+            c.put(i, Integer.toString(i));
+    }
+
+    /** Asserts that every key whose primary or backup nodes include the cache's local node holds its decimal string.
+     * @param c Cache to read from ({@code peek} for SYNC preloading, {@code get} otherwise).
+     * @param cnt Key count; keys {@code 0..cnt-1} are checked.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void checkKeys(GridCache<Integer, String> c, int cnt) throws IgniteCheckedException {
+        GridCacheAffinity<Integer> aff = affinity(c);
+
+        boolean sync = isSync(c);
+
+        Ignite ignite = c.gridProjection().ignite();
+
+        for (int i = 0; i < cnt; i++) {
+            if (aff.mapPartitionToPrimaryAndBackups(aff.partition(i)).contains(ignite.cluster().localNode())) {
+                String val = sync ? c.peek(i) : c.get(i);
+
+                assertEquals("Key check failed [grid=" + ignite.name() + ", cache=" + c.name() + ", key=" + i + ']',
+                    Integer.toString(i), val);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadUnloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadUnloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadUnloadSelfTest.java
new file mode 100644
index 0000000..a9732e8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadUnloadSelfTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.junits.common.*;
+
+import static org.apache.ignite.configuration.IgniteDeploymentMode.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheConfiguration.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+
+/**
+ * Tests unloading of cache entries from nodes after additional nodes join the grid.
+ */
+@SuppressWarnings({"BusyWait"})
+public class GridCacheDhtPreloadUnloadSelfTest extends GridCommonAbstractTest {
+    /** Default number of backups. */
+    private static final int DFLT_BACKUPS = 1;
+
+    /** Default number of partitions. */
+    private static final int DFLT_PARTITIONS = 521;
+
+    /** Default preload batch size. */
+    private static final int DFLT_BATCH_SIZE = DFLT_PRELOAD_BATCH_SIZE;
+
+    /** Number of key backups. Each test method can set this value as required. */
+    private int backups = DFLT_BACKUPS;
+
+    /** Preload mode applied to the cache configuration. */
+    private GridCachePreloadMode preloadMode = ASYNC;
+
+    /** Preload batch size applied to the cache configuration. */
+    private int preloadBatchSize = DFLT_BATCH_SIZE;
+
+    /** Number of partitions passed to the affinity function. */
+    private int partitions = DFLT_PARTITIONS;
+
+    /** Lifecycle bean registered on the grid configuration when not {@code null}. */
+    private LifecycleBean lbean;
+
+    /** IP finder used by the discovery SPI. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Network timeout applied to the grid configuration (presumably milliseconds; TODO confirm). */
+    private long netTimeout = 1000;
+
+    /**
+     * Creates the test without starting a grid up front; each test method starts the grids it needs.
+     */
+    public GridCacheDhtPreloadUnloadSelfTest() {
+        // Do not start grid.
+        super(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Cache settings are driven by the per-test fields of this class.
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setPreloadBatchSize(preloadBatchSize);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setPreloadMode(preloadMode);
+        cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions));
+        cacheCfg.setBackups(backups);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(cacheCfg);
+        cfg.setDeploymentMode(CONTINUOUS);
+        cfg.setNetworkTimeout(netTimeout);
+
+        // The lifecycle bean is optional; only the lifecycle test sets it.
+        if (lbean != null)
+            cfg.setLifecycleBeans(lbean);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Restore the configuration fields that individual tests override.
+        netTimeout = 1000;
+        preloadBatchSize = DFLT_BATCH_SIZE;
+        preloadMode = ASYNC;
+        partitions = DFLT_PARTITIONS;
+        backups = DFLT_BACKUPS;
+    }
+
+    /**
+     * Populates a single node with no backups configured, starts a second node and waits until
+     * entries have been unloaded.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUnloadZeroBackupsTwoNodes() throws Exception {
+        preloadMode = SYNC;
+        backups = 0;
+        netTimeout = 500;
+
+        final int keyCnt = 1000;
+        final int nodeCnt = 2;
+        final long timeout = 3000;
+
+        try {
+            startGrid(0);
+
+            populate(grid(0).<Integer, String>cache(null), keyCnt);
+
+            int idx = 1;
+
+            while (idx < nodeCnt)
+                startGrid(idx++);
+
+            waitForUnload(nodeCnt, keyCnt, timeout);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Populates a single node with one backup configured, starts a second node and verifies that
+     * both nodes still hold all entries after a fixed delay.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUnloadOneBackupTwoNodes() throws Exception {
+        preloadMode = SYNC;
+        backups = 1;
+        netTimeout = 500;
+
+        try {
+            startGrid(0);
+
+            int cnt = 1000;
+
+            populate(grid(0).<Integer, String>cache(null), cnt);
+
+            int gridCnt = 2;
+
+            for (int i = 1; i < gridCnt; i++)
+                startGrid(i);
+
+            long wait = 2000;
+
+            info("Sleeping for " + wait + "ms");
+
+            // Unfortunately there is no other way but sleep.
+            Thread.sleep(wait);
+
+            // Log all sizes first so that they are visible even if an assertion below fails.
+            for (int i = 0; i < gridCnt; i++)
+                info("Grid size [i=" + i + ", size=" + grid(i).cache(null).size() + ']');
+
+            for (int i = 0; i < gridCnt; i++) {
+                GridCache<Integer, String> c = grid(i).cache(null);
+
+                // Nothing should be unloaded since nodes are backing up each other.
+                assert c.size() == cnt :
+                    "Unexpected cache size [grid=" + i + ", expected=" + cnt + ", actual=" + c.size() + ']';
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Polls all grids until every cache holds fewer than {@code cnt} entries or the timeout elapses,
+     * then asserts that every cache holds fewer than {@code cnt} entries.
+     *
+     * @param gridCnt Grid count.
+     * @param cnt Entry count that every cache size must drop below.
+     * @param wait Maximum time to wait, in milliseconds.
+     * @throws InterruptedException If interrupted.
+     */
+    private void waitForUnload(long gridCnt, long cnt, long wait) throws InterruptedException {
+        info("Waiting for preloading to complete for " + wait + "ms...");
+
+        long endTime = System.currentTimeMillis() + wait;
+
+        while (System.currentTimeMillis() < endTime) {
+            boolean unloaded = true;
+
+            for (int i = 0; i < gridCnt; i++) {
+                GridCache<Integer, String> c = grid(i).cache(null);
+
+                if (c.size() >= cnt) {
+                    unloaded = false;
+
+                    // One grid that is still full is enough to keep waiting.
+                    break;
+                }
+            }
+
+            if (unloaded)
+                break;
+
+            Thread.sleep(500);
+        }
+
+        // Log all sizes first so that they are visible even if an assertion below fails.
+        for (int i = 0; i < gridCnt; i++)
+            info("Grid size [i=" + i + ", size=" + grid(i).cache(null).size() + ']');
+
+        for (int i = 0; i < gridCnt; i++) {
+            GridCache<Integer, String> c = grid(i).cache(null);
+
+            assert c.size() < cnt :
+                "Entries were not unloaded [grid=" + i + ", size=" + c.size() + ", cnt=" + cnt + ']';
+        }
+    }
+
+    /**
+     * Populates a single node with one backup configured, starts two more nodes and waits until
+     * entries have been unloaded.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUnloadOneBackupThreeNodes() throws Exception {
+        preloadMode = SYNC;
+        backups = 1;
+        netTimeout = 500;
+        partitions = 23;
+
+        try {
+            startGrid(0);
+
+            int cnt = 1000;
+
+            populate(grid(0).<Integer, String>cache(null), cnt);
+
+            int gridCnt = 3;
+
+            for (int i = 1; i < gridCnt; i++) {
+                startGrid(i);
+
+                // Log the size of every grid started so far; the printed index is the grid being reported.
+                for (int j = 0; j <= i; j++)
+                    info("Grid size [i=" + j + ", size=" + grid(j).cache(null).size() + ']');
+            }
+
+            long wait = 3000;
+
+            waitForUnload(gridCnt, cnt, wait);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts three nodes with one backup configured. A lifecycle bean populates the cache after grid
+     * start when its {@code putxIfAbsent} on key {@code -1} returns {@code true}. The test then waits
+     * until entries have been unloaded.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUnloadOneBackThreeNodesWithLifeCycleBean() throws Exception {
+        preloadMode = SYNC;
+        backups = 1;
+
+        try {
+            final int cnt = 1000;
+
+            lbean = new LifecycleBean() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException {
+                    if (evt == LifecycleEventType.AFTER_GRID_START) {
+                        GridCache<Integer, String> c = ignite.cache(null);
+
+                        // Key -1 acts as a marker so that population happens only once.
+                        if (c.putxIfAbsent(-1, "true")) {
+                            populate(ignite.<Integer, String>cache(null), cnt);
+
+                            info(">>> POPULATED GRID <<<");
+                        }
+                    }
+                }
+            };
+
+            int gridCnt = 3;
+
+            for (int i = 0; i < gridCnt; i++) {
+                startGrid(i);
+
+                // Log the size of every grid started so far, including the one just started;
+                // the printed index is the grid being reported.
+                for (int j = 0; j <= i; j++)
+                    info("Grid size [i=" + j + ", size=" + grid(j).cache(null).size() + ']');
+            }
+
+            long wait = 3000;
+
+            waitForUnload(gridCnt, cnt, wait);
+        }
+        finally {
+            lbean = null;
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Puts keys from {@code 0} (inclusive) to {@code cnt} (exclusive) into the cache, each with a
+     * freshly built value obtained from {@code value(1024)}.
+     *
+     * @param c Cache to fill.
+     * @param cnt Number of keys to put.
+     * @throws IgniteCheckedException If a put fails.
+     */
+    private void populate(GridCache<Integer, String> c, int cnt) throws IgniteCheckedException {
+        int key = 0;
+
+        while (key < cnt) {
+            c.put(key, value(1024));
+
+            key++;
+        }
+    }
+
+    /**
+     * Builds a test value made of lowercase letters cycling through {@code 'a'..'z'}.
+     *
+     * @param size Size hint; the returned string contains {@code size / 3} characters
+     *      (empty if that is not positive).
+     * @return Value.
+     */
+    private String value(int size) {
+        int len = Math.max(0, size / 3);
+
+        StringBuilder b = new StringBuilder(len);
+
+        // The cast is required: 'a' + int is an int, which append() would write as decimal digits.
+        for (int i = 0; i < len; i++)
+            b.append((char)('a' + (i % 26)));
+
+        return b.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtRemoveFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtRemoveFailureTest.java
new file mode 100644
index 0000000..d501d96
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtRemoveFailureTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Tests that removes are not lost when topology changes, using a {@code PARTITIONED} cache in
+ * {@code PARTITIONED_ONLY} distribution mode with one backup.
+ */
+public class GridCacheDhtRemoveFailureTest extends GridCacheAbstractRemoveFailureTest {
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setBackups(1);
+        ccfg.setDistributionMode(PARTITIONED_ONLY);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
new file mode 100644
index 0000000..4da11a9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.lang.reflect.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Utility methods for dht preloader testing.
+ */
+public class GridCacheDhtTestUtils {
+    /**
+     * Private constructor: this class only hosts static helpers and is not meant to be instantiated.
+     */
+    private GridCacheDhtTestUtils() {
+        // No-op.
+    }
+
+    /**
+     * Returns the DHT cache behind the given projection. The projection's underlying cache is cast to
+     * {@link GridNearCacheAdapter}, so a {@link ClassCastException} is thrown if it is of another type.
+     *
+     * @param cache Cache.
+     * @return Dht cache.
+     */
+    static <K, V> GridDhtCacheAdapter<K, V> dht(GridCacheProjection<K, V> cache) {
+        GridNearCacheAdapter<K, V> near = (GridNearCacheAdapter<K, V>)cache.<K, V>cache();
+
+        return near.dht();
+    }
+
+    /**
+     * Puts keys {@code 0} (inclusive) to {@code keyCnt} (exclusive) with values {@code "value" + key}
+     * directly into the cache's internal map, issues a preloader request for each key and calls
+     * {@code own()} on the local partition of each key.
+     * <p>
+     * The internal map is read reflectively from the declared field {@code map} of
+     * {@link GridCacheAdapter}.
+     *
+     * @param dht Cache.
+     * @param keyCnt Number of test keys to put into cache.
+     * @throws IgniteCheckedException If failed to prepare.
+     */
+    @SuppressWarnings({"UnusedAssignment", "unchecked"})
+    static void prepareKeys(GridDhtCache<Integer, String> dht, int keyCnt) throws IgniteCheckedException {
+        GridCacheAffinityFunction aff = dht.context().config().getAffinity();
+
+        GridCacheConcurrentMap<Integer, String> cacheMap;
+
+        try {
+            // Reach into the adapter's declared "map" field, bypassing the public cache API.
+            Field field = GridCacheAdapter.class.getDeclaredField("map");
+
+            field.setAccessible(true);
+
+            cacheMap = (GridCacheConcurrentMap<Integer, String>)field.get(dht);
+        }
+        catch (Exception e) {
+            // Any failure (reflective or cast) is reported as a checked exception with the cause preserved.
+            throw new IgniteCheckedException("Failed to get cache map.", e);
+        }
+
+        GridDhtPartitionTopology<Integer,String> top = dht.topology();
+
+        for (int i = 0; i < keyCnt; i++) {
+            // NOTE(review): -1 and 0 are presumably topology version and TTL - confirm against putEntry().
+            cacheMap.putEntry(-1, i, "value" + i, 0);
+
+            // NOTE(review): meaning of the -1 argument is not visible here - confirm against the preloader API.
+            dht.preloader().request(Collections.singleton(i), -1);
+
+            // Second argument 'false' presumably means "do not create the partition" - TODO confirm.
+            GridDhtLocalPartition part = top.localPartition(aff.partition(i), false);
+
+            assert part != null;
+
+            part.own();
+        }
+    }
+
+    /**
+     * Prints the number of backups and the number of partitions configured for the cache to standard
+     * output. The configured affinity function is cast to
+     * {@link GridCacheConsistentHashAffinityFunction}.
+     *
+     * @param cache Dht cache.
+     */
+    static void printAffinityInfo(GridCache<?, ?> cache) {
+        GridCacheConsistentHashAffinityFunction hashAff =
+            (GridCacheConsistentHashAffinityFunction)cache.configuration().getAffinity();
+
+        int backups = cache.configuration().getBackups();
+        int parts = hashAff.getPartitions();
+
+        System.out.println("Affinity info.");
+        System.out.println("----------------------------------");
+        System.out.println("Number of key backups: " + backups);
+        System.out.println("Number of cache partitions: " + parts);
+    }
+
+    /**
+     * Prints the local node's view of the DHT topology to standard output: the partitions the
+     * topology reports for the local node, every local partition together with the local node's role
+     * for it and the keys it holds, and the partition list of every node in the partition map.
+     *
+     * @param dht Dht cache.
+     * @param idx Cache index (used only in the printed header).
+     */
+    static void printDhtTopology(GridDhtCache<Integer, String> dht, int idx) {
+        final GridCacheAffinity<Integer> aff = dht.affinity();
+
+        Ignite ignite = dht.context().grid();
+        ClusterNode locNode = ignite.cluster().localNode();
+
+        GridDhtPartitionTopology<Integer, String> top = dht.topology();
+
+        System.out.println("\nTopology of cache #" + idx + " (" + locNode.id() + ")" + ":");
+        System.out.println("----------------------------------");
+
+        List<Integer> affParts = new ArrayList<>();
+
+        GridDhtPartitionMap map = top.partitions(locNode.id());
+
+        if (map != null)
+            for (int p : map.keySet())
+                affParts.add(p);
+
+        Collections.sort(affParts);
+
+        System.out.println("Affinity partitions: " + affParts + "\n");
+
+        List<GridDhtLocalPartition> locals = new ArrayList<GridDhtLocalPartition>(top.localPartitions());
+
+        Collections.sort(locals);
+
+        for (final GridDhtLocalPartition part : locals) {
+            // part.id() is a partition number, not a key, so it must be mapped with the partition-based lookup.
+            Collection<ClusterNode> partNodes = aff.mapPartitionToPrimaryAndBackups(part.id());
+
+            String ownStr = !partNodes.contains(dht.context().localNode()) ? "NOT AN OWNER" :
+                F.eqNodes(CU.primary(partNodes), locNode) ? "PRIMARY" : "BACKUP";
+
+            // Read-only view of the cache keys that belong to this partition.
+            Collection<Integer> keys = F.viewReadOnly(dht.keySet(), F.<Integer>identity(), new P1<Integer>() {
+                @Override public boolean apply(Integer k) {
+                    return aff.partition(k) == part.id();
+                }
+            });
+
+            System.out.println("Local partition: [" + part + "], [owning=" + ownStr + ", keyCnt=" + keys.size() +
+                ", keys=" + keys + "]");
+        }
+
+        System.out.println("\nNode map:");
+
+        for (Map.Entry<UUID, GridDhtPartitionMap> e : top.partitionMap(false).entrySet()) {
+            List<Integer> list = new ArrayList<>(e.getValue().keySet());
+
+            Collections.sort(list);
+
+            System.out.println("[node=" + e.getKey() + ", parts=" + list + "]");
+        }
+
+        System.out.println("");
+    }
+
+    /**
+     * Checks that the local partitions of a partitioned cache agree with the partitions the topology
+     * reports for the local node. Any preload processes must be finished before this method is called.
+     * <p>
+     * Topology mismatches fail assertions; entries whose partition is not among the node partitions
+     * are only logged as warnings.
+     *
+     * @param dht Dht cache.
+     * @param idx Cache index.
+     * @param log Logger.
+     */
+    static void checkDhtTopology(GridDhtCache<Integer, String> dht, int idx, IgniteLogger log) {
+        assert dht != null;
+        assert idx >= 0;
+        assert log != null;
+
+        log.info("Checking balanced state of cache #" + idx);
+
+        GridCacheAffinity<Integer> aff = dht.affinity();
+
+        ClusterNode locNode = dht.context().grid().cluster().localNode();
+
+        GridDhtPartitionTopology<Integer,String> top = dht.topology();
+
+        GridDhtPartitionMap locMap = dht.topology().partitions(locNode.id());
+
+        // Partitions the topology reports for the local node.
+        // Each of them is expected to exist locally in OWNING state.
+        Collection<Integer> nodeParts = new HashSet<>();
+
+        if (locMap != null) {
+            for (int part : locMap.keySet())
+                nodeParts.add(part);
+        }
+
+        // Nothing to verify when no partitions are reported for this node.
+        if (F.isEmpty(nodeParts))
+            return;
+
+        for (int part : nodeParts) {
+            assert top.localPartition(part, false) != null :
+                "Partition does not exist in topology: [cache=" + idx + ", part=" + part + "]";
+        }
+
+        for (GridDhtLocalPartition loc : top.localPartitions()) {
+            assert nodeParts.contains(loc.id()) :
+                "Invalid local partition: [cache=" + idx + ", part=" + loc + ", node partitions=" + nodeParts + "]";
+
+            assert loc.state() == OWNING : "Invalid partition state [cache=" + idx + ", part=" + loc + "]";
+
+            Collection<ClusterNode> owners = aff.mapPartitionToPrimaryAndBackups(loc.id());
+
+            assert owners.contains(locNode) :
+                "Partition affinity nodes does not contain local node: [cache=" + idx + "]";
+        }
+
+        // Check keys: mismatches here are reported, not asserted.
+        for (GridCacheEntryEx<Integer, String> e : dht.entries()) {
+            GridDhtCacheEntry<Integer, String> entry = (GridDhtCacheEntry<Integer, String>)e;
+
+            boolean storedPartKnown = nodeParts.contains(entry.partition());
+
+            if (!storedPartKnown) {
+                log.warning("Partition of stored entry is obsolete for node: [cache=" + idx + ", entry=" + entry +
+                    ", node partitions=" + nodeParts + "]");
+            }
+
+            int calcPart = aff.partition(entry.key());
+
+            if (!nodeParts.contains(calcPart)) {
+                log.warning("Calculated entry partition is not in node partitions: [cache=" + idx +
+                    ", part=" + calcPart + ", entry=" + entry + ", node partitions=" + nodeParts + "]");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java
new file mode 100644
index 0000000..76d34e3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Tests cache transactions during preloading, using a {@code PARTITIONED} cache in
+ * {@code PARTITIONED_ONLY} distribution mode with four backups.
+ */
+public class GridCacheDhtTxPreloadSelfTest extends IgniteTxPreloadAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setBackups(4);
+        ccfg.setDistributionMode(PARTITIONED_ONLY);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheExColocatedFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheExColocatedFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheExColocatedFullApiSelfTest.java
new file mode 100644
index 0000000..48c0765
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheExColocatedFullApiSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+
+/**
+ * Runs the private cache interface tests inherited from the abstract full API test against a
+ * colocated cache; the cache mode is {@code PARTITIONED}.
+ */
+public class GridCacheExColocatedFullApiSelfTest extends GridCacheExAbstractFullApiSelfTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        // Colocated caches are partitioned.
+        return PARTITIONED;
+    }
+}


Mime
View raw message