ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/50] [abbrv] incubator-ignite git commit: # ignite-63
Date Thu, 22 Jan 2015 22:04:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java
new file mode 100644
index 0000000..b0b432b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cache.eviction.fifo.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Tests for dht cache eviction when a near cache on another node is registered as a reader of the entry.
+ */
+public class GridCacheDhtEvictionNearReadersSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final int GRID_CNT = 4;
+
+    /** */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Default constructor. */
+    public GridCacheDhtEvictionNearReadersSelfTest() {
+        super(false /* don't start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setSwapEnabled(false);
+        cacheCfg.setEvictSynchronized(true);
+        cacheCfg.setEvictNearSynchronized(true);
+        cacheCfg.setPreloadMode(SYNC);
+        cacheCfg.setAtomicityMode(atomicityMode());
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+        cacheCfg.setBackups(1);
+
+        // Set eviction queue size explicitly.
+        cacheCfg.setEvictSynchronizedKeyBufferSize(1);
+        cacheCfg.setEvictMaxOverflowRatio(0);
+        cacheCfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy(10));
+        cacheCfg.setNearEvictionPolicy(new GridCacheFifoEvictionPolicy(10));
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /**
+     * @return Atomicity mode.
+     */
+    public GridCacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ConstantConditions"})
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        if (GRID_CNT < 2)
+            throw new IgniteCheckedException("GRID_CNT must not be less than 2.");
+
+        startGridsMultiThreaded(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"SizeReplaceableByIsEmpty"})
+    @Override protected void beforeTest() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            assert near(grid(i)).size() == 0;
+            assert dht(grid(i)).size() == 0;
+
+            assert near(grid(i)).isEmpty();
+            assert dht(grid(i)).isEmpty();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"unchecked"})
+    @Override protected void afterTest() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            near(grid(i)).removeAll(new IgnitePredicate[] {F.alwaysTrue()});
+
+            assert near(grid(i)).isEmpty() : "Near cache is not empty [idx=" + i + "]";
+            assert dht(grid(i)).isEmpty() : "Dht cache is not empty [idx=" + i + "]";
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @return Grid for the given node.
+     */
+    private Ignite grid(ClusterNode node) {
+        return G.ignite(node.id());
+    }
+
+    /**
+     * @param g Grid.
+     * @return Near cache.
+     */
+    @SuppressWarnings({"unchecked"})
+    private GridNearCacheAdapter<Integer, String> near(Ignite g) {
+        return (GridNearCacheAdapter)((GridKernal)g).internalCache();
+    }
+
+    /**
+     * @param g Grid.
+     * @return Dht cache.
+     */
+    @SuppressWarnings({"unchecked", "TypeMayBeWeakened"})
+    private GridDhtCacheAdapter<Integer, String> dht(Ignite g) {
+        return ((GridNearCacheAdapter)((GridKernal)g).internalCache()).dht();
+    }
+
+    /**
+     * @param idx Index.
+     * @return Affinity.
+     */
+    private GridCacheConsistentHashAffinityFunction affinity(int idx) {
+        return (GridCacheConsistentHashAffinityFunction)grid(idx).cache(null).configuration().getAffinity();
+    }
+
+    /**
+     * @param key Key.
+     * @return Nodes the key maps to per the affinity function of grid 0 (callers treat the first as primary).
+     */
+    private Collection<ClusterNode> keyNodes(Object key) {
+        GridCacheConsistentHashAffinityFunction aff = affinity(0);
+
+        return aff.nodes(aff.partition(key), grid(0).nodes(), 1);
+    }
+
+    /**
+     * @param nodeId Node id.
+     * @return Predicate for events belonging to specified node.
+     */
+    private IgnitePredicate<IgniteEvent> nodeEvent(final UUID nodeId) {
+        assert nodeId != null;
+
+        return new P1<IgniteEvent>() {
+            @Override public boolean apply(IgniteEvent e) {
+                info("Predicate called [e.nodeId()=" + e.node().id() + ", nodeId=" + nodeId + ']');
+
+                return e.node().id().equals(nodeId);
+            }
+        };
+    }
+
+    /**
+     * Checks that evicting a key on its primary node also removes it from the backup and the near reader node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReaders() throws Exception {
+        Integer key = 1;
+
+        Collection<ClusterNode> nodes = new ArrayList<>(keyNodes(key));
+
+        ClusterNode primary = F.first(nodes);
+
+        assert primary != null;
+
+        nodes.remove(primary);
+
+        ClusterNode backup = F.first(nodes);
+
+        assert backup != null;
+
+        // Now calculate other node that doesn't own the key.
+        nodes = new ArrayList<>(grid(0).nodes());
+
+        nodes.remove(primary);
+        nodes.remove(backup);
+
+        ClusterNode other = F.first(nodes);
+
+        assert !F.eqNodes(primary, backup);
+        assert !F.eqNodes(primary, other);
+        assert !F.eqNodes(backup, other);
+
+        info("Primary node: " + primary.id());
+        info("Backup node: " + backup.id());
+        info("Other node: " + other.id());
+
+        GridNearCacheAdapter<Integer, String> nearPrimary = near(grid(primary));
+        GridDhtCacheAdapter<Integer, String> dhtPrimary = dht(grid(primary));
+
+        GridNearCacheAdapter<Integer, String> nearBackup = near(grid(backup));
+        GridDhtCacheAdapter<Integer, String> dhtBackup = dht(grid(backup));
+
+        GridNearCacheAdapter<Integer, String> nearOther = near(grid(other));
+        GridDhtCacheAdapter<Integer, String> dhtOther = dht(grid(other));
+
+        String val = "v1";
+
+        // Put on primary node.
+        nearPrimary.put(key, val);
+
+        GridDhtCacheEntry<Integer, String> entryPrimary = dhtPrimary.peekExx(key);
+        GridDhtCacheEntry<Integer, String> entryBackup = dhtBackup.peekExx(key);
+
+        assert entryPrimary != null;
+        assert entryBackup != null;
+        assert nearOther.peekExx(key) == null;
+        assert dhtOther.peekExx(key) == null;
+
+        IgniteFuture<IgniteEvent> futOther =
+            waitForLocalEvent(grid(other).events(), nodeEvent(other.id()), EVT_CACHE_ENTRY_EVICTED);
+
+        IgniteFuture<IgniteEvent> futBackup =
+            waitForLocalEvent(grid(backup).events(), nodeEvent(backup.id()), EVT_CACHE_ENTRY_EVICTED);
+
+        IgniteFuture<IgniteEvent> futPrimary =
+            waitForLocalEvent(grid(primary).events(), nodeEvent(primary.id()), EVT_CACHE_ENTRY_EVICTED);
+
+        // Get value on other node, it should be loaded to near cache.
+        assertEquals(val, nearOther.get(key, true, null));
+
+        entryPrimary = dhtPrimary.peekExx(key);
+        entryBackup = dhtBackup.peekExx(key);
+
+        assert entryPrimary != null;
+        assert entryBackup != null;
+
+        assertEquals(val, nearOther.peek(key));
+
+        assertFalse(entryPrimary.readers().isEmpty());
+
+        // Evict on primary node.
+        // It will trigger dht eviction and eviction on backup node.
+        grid(primary).cache(null).evict(key);
+
+        futOther.get(3000);
+        futBackup.get(3000);
+        futPrimary.get(3000);
+
+        assertNull(dhtPrimary.peek(key));
+        assertNull(nearPrimary.peek(key));
+
+        assertNull(dhtBackup.peek(key));
+        assertNull(nearBackup.peek(key));
+
+        assertNull(dhtOther.peek(key));
+        assertNull(nearOther.peek(key));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionSelfTest.java
new file mode 100644
index 0000000..6c311d4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionSelfTest.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cache.eviction.fifo.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Tests for dht cache eviction.
+ */
+public class GridCacheDhtEvictionSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final int GRID_CNT = 2;
+
+    /** */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Default constructor. */
+    public GridCacheDhtEvictionSelfTest() {
+        super(false /* don't start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setPreloadMode(NONE);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setSwapEnabled(false);
+        cacheCfg.setEvictSynchronized(true);
+        cacheCfg.setEvictNearSynchronized(true);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+        cacheCfg.setBackups(1);
+
+        // Set eviction queue size explicitly.
+        cacheCfg.setEvictMaxOverflowRatio(0);
+        cacheCfg.setEvictSynchronizedKeyBufferSize(1);
+        cacheCfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy(10000));
+        cacheCfg.setNearEvictionPolicy(new GridCacheFifoEvictionPolicy(10000));
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ConstantConditions"})
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        if (GRID_CNT < 2)
+            throw new IgniteCheckedException("GRID_CNT must not be less than 2.");
+
+        startGrids(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"SizeReplaceableByIsEmpty"})
+    @Override protected void beforeTest() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            assert near(grid(i)).size() == 0;
+            assert dht(grid(i)).size() == 0;
+
+            assert near(grid(i)).isEmpty();
+            assert dht(grid(i)).isEmpty();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"unchecked"})
+    @Override protected void afterTest() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            near(grid(i)).removeAll(new IgnitePredicate[] {F.alwaysTrue()});
+
+            assert near(grid(i)).isEmpty() : "Near cache is not empty [idx=" + i + "]";
+            assert dht(grid(i)).isEmpty() : "Dht cache is not empty [idx=" + i + "]";
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @return Grid for the given node.
+     */
+    private Ignite grid(ClusterNode node) {
+        return G.ignite(node.id());
+    }
+
+    /**
+     * @param g Grid.
+     * @return Near cache.
+     */
+    @SuppressWarnings({"unchecked"})
+    private GridNearCacheAdapter<Integer, String> near(Ignite g) {
+        return (GridNearCacheAdapter)((GridKernal)g).internalCache();
+    }
+
+    /**
+     * @param g Grid.
+     * @return Dht cache.
+     */
+    @SuppressWarnings({"unchecked", "TypeMayBeWeakened"})
+    private GridDhtCacheAdapter<Integer, String> dht(Ignite g) {
+        return ((GridNearCacheAdapter)((GridKernal)g).internalCache()).dht();
+    }
+
+    /**
+     * @param idx Index.
+     * @return Affinity.
+     */
+    private GridCacheConsistentHashAffinityFunction affinity(int idx) {
+        return (GridCacheConsistentHashAffinityFunction)grid(idx).cache(null).configuration().getAffinity();
+    }
+
+    /**
+     * @param key Key.
+     * @return Nodes the key maps to per the affinity function of grid 0 (callers treat the first as primary).
+     */
+    private Collection<ClusterNode> keyNodes(Object key) {
+        GridCacheConsistentHashAffinityFunction aff = affinity(0);
+
+        return aff.nodes(aff.partition(key), grid(0).nodes(), 1);
+    }
+
+    /**
+     * @param nodeId Node id.
+     * @return Predicate for events belonging to specified node.
+     */
+    private IgnitePredicate<IgniteEvent> nodeEvent(final UUID nodeId) {
+        assert nodeId != null;
+
+        return new P1<IgniteEvent>() {
+            @Override public boolean apply(IgniteEvent e) {
+                info("Predicate called [e.nodeId()=" + e.node().id() + ", nodeId=" + nodeId + ']');
+
+                return e.node().id().equals(nodeId);
+            }
+        };
+    }
+
+    /**
+     * Puts one key via its primary node, evicts it there and checks the entry is gone on primary and backup.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("NullArgumentToVariableArgMethod")
+    public void testSingleKey() throws Exception {
+        Integer key = 1;
+
+        Collection<ClusterNode> nodes = new ArrayList<>(keyNodes(key));
+
+        ClusterNode primary = F.first(nodes);
+
+        assert primary != null;
+
+        nodes.remove(primary);
+
+        ClusterNode backup = F.first(nodes);
+
+        assert backup != null;
+
+        assert !F.eqNodes(primary, backup);
+
+        info("Key primary node: " + primary.id());
+        info("Key backup node: " + backup.id());
+
+        GridNearCacheAdapter<Integer, String> nearPrimary = near(grid(primary));
+        GridDhtCacheAdapter<Integer, String> dhtPrimary = dht(grid(primary));
+
+        GridNearCacheAdapter<Integer, String> nearBackup = near(grid(backup));
+        GridDhtCacheAdapter<Integer, String> dhtBackup = dht(grid(backup));
+
+        String val = "v1";
+
+        // Put on primary node.
+        nearPrimary.put(key, val, null);
+
+        assertEquals(val, nearPrimary.peek(key));
+        assertEquals(val, dhtPrimary.peek(key));
+
+        assertEquals(val, nearBackup.peek(key));
+        assertEquals(val, dhtBackup.peek(key));
+
+        GridDhtCacheEntry<Integer, String> entryPrimary = dhtPrimary.peekExx(key);
+        GridDhtCacheEntry<Integer, String> entryBackup = dhtBackup.peekExx(key);
+
+        assert entryPrimary != null;
+        assert entryBackup != null;
+
+        assertTrue(entryPrimary.readers().isEmpty());
+        assertTrue(entryBackup.readers().isEmpty());
+
+        IgniteFuture<IgniteEvent> futBackup =
+            waitForLocalEvent(grid(backup).events(), nodeEvent(backup.id()), EVT_CACHE_ENTRY_EVICTED);
+
+        IgniteFuture<IgniteEvent> futPrimary =
+            waitForLocalEvent(grid(primary).events(), nodeEvent(primary.id()), EVT_CACHE_ENTRY_EVICTED);
+
+        // Evict on primary node.
+        // It should trigger dht eviction and eviction on backup node.
+        assert grid(primary).cache(null).evict(key);
+
+        // Wait for eviction event on backup and primary node (timeout 3000, presumably milliseconds).
+        futBackup.get(3000);
+        futPrimary.get(3000);
+
+        assertEquals(0, nearPrimary.size());
+
+        assertNull(nearPrimary.peekExx(key));
+        assertNull(dhtPrimary.peekExx(key));
+
+        assertNull(nearBackup.peekExx(key));
+        assertNull(dhtBackup.peekExx(key));
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("NullArgumentToVariableArgMethod")
+    public void testMultipleKeys() throws Exception {
+        final int keyCnt = 1000;
+
+        final Ignite primaryIgnite = grid(0);
+        final Ignite backupIgnite = grid(1);
+
+        GridNearCacheAdapter<Integer, String> nearPrimary = near(primaryIgnite);
+        GridDhtCacheAdapter<Integer, String> dhtPrimary = dht(primaryIgnite);
+
+        GridNearCacheAdapter<Integer, String> nearBackup = near(backupIgnite);
+        GridDhtCacheAdapter<Integer, String> dhtBackup = dht(backupIgnite);
+
+        Collection<Integer> keys = new ArrayList<>(keyCnt);
+
+        for (int key = 0; keys.size() < keyCnt; key++)
+            if (F.eqNodes(primaryIgnite.cluster().localNode(), F.first(keyNodes(key))))
+                keys.add(key++);
+
+        info("Test keys: " + keys);
+
+        // Put on primary node.
+        for (Integer key : keys)
+            nearPrimary.put(key, "v" + key, null);
+
+        for (Integer key : keys) {
+            String val = "v" + key;
+
+            assertEquals(val, nearPrimary.peek(key));
+            assertEquals(val, dhtPrimary.peek(key));
+
+            assertEquals(val, nearBackup.peek(key));
+            assertEquals(val, dhtBackup.peek(key));
+        }
+
+        final AtomicInteger cntBackup = new AtomicInteger();
+
+        IgniteFuture<IgniteEvent> futBackup = waitForLocalEvent(backupIgnite.events(), new P1<IgniteEvent>() {
+            @Override public boolean apply(IgniteEvent e) {
+                return e.node().id().equals(backupIgnite.cluster().localNode().id()) &&
+                    cntBackup.incrementAndGet() == keyCnt;
+            }
+        }, EVT_CACHE_ENTRY_EVICTED);
+
+        final AtomicInteger cntPrimary = new AtomicInteger();
+
+        IgniteFuture<IgniteEvent> futPrimary = waitForLocalEvent(primaryIgnite.events(), new P1<IgniteEvent>() {
+            @Override public boolean apply(IgniteEvent e) {
+                return e.node().id().equals(primaryIgnite.cluster().localNode().id()) &&
+                    cntPrimary.incrementAndGet() == keyCnt;
+            }
+        }, EVT_CACHE_ENTRY_EVICTED);
+
+        // Evict on primary node.
+        // Eviction of the last key should trigger queue processing.
+        for (Integer key : keys) {
+            boolean evicted = primaryIgnite.cache(null).evict(key);
+
+            assert evicted;
+        }
+
+        // Give 5 seconds for eviction events to occur on backup and primary node.
+        futBackup.get(3000);
+        futPrimary.get(3000);
+
+        info("nearBackupSize: " + nearBackup.size());
+        info("dhtBackupSize: " + dhtBackup.size());
+        info("nearPrimarySize: " + nearPrimary.size());
+        info("dhtPrimarySize: " + dhtPrimary.size());
+
+        // Check backup node first.
+        for (Integer key : keys) {
+            String msg = "Failed key: " + key;
+
+            assertNull(msg, nearBackup.peek(key));
+            assertNull(msg, dhtBackup.peek(key));
+            assertNull(msg, nearBackup.peekExx(key));
+            assertNull(msg, dhtBackup.peekExx(key));
+        }
+
+        for (Integer key : keys) {
+            String msg = "Failed key: " + key;
+
+            assertNull(msg, nearPrimary.peek(key));
+            assertNull(msg, dhtPrimary.peek(key));
+            assertNull(msg, nearPrimary.peekExx(key));
+            assertNull(dhtPrimary.peekExx(key));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionsDisabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionsDisabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionsDisabledSelfTest.java
new file mode 100644
index 0000000..2aeccf8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionsDisabledSelfTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Tests partitioned cache puts and gets when no eviction policy is configured (evictions disabled).
+ */
+public class GridCacheDhtEvictionsDisabledSelfTest extends GridCommonAbstractTest {
+    /** */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     *
+     */
+    public GridCacheDhtEvictionsDisabledSelfTest() {
+        super(false); // Don't start grid node.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(spi);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setName("test");
+        cc.setCacheMode(GridCacheMode.PARTITIONED);
+        cc.setDefaultTimeToLive(0);
+        cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cc.setAtomicityMode(TRANSACTIONAL);
+        cc.setDistributionMode(PARTITIONED_ONLY);
+
+        c.setCacheConfiguration(cc);
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** @throws Exception If failed. */
+    public void testOneNode() throws Exception {
+        checkNodes(startGridsMultiThreaded(1));
+
+        assertEquals(26, colocated(0, "test").size());
+        assertEquals(26, cache(0, "test").size());
+    }
+
+    /** @throws Exception If failed. */
+    public void testTwoNodes() throws Exception {
+        checkNodes(startGridsMultiThreaded(2));
+
+        assertTrue(colocated(0, "test").size() > 0);
+        assertTrue(cache(0, "test").size() > 0);
+    }
+
+    /** @throws Exception If failed. */
+    public void testThreeNodes() throws Exception {
+        checkNodes(startGridsMultiThreaded(3));
+
+        assertTrue(colocated(0, "test").size() > 0);
+        assertTrue(cache(0, "test").size() > 0);
+    }
+
+    /**
+     * @param g Grid.
+     * @throws Exception If failed.
+     */
+    private void checkNodes(Ignite g) throws Exception {
+        GridCache<String, String> cache = g.cache("test");
+
+        for (char c = 'a'; c <= 'z'; c++) {
+            String key = Character.toString(c);
+
+            cache.put(key, "val-" + key);
+
+            String v1 = cache.get(key);
+            String v2 = cache.get(key); // Get second time.
+
+            info("v1: " + v1);
+            info("v2: " + v2);
+
+            assertNotNull(v1);
+            assertNotNull(v2);
+
+            if (cache.affinity().mapKeyToNode(key).isLocal())
+                assertSame(v1, v2);
+            else
+                assertEquals(v1, v2);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtExpiredEntriesPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtExpiredEntriesPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtExpiredEntriesPreloadSelfTest.java
new file mode 100644
index 0000000..e1aabd2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtExpiredEntriesPreloadSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Tests preloading of expired entries.
+ */
+public class GridCacheDhtExpiredEntriesPreloadSelfTest extends GridCacheExpiredEntriesPreloadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtInternalEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtInternalEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtInternalEntrySelfTest.java
new file mode 100644
index 0000000..b20d048
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtInternalEntrySelfTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cache.datastructures.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.datastructures.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCachePeekMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+
+/**
+ * Tests where an atomic long's internal entry is stored: DHT cache on the primary node, near cache (plus a reader) elsewhere.
+ */
+public class GridCacheDhtInternalEntrySelfTest extends GridCommonAbstractTest {
+    /** IP finder; static, so the same instance is handed to every grid configuration. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of grids started in {@link #beforeTestsStarted()}. */
+    private static final int GRID_CNT = 2;
+
+    /** Name of the atomic long whose internal cache entry is inspected. */
+    private static final String ATOMIC_LONG_NAME = "test.atomic.long";
+
+    /** {@inheritDoc} Adds a transactional PARTITIONED cache with NEAR_PARTITIONED distribution and no backups. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(spi);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setPreloadMode(SYNC);
+        cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, 2));
+        cacheCfg.setBackups(0);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setDistributionMode(GridCacheDistributionMode.NEAR_PARTITIONED);
+        cacheCfg.setNearEvictionPolicy(new GridCacheAlwaysEvictionPolicy());
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} Starts {@link #GRID_CNT} grids once for the whole test class. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(GRID_CNT);
+    }
+
+    /** {@inheritDoc} Stops all grids once all tests have run. */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** @throws Exception If failed. */
+    public void testInternalKeyReaders() throws Exception {
+        IgniteBiTuple<ClusterNode, ClusterNode> nodes = getNodes(ATOMIC_LONG_NAME);
+
+        ClusterNode primary = nodes.get1();
+        ClusterNode other = nodes.get2();
+
+        // Create on non-primary node with initial value 1.
+        GridCacheAtomicLong l = grid(other).cache(null).dataStructures().atomicLong(ATOMIC_LONG_NAME, 1, true);
+
+        assert l != null;
+        assert l.get() == 1;
+
+        check(primary, other, true);
+
+        // Update on primary: obtain the same atomic long (value still 1) and increment it to 2.
+        l = grid(primary).cache(null).dataStructures().atomicLong(ATOMIC_LONG_NAME, 1, true);
+
+        assert l != null;
+        assert l.get() == 1;
+
+        l.incrementAndGet();
+
+        assert l.get() == 2;
+
+        // Check on non-primary: the increment made on primary must be visible here.
+        l = grid(other).cache(null).dataStructures().atomicLong(ATOMIC_LONG_NAME, 1, true);
+
+        assert l != null;
+        assert l.get() == 2;
+
+        check(primary, other, true);
+
+        // Remove via non-primary; afterwards the entry must be absent on both nodes.
+        assert grid(other).cache(null).dataStructures().removeAtomicLong(ATOMIC_LONG_NAME);
+
+        check(primary, other, false);
+    }
+
+    /** Asserts where the atomic long's entry is (or is not) stored on the two nodes.
+     * @param primary Primary node.
+     * @param other Non-primary node.
+     * @param exists If {@code true}, asserts DHT/near placement and the reader; otherwise asserts absence on both nodes.
+     * @throws Exception In case of error.
+     */
+    private void check(ClusterNode primary, ClusterNode other, boolean exists) throws Exception {
+        if (exists) {
+            // Check primary node has entry in DHT cache only (nothing in its near cache).
+            assert peekNear(primary) == null;
+            assert peekDht(primary) != null;
+
+            // Check non-primary node has entry in near cache only (nothing in its DHT cache).
+            assert peekNear(other) != null;
+            assert peekDht(other) == null;
+
+            // Check primary node has reader for non-primary node.
+            assert peekDhtEntry(primary).readers().contains(other.id());
+        }
+        else {
+            assert peekGlobal(primary) == null;
+            assert peekGlobal(other) == null;
+        }
+    }
+
+    /**
+     * @param node Node whose default cache is peeked.
+     * @return Atomic long value peeked under the internal key; {@link #check} treats {@code null} as "absent".
+     */
+    private GridCacheAtomicLongValue peekGlobal(ClusterNode node) {
+        return (GridCacheAtomicLongValue)grid(node).cache(null).peek(
+            new GridCacheInternalKeyImpl(ATOMIC_LONG_NAME));
+    }
+
+    /**
+     * @param node Node whose default cache is peeked.
+     * @return Atomic long value peeked under the internal key with the {@code NEAR_ONLY} peek mode.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private GridCacheAtomicLongValue peekNear(ClusterNode node) throws IgniteCheckedException {
+        return (GridCacheAtomicLongValue)grid(node).cache(null).peek(
+            new GridCacheInternalKeyImpl(ATOMIC_LONG_NAME), Collections.singleton(NEAR_ONLY));
+    }
+
+    /**
+     * @param node Node whose default cache is peeked.
+     * @return Atomic long value peeked under the internal key with the {@code PARTITIONED_ONLY} peek mode.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private GridCacheAtomicLongValue peekDht(ClusterNode node) throws IgniteCheckedException {
+        return (GridCacheAtomicLongValue)grid(node).cache(null).peek(
+            new GridCacheInternalKeyImpl(ATOMIC_LONG_NAME), Collections.singleton(PARTITIONED_ONLY));
+    }
+
+    /**
+     * @param node Node whose DHT cache is inspected.
+     * @return DHT entry for the atomic long's internal key, obtained via {@code peekEx} on the node's DHT cache.
+     */
+    private GridDhtCacheEntry<Object, Object> peekDhtEntry(ClusterNode node) {
+        return (GridDhtCacheEntry<Object, Object>)dht(grid(node).cache(null)).peekEx(
+            new GridCacheInternalKeyImpl(ATOMIC_LONG_NAME));
+    }
+
+    /**
+     * @param key Key mapped to a node through the affinity of grid 0's default cache.
+     * @return Pair {primary node, some other node}; "other" is the first node left after removing the primary.
+     */
+    private IgniteBiTuple<ClusterNode, ClusterNode> getNodes(String key) {
+        GridCacheAffinity<Object> aff = grid(0).cache(null).affinity();
+
+        ClusterNode primary = aff.mapKeyToNode(key);
+
+        assert primary != null;
+
+        Collection<ClusterNode> nodes = new ArrayList<>(grid(0).nodes());
+
+        nodes.remove(primary);
+
+        ClusterNode other = F.first(nodes);
+
+        assert other != null;
+
+        assert !F.eqNodes(primary, other);
+
+        return F.t(primary, other);
+    }
+
+    /**
+     * @param node Node.
+     * @return Ignite instance looked up by the node's ID.
+     */
+    private Ignite grid(ClusterNode node) {
+        return G.ignite(node.id());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMappingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMappingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMappingSelfTest.java
new file mode 100644
index 0000000..e50d4f3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMappingSelfTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+
+/**
+ * Tests DHT mapping: a key put into the cache must be present in the DHT cache of exactly {@code 1 + BACKUPS} nodes.
+ */
+public class GridCacheDhtMappingSelfTest extends GridCommonAbstractTest {
+    /** Number of key backups. */
+    private static final int BACKUPS = 1;
+
+    /** IP finder set on the discovery SPI of every grid configuration built by this test instance. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} Adds a transactional PARTITIONED cache with NEAR_PARTITIONED distribution and {@link #BACKUPS} backups. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setPreloadMode(SYNC);
+        cacheCfg.setBackups(BACKUPS);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} Also stops the grids started by {@link #testMapping()}. */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** @throws Exception If failed. */
+    public void testMapping() throws Exception {
+        int nodeCnt = 5;
+
+        startGridsMultiThreaded(nodeCnt);
+
+        GridCache<Integer, Integer> cache = grid(nodeCnt - 1).cache(null);
+
+        int kv = 1;
+
+        cache.put(kv, kv);
+
+        int cnt = 0;
+
+        for (int i = 0; i < nodeCnt; i++) {
+            Ignite g = grid(i);
+
+            GridDhtCacheAdapter<Integer, Integer> dht = ((GridNearCacheAdapter<Integer, Integer>)
+                ((GridKernal)g).<Integer, Integer>internalCache()).dht();
+
+            if (dht.peek(kv) != null) {
+                info("Key found on node: " + g.cluster().localNode().id());
+
+                cnt++;
+            }
+        }
+
+        // Test key should be in the DHT cache of the primary and backup nodes only, i.e. on 1 + BACKUPS of the 5 nodes.
+        assertEquals(1 + BACKUPS, cnt);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMultiBackupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMultiBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMultiBackupTest.java
new file mode 100644
index 0000000..13b7cb0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMultiBackupTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.io.*;
+
+/**
+ * Puts keys mapped to non-local nodes into the {@code "partitioned"} cache of a grid started from the example cache config.
+ */
+public class GridCacheDhtMultiBackupTest extends GridCommonAbstractTest {
+    /**
+     * Creates the test; per the inline note, {@code false} tells the superclass not to start a grid.
+     */
+    public GridCacheDhtMultiBackupTest() {
+        super(false /* don't start grid. */);
+    }
+
+    /** Starts a grid, checks the cache is empty, then puts those of keys 0..999 whose affinity node is not the local node.
+     * @throws Exception If failed.
+     */
+    public void testPut() throws Exception {
+        try {
+            Ignite g = G.start("examples/config/example-cache.xml");
+
+            if (g.cluster().nodes().size() < 5)
+                U.warn(log, "Topology is too small for this test. " +
+                    "Run with 4 remote nodes or more having large number of backup nodes.");
+
+            g.compute().run(new CAX() {
+                    @IgniteInstanceResource
+                    private Ignite g;
+
+                    @Override public void applyx() throws IgniteCheckedException {
+                        X.println("Checking whether cache is empty.");
+
+                        GridCache<SampleKey, SampleValue> cache = g.cache("partitioned");
+
+                        assert cache.isEmpty();
+                    }
+                }
+            );
+
+            GridCache<SampleKey, SampleValue> cache = g.cache("partitioned");
+
+            int cnt = 0;
+
+            for (int key = 0; key < 1000; key++) {
+                SampleKey key1 = new SampleKey(key);
+
+                if (!g.cluster().localNode().id().equals(g.cluster().mapKeyToNode("partitioned", key1).id())) {
+                    cache.put(key1, new SampleValue(key));
+
+                    cnt++;
+                }
+            }
+
+            X.println(">>> Put count: " + cnt);
+        }
+        finally {
+            G.stopAll(false);
+        }
+    }
+
+    /**
+     * Serializable cache key wrapping an int; equality and hash code are based on the wrapped value.
+     */
+    private static class SampleKey implements Serializable {
+        /** Wrapped key value. */
+        private int key;
+
+        /**
+         * @param key Wrapped key value.
+         */
+        private SampleKey(int key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} Returns the wrapped key value. */
+        @Override public int hashCode() {
+            return key;
+        }
+
+        /** {@inheritDoc} Equal only to another {@code SampleKey} wrapping the same value. */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof SampleKey && ((SampleKey)obj).key == key;
+        }
+    }
+
+    /**
+     * Serializable cache value wrapping an int; equality and hash code are based on the wrapped value.
+     */
+    private static class SampleValue implements Serializable {
+        /** Wrapped value. */
+        private int val;
+
+        /**
+         * @param val Wrapped value.
+         */
+        private SampleValue(int val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} Returns the wrapped value. */
+        @Override public int hashCode() {
+            return val;
+        }
+
+        /** {@inheritDoc} Equal only to another {@code SampleValue} wrapping the same value. */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof SampleValue && ((SampleValue)obj).val == val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadBigDataSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadBigDataSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadBigDataSelfTest.java
new file mode 100644
index 0000000..9a7d262
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadBigDataSelfTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.junits.common.*;
+
+import static org.apache.ignite.configuration.IgniteDeploymentMode.*;
+import static org.apache.ignite.cache.CacheConfiguration.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Tests preloading of a partitioned cache filled with many large ({@code KBSIZE}-byte) values onto several grids.
+ */
+public class GridCacheDhtPreloadBigDataSelfTest extends GridCommonAbstractTest {
+    /** Size of each value in bytes (10 * 1024, i.e. 10 KB); used as the array length in {@link #value(int)}. */
+    private static final int KBSIZE = 10 * 1024;
+
+    /** Default backups. */
+    private static final int DFLT_BACKUPS = 1;
+
+    /** Default number of partitions. */
+    private static final int DFLT_PARTITIONS = 521;
+
+    /** Default preload batch size. */
+    private static final int DFLT_BATCH_SIZE = DFLT_PRELOAD_BATCH_SIZE;
+
+    /** Number of key backups. Each test method can set this value as required. */
+    private int backups = DFLT_BACKUPS;
+
+    /** Preload mode. */
+    private GridCachePreloadMode preloadMode = ASYNC;
+
+    /** Preload batch size applied to the cache configuration. */
+    private int preloadBatchSize = DFLT_BATCH_SIZE;
+
+    /** Number of partitions. */
+    private int partitions = DFLT_PARTITIONS;
+
+    /** Optional lifecycle bean; registered in the grid configuration only when non-null. */
+    private LifecycleBean lbean;
+
+    /** IP finder. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Creates the test, passing {@code false} to the superclass constructor.
+     */
+    public GridCacheDhtPreloadBigDataSelfTest() {
+        super(false /* don't start grid. */);
+    }
+
+    /** {@inheritDoc} Builds the cache configuration from the current per-test fields (backups, partitions, preload settings). */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(PARTITIONED);
+        cc.setPreloadBatchSize(preloadBatchSize);
+        cc.setWriteSynchronizationMode(FULL_SYNC);
+        cc.setPreloadMode(preloadMode);
+        cc.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions));
+        cc.setBackups(backups);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        if (lbean != null)
+            c.setLifecycleBeans(lbean);
+
+        c.setDiscoverySpi(disco);
+        c.setCacheConfiguration(cc);
+        c.setDeploymentMode(CONTINUOUS);
+        c.setNetworkTimeout(1000);
+
+        return c;
+    }
+
+    /** {@inheritDoc} Resets the per-test settings to their defaults before each test. */
+    @Override protected void beforeTest() throws Exception {
+        backups = DFLT_BACKUPS;
+        partitions = DFLT_PARTITIONS;
+        preloadMode = ASYNC;
+        preloadBatchSize = DFLT_BATCH_SIZE;
+    }
+
+    /** {@inheritDoc} Drops the lifecycle bean reference. */
+    @Override protected void afterTestsStopped() throws Exception {
+        // Clean up memory for test suite.
+        lbean = null;
+    }
+
+    /** Populates grid 0 with 10000 values, starts two more grids, sleeps 10 s and checks each grid's cache size.
+     * @throws Exception If failed.
+     */
+    public void testLargeObjects() throws Exception {
+        preloadMode = SYNC;
+
+        try {
+            startGrid(0);
+
+            int cnt = 10000;
+
+            populate(grid(0).<Integer, byte[]>cache(null), cnt, KBSIZE);
+
+            int gridCnt = 3;
+
+            for (int i = 1; i < gridCnt; i++)
+                startGrid(i);
+
+            Thread.sleep(10000);
+
+            for (int i = 0; i < gridCnt; i++) {
+                GridCache<Integer, String> c = grid(i).cache(null);
+
+                if (backups + 1 <= gridCnt)
+                    assert c.size() < cnt : "Cache size: " + c.size();
+                else
+                    assert c.size() == cnt;
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** Same size check, but data is loaded by a lifecycle bean on AFTER_GRID_START when its putxIfAbsent(-1, ...) returns true.
+     * @throws Exception If failed.
+     */
+    public void testLargeObjectsWithLifeCycleBean() throws Exception {
+        preloadMode = SYNC;
+        partitions = 23;
+
+        try {
+            final int cnt = 10000;
+
+            lbean = new LifecycleBean() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException {
+                    if (evt == LifecycleEventType.AFTER_GRID_START) {
+                        GridCache<Integer, byte[]> c = ignite.cache(null);
+
+                        if (c.putxIfAbsent(-1, new byte[1])) {
+                            populate(c, cnt, KBSIZE);
+
+                            info(">>> POPULATED GRID <<<");
+                        }
+                    }
+                }
+            };
+
+            int gridCnt = 3;
+
+            for (int i = 0; i < gridCnt; i++)
+                startGrid(i);
+
+            for (int i = 0; i < gridCnt; i++)
+                info("Grid size [i=" + i + ", size=" + grid(i).cache(null).size() + ']');
+
+            Thread.sleep(10000);
+
+            for (int i = 0; i < gridCnt; i++) {
+                GridCache<Integer, String> c = grid(i).cache(null);
+
+                if (backups + 1 <= gridCnt)
+                    assert c.size() < cnt;
+                else
+                    assert c.size() == cnt;
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param c Cache to fill with keys {@code 0..cnt-1}.
+     * @param cnt Key count.
+     * @param kbSize Value size; despite the name it is used as a byte count (see {@link #value(int)}).
+     * @throws IgniteCheckedException If failed.
+     */
+    private void populate(GridCache<Integer, byte[]> c, int cnt, int kbSize) throws IgniteCheckedException {
+        for (int i = 0; i < cnt; i++)
+            c.put(i, value(kbSize));
+    }
+
+    /**
+     * @param size Array length in bytes.
+     * @return Newly allocated byte array of the given length.
+     */
+    private byte[] value(int size) {
+        return new byte[size];
+    }
+
+    /** {@inheritDoc} Six minutes for this test. */
+    @Override protected long getTestTimeout() {
+        return 6 * 60 * 1000; // 6 min.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
new file mode 100644
index 0000000..0965c87
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+
+/**
+ * Test cases for partitioned cache {@link GridDhtPreloader preloader}.
+ *
+ * Forum example <a href="http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449">
+ * http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449</a>
+ */
+public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
+    /** Key count. */
+    private static final int KEY_CNT = 100;
+
+    /** Preload delay. */
+    private static final int PRELOAD_DELAY = 5000;
+
+    /** Preload mode. */
+    private GridCachePreloadMode preloadMode = ASYNC;
+
+    /** Preload delay. */
+    private long delay = -1;
+
+    /** IP finder. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} Adds a transactional PARTITIONED cache (NEAR_PARTITIONED, 1 backup) using the current preloadMode and delay. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        assert preloadMode != null;
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(PARTITIONED);
+        cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cc.setPreloadMode(preloadMode);
+        cc.setPreloadPartitionedDelay(delay);
+        cc.setAffinity(new GridCacheConsistentHashAffinityFunction(false, 128));
+        cc.setBackups(1);
+        cc.setAtomicityMode(TRANSACTIONAL);
+        cc.setDistributionMode(NEAR_PARTITIONED);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+        disco.setMaxMissedHeartbeats(Integer.MAX_VALUE);
+
+        c.setDiscoverySpi(disco);
+        c.setCacheConfiguration(cc);
+
+        return c;
+    }
+
+    /** {@inheritDoc} Stops all grids after each test, so every test starts its own topology. */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** @throws Exception If failed. */
+    public void testManualPreload() throws Exception {
+        delay = -1;
+
+        Ignite g0 = startGrid(0);
+
+        int cnt = KEY_CNT;
+
+        GridCache<String, Integer> c0 = g0.cache(null);
+
+        for (int i = 0; i < cnt; i++)
+            c0.put(Integer.toString(i), i);
+
+        Ignite g1 = startGrid(1);
+        Ignite g2 = startGrid(2);
+
+        GridCache<String, Integer> c1 = g1.cache(null);
+        GridCache<String, Integer> c2 = g2.cache(null);
+
+        for (int i = 0; i < cnt; i++)
+            assertNull(c1.peek(Integer.toString(i)));
+
+        for (int i = 0; i < cnt; i++)
+            assertNull(c2.peek(Integer.toString(i)));
+
+        final CountDownLatch l1 = new CountDownLatch(1);
+        final CountDownLatch l2 = new CountDownLatch(1);
+
+        g1.events().localListen(new IgnitePredicate<IgniteEvent>() {
+            @Override public boolean apply(IgniteEvent evt) {
+                l1.countDown();
+
+                return true;
+            }
+        }, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED);
+
+        g2.events().localListen(new IgnitePredicate<IgniteEvent>() {
+            @Override public boolean apply(IgniteEvent evt) {
+                l2.countDown();
+
+                return true;
+            }
+        }, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED);
+
+        info("Beginning to wait for cache1 repartition.");
+
+        GridDhtCacheAdapter<String, Integer> d0 = dht(0);
+        GridDhtCacheAdapter<String, Integer> d1 = dht(1);
+        GridDhtCacheAdapter<String, Integer> d2 = dht(2);
+
+        checkMaps(false, d0, d1, d2);
+
+        // Force preload on cache1 and wait for grid 1's EVT_CACHE_PRELOAD_STOPPED event.
+        c1.forceRepartition();
+
+        l1.await();
+
+        info("Cache1 is repartitioned.");
+
+        checkMaps(false, d0, d1, d2);
+
+        info("Beginning to wait for cache2 repartition.");
+
+        // Force preload on cache2 and wait for grid 2's EVT_CACHE_PRELOAD_STOPPED event.
+        c2.forceRepartition();
+
+        l2.await();
+
+        info("Cache2 is repartitioned.");
+
+        checkMaps(true, d0, d1, d2);
+
+        checkCache(c0, cnt);
+        checkCache(c1, cnt);
+        checkCache(c2, cnt);
+    }
+
+    /** @throws Exception If failed. */
+    public void testDelayedPreload() throws Exception {
+        delay = PRELOAD_DELAY;
+
+        Ignite g0 = startGrid(0);
+
+        int cnt = KEY_CNT;
+
+        GridCache<String, Integer> c0 = g0.cache(null);
+
+        for (int i = 0; i < cnt; i++)
+            c0.put(Integer.toString(i), i);
+
+        Ignite g1 = startGrid(1);
+        Ignite g2 = startGrid(2);
+
+        GridCache<String, Integer> c1 = g1.cache(null);
+        GridCache<String, Integer> c2 = g2.cache(null);
+
+        for (int i = 0; i < cnt; i++)
+            assertNull(c1.peek(Integer.toString(i)));
+
+        for (int i = 0; i < cnt; i++)
+            assertNull(c2.peek(Integer.toString(i)));
+
+        final CountDownLatch l1 = new CountDownLatch(1);
+        final CountDownLatch l2 = new CountDownLatch(1);
+
+        g1.events().localListen(new IgnitePredicate<IgniteEvent>() {
+            @Override public boolean apply(IgniteEvent evt) {
+                l1.countDown();
+
+                return true;
+            }
+        }, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED);
+
+        g2.events().localListen(new IgnitePredicate<IgniteEvent>() {
+            @Override public boolean apply(IgniteEvent evt) {
+                l2.countDown();
+
+                return true;
+            }
+        }, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED);
+
+        U.sleep(1000);
+
+        GridDhtCacheAdapter<String, Integer> d0 = dht(0);
+        GridDhtCacheAdapter<String, Integer> d1 = dht(1);
+        GridDhtCacheAdapter<String, Integer> d2 = dht(2);
+
+        info("Beginning to wait for caches repartition.");
+
+        checkMaps(false, d0, d1, d2);
+
+        // No forceRepartition() here: wait up to 1.5 * PRELOAD_DELAY ms for each grid's EVT_CACHE_PRELOAD_STOPPED event.
+        assert l1.await(PRELOAD_DELAY * 3 / 2, TimeUnit.MILLISECONDS);
+
+        assert l2.await(PRELOAD_DELAY * 3 / 2, TimeUnit.MILLISECONDS);
+
+        U.sleep(1000);
+
+        info("Caches are repartitioned.");
+
+        checkMaps(true, d0, d1, d2);
+
+        checkCache(c0, cnt);
+        checkCache(c1, cnt);
+        checkCache(c2, cnt);
+    }
+
+    /**
+     * Populates the first node with {@code KEY_CNT} keys, starts two further nodes with
+     * {@code SYNC} preload mode and zero delay, and then expects all partition maps to match
+     * strictly and every node to hold the keys it is primary or backup for.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAutomaticPreload() throws Exception {
+        preloadMode = SYNC;
+        delay = 0;
+
+        Ignite firstNode = startGrid(0);
+
+        int keyCnt = KEY_CNT;
+
+        GridCache<String, Integer> firstCache = firstNode.cache(null);
+
+        for (int k = 0; k < keyCnt; k++) {
+            String key = Integer.toString(k);
+
+            firstCache.put(key, k);
+        }
+
+        Ignite secondNode = startGrid(1);
+        Ignite thirdNode = startGrid(2);
+
+        GridCache<String, Integer> secondCache = secondNode.cache(null);
+        GridCache<String, Integer> thirdCache = thirdNode.cache(null);
+
+        GridDhtCacheAdapter<String, Integer> firstDht = dht(0);
+        GridDhtCacheAdapter<String, Integer> secondDht = dht(1);
+        GridDhtCacheAdapter<String, Integer> thirdDht = dht(2);
+
+        checkMaps(true, firstDht, secondDht, thirdDht);
+
+        checkCache(firstCache, keyCnt);
+        checkCache(secondCache, keyCnt);
+        checkCache(thirdCache, keyCnt);
+    }
+
+    /**
+     * Starts five nodes one at a time with an empty cache. After each start and partition map
+     * exchange, every partition listed for a node in any started node's full partition map
+     * must have that node among the partition's primary and backup nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAutomaticPreloadWithEmptyCache() throws Exception {
+        delay = 0;
+
+        preloadMode = SYNC;
+
+        Collection<Ignite> started = new ArrayList<>();
+
+        try {
+            for (int idx = 0; idx < 5; idx++) {
+                started.add(startGrid(idx));
+
+                awaitPartitionMapExchange();
+
+                for (Ignite node : started) {
+                    info(">>> Checking affinity for grid: " + node.name());
+
+                    GridDhtPartitionFullMap fullMap = topology(node).partitionMap(true);
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap> nodeEntry : fullMap.entrySet()) {
+                        UUID nodeId = nodeEntry.getKey();
+
+                        GridDhtPartitionMap partMap = nodeEntry.getValue();
+
+                        for (Map.Entry<Integer, GridDhtPartitionState> partEntry : partMap.entrySet()) {
+                            int part = partEntry.getKey();
+                            GridDhtPartitionState partState = partEntry.getValue();
+
+                            Collection<ClusterNode> affNodes = affinityNodes(node, part);
+
+                            Collection<UUID> affIds = U.nodeIds(affNodes);
+
+                            assert affIds.contains(nodeId) : "Invalid affinity mapping [nodeId=" + nodeId +
+                                ", part=" + part + ", state=" + partState + ", grid=" + G.ignite(nodeId).name() +
+                                ", affNames=" + U.nodes2names(affNodes) + ", affIds=" + affIds + ']';
+                        }
+                    }
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts a single node with {@code SYNC} preload mode and a delay of {@code -1};
+     * the test passes if the node starts without throwing.
+     *
+     * @throws Exception If failed.
+     */
+    public void testManualPreloadSyncMode() throws Exception {
+        delay = -1;
+        preloadMode = SYNC;
+
+        try {
+            startGrid(0);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts nine nodes concurrently with {@code ASYNC} preload mode and zero delay, then
+     * starts a tenth node with a delay of {@code -1}, forces a repartition on it and logs how
+     * long the repartition of the empty cache took.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPreloadManyNodes() throws Exception {
+        delay = 0;
+        preloadMode = ASYNC;
+
+        try {
+            // Started inside the try block so that already running nodes are stopped
+            // even if startup fails part-way or the sleep below is interrupted.
+            startGridsMultiThreaded(9);
+
+            U.sleep(2000);
+
+            // Preload mode stays ASYNC; only the delay changes for the last node.
+            delay = -1;
+
+            Ignite g = startGrid(9);
+
+            info(">>> Starting manual preload");
+
+            long start = System.currentTimeMillis();
+
+            g.cache(null).forceRepartition().get();
+
+            info(">>> Finished preloading of empty cache in " + (System.currentTimeMillis() - start) + "ms.");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Returns the DHT partition topology of the given grid's internal cache.
+     *
+     * @param g Grid; cast to {@code GridKernal}, its internal cache to {@code GridNearCacheAdapter}.
+     * @return Topology.
+     */
+    private GridDhtPartitionTopology<Integer, String> topology(Ignite g) {
+        GridKernal kernal = (GridKernal)g;
+
+        GridNearCacheAdapter<Integer, String> nearCache =
+            (GridNearCacheAdapter<Integer, String>)kernal.<Integer, String>internalCache();
+
+        return nearCache.dht().topology();
+    }
+
+    /**
+     * Returns the affinity of the cache obtained from the grid with a {@code null} name.
+     *
+     * @param g Grid.
+     * @return Affinity.
+     */
+    private GridCacheAffinity<Object> affinity(Ignite g) {
+        GridCache<Object, Object> dfltCache = g.cache(null);
+
+        return dfltCache.affinity();
+    }
+
+    /**
+     * Maps a partition to its primary and backup nodes using the given grid's affinity.
+     *
+     * @param g Grid.
+     * @param p Partition.
+     * @return Affinity nodes.
+     */
+    private Collection<ClusterNode> affinityNodes(Ignite g, int p) {
+        GridCacheAffinity<Object> aff = affinity(g);
+
+        return aff.mapPartitionToPrimaryAndBackups(p);
+    }
+
+    /**
+     * Asserts that, for every key {@code "0"} to {@code "keyCnt - 1"} for which the cache's
+     * local node is primary or backup, peeking the key returns the matching integer value.
+     *
+     * @param c Cache.
+     * @param keyCnt Key count.
+     */
+    private void checkCache(GridCache<String, Integer> c, int keyCnt) {
+        Ignite ignite = c.gridProjection().ignite();
+
+        ClusterNode locNode = ignite.cluster().localNode();
+
+        for (int k = 0; k < keyCnt; k++) {
+            String key = Integer.toString(k);
+
+            // Keys that do not belong to the local node are skipped.
+            if (!c.affinity().isPrimaryOrBackup(locNode, key))
+                continue;
+
+            assertEquals(Integer.valueOf(k), c.peek(key));
+        }
+    }
+
+    /**
+     * Checks maps for equality.
+     *
+     * @param strict Strict check flag.
+     * @param caches Maps to compare.
+     */
+    private void checkMaps(final boolean strict, final GridDhtCacheAdapter<String, Integer>... caches)
+        throws IgniteInterruptedException {
+        if (caches.length < 2)
+            return;
+
+        GridTestUtils.retryAssert(log, 50, 500, new CAX() {
+            @Override public void applyx() {
+                info("Checking partition maps.");
+
+                for (int i = 0; i < caches.length; i++)
+                    info("Partition map for node " + i + ": " + caches[i].topology().partitionMap(false).toFullString());
+
+                GridDhtPartitionFullMap orig = caches[0].topology().partitionMap(true);
+
+                for (int i = 1; i < caches.length; i++) {
+                    GridDhtPartitionFullMap cmp = caches[i].topology().partitionMap(true);
+
+                    assert orig.keySet().equals(cmp.keySet());
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap> entry : orig.entrySet()) {
+                        UUID nodeId = entry.getKey();
+
+                        GridDhtPartitionMap nodeMap = entry.getValue();
+
+                        GridDhtPartitionMap cmpMap = cmp.get(nodeId);
+
+                        assert cmpMap != null;
+
+                        assert nodeMap.keySet().equals(cmpMap.keySet());
+
+                        for (Map.Entry<Integer, GridDhtPartitionState> nodeEntry : nodeMap.entrySet()) {
+                            GridDhtPartitionState state = cmpMap.get(nodeEntry.getKey());
+
+                            assert state != null;
+                            assert state != GridDhtPartitionState.EVICTED;
+                            assert !strict || state == GridDhtPartitionState.OWNING : "Invalid partition state: " + state;
+                            assert state == nodeEntry.getValue();
+                        }
+                    }
+                }
+            }
+        });
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDisabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDisabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDisabledSelfTest.java
new file mode 100644
index 0000000..5392dee
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDisabledSelfTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.configuration.IgniteDeploymentMode.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Test cases for partitioned cache {@link GridDhtPreloader preloader} with preloading
+ * disabled: the cache is configured with preload mode {@code NONE}.
+ */
+public class GridCacheDhtPreloadDisabledSelfTest extends GridCommonAbstractTest {
+    /** Flag to print preloading events. */
+    private static final boolean DEBUG = false;
+
+    /** Test timeout in milliseconds (five minutes). */
+    private static final long TEST_TIMEOUT = 5 * 60 * 1000;
+
+    /** Default backups. */
+    private static final int DFLT_BACKUPS = 1;
+
+    /** Partitions. */
+    private static final int DFLT_PARTITIONS = 521;
+
+    /** Number of key backups. Each test method can set this value as required. */
+    private int backups = DFLT_BACKUPS;
+
+    /** Number of partitions. */
+    private int partitions = DFLT_PARTITIONS;
+
+    /** IP finder. */
+    private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Creates the test without asking the base class to start a grid.
+     */
+    public GridCacheDhtPreloadDisabledSelfTest() {
+        super(false /*start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_ASYNC);
+        // Preloading is switched off; this is the condition the tests in this class rely on.
+        cacheCfg.setPreloadMode(NONE);
+        cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions));
+        cacheCfg.setBackups(backups);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+        cfg.setCacheConfiguration(cacheCfg);
+        cfg.setDeploymentMode(CONTINUOUS);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Reset per-test settings that individual test methods may have changed.
+        backups = DFLT_BACKUPS;
+        partitions = DFLT_PARTITIONS;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIMEOUT;
+    }
+
+    /**
+     * @param i Grid index.
+     * @return Topology.
+     */
+    private GridDhtPartitionTopology<Integer, String> topology(int i) {
+        return near(grid(i).<Integer, String>cache(null)).dht().topology();
+    }
+
+    /**
+     * Starts four nodes and checks, for every partition, that each node reports
+     * {@code backups + 1} nodes and owners, that nodes and owners are the same set, and
+     * that all nodes report the same owners.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSamePartitionMap() throws Exception {
+        backups = 1;
+        partitions = 10;
+
+        int nodeCnt = 4;
+
+        try {
+            // Started inside the try block so that nodes are stopped even if startup fails part-way.
+            startGridsMultiThreaded(nodeCnt);
+
+            for (int p = 0; p < partitions; p++) {
+                List<Collection<ClusterNode>> mappings = new ArrayList<>(nodeCnt);
+
+                for (int i = 0; i < nodeCnt; i++) {
+                    Collection<ClusterNode> nodes = topology(i).nodes(p, -1);
+                    List<ClusterNode> owners = topology(i).owners(p);
+
+                    int size = backups + 1;
+
+                    assert owners.size() == size : "Size mismatch [nodeIdx=" + i + ", p=" + p + ", size=" + size +
+                        ", owners=" + F.nodeIds(owners) + ']';
+                    assert nodes.size() == size : "Size mismatch [nodeIdx=" + i + ", p=" + p + ", size=" + size +
+                        ", nodes=" + F.nodeIds(nodes) + ']';
+
+                    assert F.eqNotOrdered(nodes, owners);
+                    assert F.eqNotOrdered(owners, nodes);
+
+                    mappings.add(owners);
+                }
+
+                // Compare every mapping with each mapping that precedes it.
+                for (int i = 0; i < mappings.size(); i++) {
+                    Collection<ClusterNode> m1 = mappings.get(i);
+
+                    for (int j = 0; j < i; j++) {
+                        Collection<ClusterNode> m2 = mappings.get(j);
+
+                        assert F.eqNotOrdered(m1, m2) : "Mappings are not equal [m1=" + F.nodeIds(m1) + ", m2=" +
+                            F.nodeIds(m2) + ']';
+                        assert F.eqNotOrdered(m2, m1) : "Mappings are not equal [m1=" + F.nodeIds(m1) + ", m2=" +
+                            F.nodeIds(m2) + ']';
+                    }
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Puts keys on a single node, starts three more nodes and checks that none of them sees
+     * the keys, both right after start and while the additional nodes are stopped one by one.
+     * Finally checks that the keys mapped to the first node are still present on it.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDisabledPreloader() throws Exception {
+        try {
+            Ignite ignite1 = startGrid(0);
+
+            GridCache<Integer, String> cache1 = ignite1.cache(null);
+
+            int keyCnt = 10;
+
+            putKeys(cache1, keyCnt);
+
+            for (int i = 0; i < keyCnt; i++) {
+                assertNull(near(cache1).peekEx(i));
+                assertNotNull((dht(cache1).peekEx(i)));
+
+                assertEquals(Integer.toString(i), cache1.peek(i));
+            }
+
+            int nodeCnt = 3;
+
+            List<Ignite> ignites = new ArrayList<>(nodeCnt);
+
+            startGrids(nodeCnt, 1, ignites);
+
+            // Check all nodes.
+            for (Ignite g : ignites) {
+                GridCache<Integer, String> c = g.cache(null);
+
+                for (int i = 0; i < keyCnt; i++)
+                    assertNull(c.peek(i));
+            }
+
+            // Remember the keys that map to the first node in the enlarged topology.
+            Collection<Integer> keys = new LinkedList<>();
+
+            for (int i = 0; i < keyCnt; i++)
+                if (cache1.affinity().mapKeyToNode(i).equals(ignite1.cluster().localNode()))
+                    keys.add(i);
+
+            info(">>> Finished checking nodes [keyCnt=" + keyCnt + ", nodeCnt=" + nodeCnt + ", grids=" +
+                U.grids2names(ignites) + ']');
+
+            // Stop the additional nodes one at a time, re-checking the remaining ones after each stop.
+            for (Iterator<Ignite> it = ignites.iterator(); it.hasNext(); ) {
+                Ignite g = it.next();
+
+                it.remove();
+
+                stopGrid(g.name());
+
+                // Check all nodes.
+                for (Ignite gg : ignites) {
+                    GridCache<Integer, String> c = gg.cache(null);
+
+                    for (int i = 0; i < keyCnt; i++)
+                        assertNull(c.peek(i));
+                }
+            }
+
+            for (Integer i : keys)
+                assertEquals(i.toString(), cache1.peek(i));
+        }
+        catch (Error | Exception e) {
+            error("Test failed.", e);
+
+            throw e;
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts the given number of grids with consecutive indexes and adds them to the list.
+     * When {@link #DEBUG} is set, a listener that logs preload events is registered on each grid.
+     *
+     * @param cnt Number of grids.
+     * @param startIdx Start node index.
+     * @param list List of started grids.
+     * @throws Exception If failed.
+     */
+    private void startGrids(int cnt, int startIdx, Collection<Ignite> list) throws Exception {
+        for (int i = 0; i < cnt; i++) {
+            final Ignite g = startGrid(startIdx++);
+
+            if (DEBUG)
+                g.events().localListen(new IgnitePredicate<IgniteEvent>() {
+                    @Override public boolean apply(IgniteEvent evt) {
+                        info("\n>>> Preload event [grid=" + g.name() + ", evt=" + evt + ']');
+
+                        return true;
+                    }
+                }, EVTS_CACHE_PRELOAD);
+
+            list.add(g);
+        }
+    }
+
+    /** @param grids Grids to stop. */
+    private void stopGrids(Iterable<Ignite> grids) {
+        for (Ignite g : grids)
+            stopGrid(g.name());
+    }
+
+    /**
+     * Puts keys {@code 0} to {@code cnt - 1} into the cache, each mapped to its decimal string.
+     *
+     * @param c Cache.
+     * @param cnt Key count.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void putKeys(GridCache<Integer, String> c, int cnt) throws IgniteCheckedException {
+        for (int i = 0; i < cnt; i++)
+            c.put(i, Integer.toString(i));
+    }
+}


Mime
View raw message