ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [41/50] [abbrv] incubator-ignite git commit: # ignite-63
Date Thu, 22 Jan 2015 22:04:36 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
new file mode 100644
index 0000000..479fe89
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+
+import javax.cache.expiry.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Simple cache test.
+ */
+public abstract class GridCacheBasicOpAbstractTest extends GridCommonAbstractTest {
+    /** Grid 1. */
+    private static Ignite ignite1;
+
+    /** Grid 2. */
+    private static Ignite ignite2;
+
+    /** Grid 3. */
+    private static Ignite ignite3;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(3);
+
+        ignite1 = grid(0);
+        ignite2 = grid(1);
+        ignite3 = grid(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        ignite1 = null;
+        ignite2 = null;
+        ignite3 = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        for (Ignite g : G.allGrids())
+            g.cache(null).clearAll();
+    }
+
+    /**
+     *
+     * @throws Exception If error occur.
+     */
+    public void testBasicOps() throws Exception {
+        CountDownLatch latch = new CountDownLatch(3);
+
+        CacheEventListener lsnr = new CacheEventListener(latch);
+
+        try {
+            GridCache<String, String> cache1 = ignite1.cache(null);
+            GridCache<String, String> cache2 = ignite2.cache(null);
+            GridCache<String, String> cache3 = ignite3.cache(null);
+
+            ignite1.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
+            ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
+            ignite3.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
+
+            assert !cache1.containsKey("1");
+            assert !cache2.containsKey("1");
+            assert !cache3.containsKey("1");
+
+            info("First put");
+
+            cache1.put("1", "a");
+
+            info("Start latch wait 1");
+
+            assert latch.await(5, SECONDS);
+
+            info("Stop latch wait 1");
+
+            assert cache1.containsKey("1");
+            assert cache2.containsKey("1");
+            assert cache3.containsKey("1");
+
+            latch = new CountDownLatch(6);
+
+            lsnr.setLatch(latch);
+
+            cache2.put("1", "b");
+            cache3.put("1", "c");
+
+            info("Start latch wait 2");
+
+            assert latch.await(5, SECONDS);
+
+            info("Stop latch wait 2");
+
+            assert cache1.containsKey("1");
+            assert cache2.containsKey("1");
+            assert cache3.containsKey("1");
+
+            latch = new CountDownLatch(3);
+
+            lsnr.setLatch(latch);
+
+            cache1.remove("1");
+
+            info("Start latch wait 3");
+
+            assert latch.await(5, SECONDS);
+
+            info("Stop latch wait 3");
+
+            assert !cache1.containsKey("1") : "Key set: " + cache1.keySet();
+            assert !cache2.containsKey("1") : "Key set: " + cache2.keySet();
+            assert !cache3.containsKey("1") : "Key set: " + cache3.keySet();
+        }
+        finally {
+            ignite1.events().stopLocalListen(lsnr);
+            ignite2.events().stopLocalListen(lsnr);
+            ignite3.events().stopLocalListen(lsnr);
+        }
+    }
+
+    /**
+     * @throws Exception If test fails.
+     */
+    public void testBasicOpsAsync() throws Exception {
+        CountDownLatch latch = new CountDownLatch(3);
+
+        CacheEventListener lsnr = new CacheEventListener(latch);
+
+        try {
+            GridCache<String, String> cache1 = ignite1.cache(null);
+            GridCache<String, String> cache2 = ignite2.cache(null);
+            GridCache<String, String> cache3 = ignite3.cache(null);
+
+            ignite1.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
+            ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
+            ignite3.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
+
+            IgniteFuture<String> f1 = cache1.getAsync("async1");
+
+            assert f1.get() == null;
+
+            f1 = cache1.putAsync("async1", "asyncval1");
+
+            assert f1.get() == null;
+
+            f1 = cache1.getAsync("async1");
+
+            String v1 = f1.get();
+
+            assert v1 != null;
+            assert "asyncval1".equals(v1);
+
+            assert latch.await(5, SECONDS);
+
+            IgniteFuture<String> f2 = cache2.getAsync("async1");
+            IgniteFuture<String> f3 = cache3.getAsync("async1");
+
+            String v2 = f2.get();
+            String v3 = f3.get();
+
+            assert v2 != null;
+            assert v3 != null;
+
+            assert "asyncval1".equals(v2);
+            assert "asyncval1".equals(v3);
+
+            lsnr.setLatch(latch = new CountDownLatch(3));
+
+            f2 = cache2.removeAsync("async1");
+
+            assert "asyncval1".equals(f2.get());
+
+            assert latch.await(5, SECONDS);
+
+            f1 = cache1.getAsync("async1");
+            f2 = cache2.getAsync("async1");
+            f3 = cache3.getAsync("async1");
+
+            v1 = f1.get();
+            v2 = f2.get();
+            v3 = f3.get();
+
+            info("Removed v1: " + v1);
+            info("Removed v2: " + v2);
+            info("Removed v3: " + v3);
+
+            assert v1 == null;
+            assert v2 == null;
+            assert v3 == null;
+        }
+        finally {
+            ignite1.events().stopLocalListen(lsnr);
+            ignite2.events().stopLocalListen(lsnr);
+            ignite3.events().stopLocalListen(lsnr);
+        }
+    }
+
+    /**
+     *
+     * @throws IgniteCheckedException If test fails.
+     */
+    public void testOptimisticTransaction() throws Exception {
+        CountDownLatch latch = new CountDownLatch(9);
+
+        IgnitePredicate<IgniteEvent> lsnr = new CacheEventListener(latch);
+
+        try {
+            GridCache<String, String> cache1 = ignite1.cache(null);
+            GridCache<String, String> cache2 = ignite2.cache(null);
+            GridCache<String, String> cache3 = ignite3.cache(null);
+
+            ignite1.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
+            ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
+            ignite3.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
+
+            IgniteTx tx = cache1.txStart(OPTIMISTIC, READ_COMMITTED, 0, 0);
+
+            try {
+                cache1.put("tx1", "val1");
+                cache1.put("tx2", "val2");
+                cache1.put("tx3", "val3");
+
+                assert cache2.get("tx1") == null;
+                assert cache2.get("tx2") == null;
+                assert cache2.get("tx3") == null;
+
+                assert cache3.get("tx1") == null;
+                assert cache3.get("tx2") == null;
+                assert cache3.get("tx3") == null;
+
+                tx.commit();
+            }
+            catch (IgniteCheckedException e) {
+                tx.rollback();
+
+                throw e;
+            }
+
+            assert latch.await(5, SECONDS);
+
+            String b1 = cache2.get("tx1");
+            String b2 = cache2.get("tx2");
+            String b3 = cache2.get("tx3");
+
+            String c1 = cache3.get("tx1");
+            String c2 = cache3.get("tx2");
+            String c3 = cache3.get("tx3");
+
+            assert b1 != null : "Invalid value: " + b1;
+            assert b2 != null : "Invalid value: " + b2;
+            assert b3 != null : "Invalid value: " + b3;
+
+            assert c1 != null : "Invalid value: " + c1;
+            assert c2 != null : "Invalid value: " + c2;
+            assert c3 != null : "Invalid value: " + c3;
+
+            assert "val1".equals(b1);
+            assert "val2".equals(b2);
+            assert "val3".equals(b3);
+
+            assert "val1".equals(c1);
+            assert "val2".equals(c2);
+            assert "val3".equals(c3);
+        }
+        finally {
+            ignite1.events().stopLocalListen(lsnr);
+            ignite2.events().stopLocalListen(lsnr);
+            ignite3.events().stopLocalListen(lsnr);
+        }
+    }
+
+    /**
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutWithExpiration() throws Exception {
+        IgniteCache<String, String> cache1 = ignite1.jcache(null);
+        IgniteCache<String, String> cache2 = ignite2.jcache(null);
+        IgniteCache<String, String> cache3 = ignite3.jcache(null);
+
+        cache1.put("key", "val");
+
+        IgniteTx tx = ignite1.transactions().txStart();
+
+        long ttl = 500;
+
+        cache1.withExpiryPolicy(new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl))).put("key", "val");
+
+        assert cache1.get("key") != null;
+
+        tx.commit();
+
+        info("Going to sleep for: " + (ttl + 1000));
+
+        // Allow for expiration.
+        Thread.sleep(ttl + 1000);
+
+        String v1 = cache1.get("key");
+        String v2 = cache2.get("key");
+        String v3 = cache3.get("key");
+
+        assert v1 == null : "V1 should be null: " + v1;
+        assert v2 == null : "V2 should be null: " + v2;
+        assert v3 == null : "V3 should be null: " + v3;
+    }
+
+    /**
+     * Event listener.
+     */
+    private class CacheEventListener implements IgnitePredicate<IgniteEvent> {
+        /** Wait latch. */
+        private CountDownLatch latch;
+
+        /**
+         * @param latch Wait latch.
+         */
+        CacheEventListener(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        /**
+         * @param latch New latch.
+         */
+        void setLatch(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(IgniteEvent evt) {
+            assert evt.type() == EVT_CACHE_OBJECT_PUT || evt.type() == EVT_CACHE_OBJECT_REMOVED :
+                "Unexpected event type: " + evt;
+
+            info("Grid cache event: " + evt);
+
+            latch.countDown();
+
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java
new file mode 100644
index 0000000..55c3957
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cache.store.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Tests near-only cache.
+ */
+public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbstractSelfTest {
+    /** Grid cnt. */
+    private static AtomicInteger gridCnt;
+
+    /** Near-only cache grid name. */
+    private static String nearOnlyGridName;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        gridCnt = new AtomicInteger();
+
+        super.beforeTestsStarted();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheStore<?, ?> cacheStore() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+        if (gridCnt.getAndIncrement() == 0) {
+            cfg.setDistributionMode(clientOnly() ? CLIENT_ONLY : NEAR_ONLY);
+
+            nearOnlyGridName = gridName;
+        }
+
+        cfg.setCacheStoreFactory(null);
+        cfg.setReadThrough(false);
+        cfg.setWriteThrough(false);
+        cfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, 32));
+        cfg.setBackups(1);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        gridCnt.set(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /**
+     * @return If {@code true} then uses CLIENT_ONLY mode, otherwise NEAR_ONLY.
+     */
+    protected abstract boolean clientOnly();
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutFromClientNode() throws Exception {
+        GridCache<Object, Object> nearOnly = nearOnlyCache();
+
+        for (int i = 0; i < 5; i++)
+            nearOnly.put(i, i);
+
+        nearOnly.putAll(F.asMap(5, 5, 6, 6, 7, 7, 8, 8, 9, 9));
+
+        for (int key = 0; key < 10; key++) {
+            for (int i = 1; i < gridCount(); i++) {
+                if (grid(i).cache(null).affinity().isPrimaryOrBackup(grid(i).localNode(), key))
+                    assertEquals(key, grid(i).cache(null).peek(key));
+            }
+
+            if (nearEnabled())
+                assertEquals(key, nearOnly.peek(key));
+
+            assertNull(nearOnly.peek(key, F.asList(GridCachePeekMode.PARTITIONED_ONLY)));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetFromClientNode() throws Exception {
+        GridCache<Object, Object> dht = dhtCache();
+
+        for (int i = 0; i < 10; i++)
+            dht.put(i, i);
+
+        GridCache<Object, Object> nearOnly = nearOnlyCache();
+
+        assert dht != nearOnly;
+
+        for (int key = 0; key < 10; key++) {
+            // At start near only cache does not have any values.
+            if (nearEnabled())
+                assertNull(nearOnly.peek(key));
+
+            // Get should succeed.
+            assertEquals(key, nearOnly.get(key));
+
+            // Now value should be cached.
+            if (nearEnabled())
+                assertEquals(key, nearOnly.peek(key));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearOnlyAffinity() throws Exception {
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite g = grid(i);
+
+            if (F.eq(g.name(), nearOnlyGridName)) {
+                for (int k = 0; k < 10000; k++) {
+                    GridCache<Object, Object> cache = g.cache(null);
+
+                    String key = "key" + k;
+
+                    if (cacheMode() == PARTITIONED)
+                        assertFalse(cache.entry(key).primary() || cache.entry(key).backup());
+
+                    assertFalse(cache.affinity().mapKeyToPrimaryAndBackups(key).contains(g.cluster().localNode()));
+                }
+            }
+            else {
+                boolean foundEntry = false;
+                boolean foundAffinityNode = false;
+
+                for (int k = 0; k < 10000; k++) {
+                    GridCache<Object, Object> cache = g.cache(null);
+
+                    String key = "key" + k;
+
+                    if (cache.entry(key).primary() || cache.entry(key).backup())
+                        foundEntry = true;
+
+                    if (cache.affinity().mapKeyToPrimaryAndBackups(key).contains(g.cluster().localNode()))
+                        foundAffinityNode = true;
+                }
+
+                assertTrue("Did not found primary or backup entry for grid: " + i, foundEntry);
+                assertTrue("Did not found affinity node for grid: " + i, foundAffinityNode);
+            }
+        }
+    }
+
+    /**
+     * @return Near only cache for this test.
+     */
+    protected GridCache<Object, Object> nearOnlyCache() {
+        assert nearOnlyGridName != null;
+
+        return G.ignite(nearOnlyGridName).cache(null);
+    }
+
+    /**
+     * @return DHT cache for this test.
+     */
+    protected GridCache<Object, Object> dhtCache() {
+        for (int i = 0; i < gridCount(); i++) {
+            if (!nearOnlyGridName.equals(grid(i).name()))
+                return grid(i).cache(null);
+        }
+
+        assert false : "Cannot find DHT cache for this test.";
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java
new file mode 100644
index 0000000..9375193
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.testframework.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ *
+ */
+public abstract class GridCacheEntrySetAbstractSelfTest extends GridCacheAbstractSelfTest {
+    /** */
+    private static final int GRID_CNT = 2;
+
+    /** */
+    private static final String TX_KEY = "txKey";
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return GRID_CNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheWriteSynchronizationMode writeSynchronization() {
+        return FULL_SYNC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntrySet() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            log.info("Iteration: " + i);
+
+            final AtomicInteger cacheIdx = new AtomicInteger(0);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int idx = cacheIdx.getAndIncrement();
+
+                    log.info("Use cache " + idx);
+
+                    GridCache<Object, Object> cache = grid(idx).cache(null);
+
+                    for (int i = 0; i < 100; i++)
+                        putAndCheckEntrySet(cache);
+
+                    return null;
+                }
+            }, GRID_CNT, "test");
+
+            for (int j = 0; j < gridCount(); j++)
+                cache(j).removeAll();
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void putAndCheckEntrySet(GridCache<Object, Object> cache) throws Exception {
+        try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            Integer total = (Integer) cache.get(TX_KEY);
+
+            if (total == null)
+                total = 0;
+
+            int cntr = 0;
+
+            Set<GridCacheEntry<Object, Object>> entries = cache.entrySet();
+
+            for (GridCacheEntry e : entries) {
+                if (e.getKey() instanceof Integer)
+                    cntr++;
+            }
+
+            assertEquals(total, (Integer)cntr);
+
+            cache.putx(cntr + 1, cntr + 1);
+
+            cache.putx(TX_KEY, cntr + 1);
+
+            tx.commit();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetIterationPreloadingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetIterationPreloadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetIterationPreloadingSelfTest.java
new file mode 100644
index 0000000..628bea5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetIterationPreloadingSelfTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+
+import java.util.*;
+
+/**
+ * Tests entry wrappers after preloading happened.
+ */
+public class GridCacheEntrySetIterationPreloadingSelfTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return GridCacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return GridCacheDistributionMode.PARTITIONED_ONLY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return GridCacheAtomicityMode.ATOMIC;
+    }
+
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setPreloadMode(GridCachePreloadMode.SYNC);
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIteration()  throws Exception {
+        try {
+            final GridCache<String, Integer> cache = cache();
+
+            final int entryCnt = 1000;
+
+            for (int i = 0; i < entryCnt; i++)
+                cache.put(String.valueOf(i), i);
+
+            Collection<GridCacheEntry<String, Integer>> entries = new ArrayList<>(10_000);
+
+            for (int i = 0; i < 10_000; i++)
+                entries.add(cache.randomEntry());
+
+            startGrid(1);
+            startGrid(2);
+            startGrid(3);
+
+            for (GridCacheEntry<String, Integer> entry : entries)
+                entry.partition();
+
+            for (int i = 0; i < entryCnt; i++)
+                cache.remove(String.valueOf(i));
+        }
+        finally {
+            stopGrid(3);
+            stopGrid(2);
+            stopGrid(1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java
new file mode 100644
index 0000000..0ae38f4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java
@@ -0,0 +1,964 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Tests events.
+ */
+public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTest {
+    /** */
+    private static final boolean TEST_INFO = true;
+
+    /** Wait timeout. */
+    private static final long WAIT_TIMEOUT = 5000;
+
+    /** Key. */
+    private static final String KEY = "key";
+
+    /** */
+    private static volatile int gridCnt;
+
+    /**
+     * @return {@code True} if partitioned.
+     */
+    protected boolean partitioned() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        gridCnt = gridCount();
+
+        for (int i = 0; i < gridCnt; i++)
+            grid(i).events().localListen(new TestEventListener(partitioned()), EVTS_CACHE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        if (TEST_INFO)
+            info("Called beforeTest() callback.");
+
+        TestEventListener.reset();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        if (TEST_INFO)
+            info("Called afterTest() callback.");
+
+        TestEventListener.stopListen();
+
+        try {
+            super.afterTest();
+        }
+        finally {
+            TestEventListener.listen();
+        }
+    }
+
+    /**
+     * Waits for event count on all nodes.
+     *
+     * @param gridIdx Grid index.
+     * @param evtCnts Array of tuples with values: V1 - event type, V2 - expected event count on one node.
+     * @throws InterruptedException If thread has been interrupted while waiting.
+     */
+    private void waitForEvents(int gridIdx, IgniteBiTuple<Integer, Integer>... evtCnts) throws Exception {
+        if (!F.isEmpty(evtCnts))
+            try {
+                TestEventListener.waitForEventCount(((GridKernal)grid(0)).context(), evtCnts);
+            }
+            catch (IgniteCheckedException e) {
+                printEventCounters(gridIdx, evtCnts);
+
+                throw e;
+            }
+    }
+
+    /**
+     * @param gridIdx Grid index.
+     * @param expCnts Expected counters
+     */
+    private void printEventCounters(int gridIdx, IgniteBiTuple<Integer, Integer>[] expCnts) {
+        info("Printing counters [gridIdx=" + gridIdx + ']');
+
+        for (IgniteBiTuple<Integer, Integer> t : expCnts) {
+            Integer evtType = t.get1();
+
+            int actCnt = TestEventListener.eventCount(evtType);
+
+            info("Event [evtType=" + evtType + ", expCnt=" + t.get2() + ", actCnt=" + actCnt + ']');
+        }
+    }
+
+    /**
+     * Clear caches without generating events.
+     *
+     * @throws IgniteCheckedException If failed to clear caches.
+     */
+    private void clearCaches() throws IgniteCheckedException {
+        for (int i = 0; i < gridCnt; i++) {
+            GridCache<String, Integer> cache = cache(i);
+
+            cache.removeAll();
+
+            assert cache.isEmpty();
+        }
+    }
+
+    /**
+     * Runs provided {@link TestCacheRunnable} instance on all caches.
+     *
+     * @param run {@link TestCacheRunnable} instance.
+     * @param evtCnts Expected event counts for each iteration.
+     * @throws Exception In failed.
+     */
+    @SuppressWarnings({"CaughtExceptionImmediatelyRethrown"})
+    private void runTest(TestCacheRunnable run, IgniteBiTuple<Integer, Integer>... evtCnts) throws Exception {
+        for (int i = 0; i < gridCount(); i++) {
+            info(">>> Running test for grid [idx=" + i + ", grid=" + grid(i).name() +
+                ", id=" + grid(i).localNode().id() + ']');
+
+            try {
+                run.run(cache(i));
+
+                waitForEvents(i, evtCnts);
+            }
+            catch (Exception e) { // Leave this catch to be able to set breakpoint.
+                throw e;
+            }
+            finally {
+                // This call is mainly required to correctly clear event futures.
+                TestEventListener.reset();
+
+                clearCaches();
+
+                // This call is required for the second time to reset counters for
+                // the previous call.
+                TestEventListener.reset();
+            }
+        }
+    }
+
+    /**
+     * Get key-value pairs.
+     *
+     * @param size Pairs count.
+     * @return Key-value pairs.
+     */
+    private Map<String, Integer> pairs(int size) {
+        Map<String, Integer> pairs = new HashMap<>(size);
+
+        for (int i = 1; i <= size; i++)
+            pairs.put(KEY + i, i);
+
+        return pairs;
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testFilteredPut() throws Exception {
+        GridCache<String, Integer> cache = grid(0).cache(null);
+
+        String key = "1";
+        int val = 1;
+
+        assert !cache.putx(key, val, F.<String, Integer>cacheHasPeekValue());
+
+        assert !cache.containsKey(key);
+
+        assertEquals(0, TestEventListener.eventCount(EVT_CACHE_OBJECT_PUT));
+
+        assert cache.putx(key, val);
+
+        assert cache.containsKey(key);
+
+        waitForEvents(0, F.t(EVT_CACHE_OBJECT_PUT, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testGetPutRemove() throws Exception {
+        // TODO: GG-7578.
+        if (cache(0).configuration().getCacheMode() == GridCacheMode.REPLICATED)
+            return;
+
+        runTest(
+            new TestCacheRunnable() {
+                @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                    String key = "key";
+                    Integer val = 1;
+
+                    assert cache.put(key, val) == null;
+
+                    assert cache.containsKey(key);
+
+                    assertEquals(val, cache.get(key));
+
+                    assertEquals(val, cache.remove(key));
+
+                    assert !cache.containsKey(key);
+                }
+            },
+            F.t(EVT_CACHE_OBJECT_PUT, gridCnt),
+            F.t(EVT_CACHE_OBJECT_READ, 3),
+            F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt)
+        );
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testGetPutRemoveTx1() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                IgniteTx tx = cache.txStart();
+
+                assert cache.put(key, val) == null;
+
+                assert cache.containsKey(key);
+
+                assert val.equals(cache.get(key));
+
+                assert val.equals(cache.remove(key));
+
+                assert !cache.containsKey(key);
+
+                tx.commit();
+
+                assert !cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testGetPutRemoveTx2() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                IgniteTx tx = cache.txStart();
+
+                assert cache.put(key, val) == null;
+
+                assert cache.containsKey(key);
+
+                assert val.equals(cache.get(key));
+
+                assert val.equals(cache.remove(key));
+
+                assert !cache.containsKey(key);
+
+                assert cache.put(key, val) == null;
+
+                assert cache.containsKey(key);
+
+                tx.commit();
+
+                assert cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testGetPutRemoveAsync() throws Exception {
+        // TODO: GG-7578.
+        if (cache(0).configuration().getCacheMode() == GridCacheMode.REPLICATED)
+            return;
+
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                assert cache.putAsync(key, val).get() == null;
+
+                assert cache.containsKey(key);
+
+                assert val.equals(cache.getAsync(key).get());
+
+                assert val.equals(cache.removeAsync(key).get());
+
+                assert !cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt), F.t(EVT_CACHE_OBJECT_READ, 3), F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testGetPutRemoveAsyncTx1() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                IgniteTx tx = cache.txStart();
+
+                assert cache.putAsync(key, val).get() == null;
+
+                assert cache.containsKey(key);
+
+                assert val.equals(cache.getAsync(key).get());
+
+                assert val.equals(cache.removeAsync(key).get());
+
+                assert !cache.containsKey(key);
+
+                tx.commit();
+
+                assert !cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testGetPutRemoveAsyncTx2() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                IgniteTx tx = cache.txStart();
+
+                assert cache.putAsync(key, val).get() == null;
+
+                assert cache.containsKey(key);
+
+                assert val.equals(cache.getAsync(key).get());
+
+                assert val.equals(cache.removeAsync(key).get());
+
+                assert !cache.containsKey(key);
+
+                assert cache.putAsync(key, val).get() == null;
+
+                assert cache.containsKey(key);
+
+                tx.commit();
+
+                assert cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPutRemovex() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                assert cache.putx(key, val);
+
+                assert cache.containsKey(key);
+
+                assert cache.removex(key);
+
+                assert !cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt), F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPutRemovexTx1() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                IgniteTx tx = cache.txStart();
+
+                assert cache.putx(key, val);
+
+                assert cache.containsKey(key);
+
+                assert cache.removex(key);
+
+                assert !cache.containsKey(key);
+
+                tx.commit();
+
+                assert !cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPutRemovexTx2() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                IgniteTx tx = cache.txStart();
+
+                assert cache.putx(key, val);
+
+                assert cache.containsKey(key);
+
+                assert cache.removex(key);
+
+                assert !cache.containsKey(key);
+
+                assert cache.putx(key, val);
+
+                assert cache.containsKey(key);
+
+                tx.commit();
+
+                assert cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPutIfAbsent() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator();
+
+                Map.Entry<String, Integer> e = iter.next();
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                assert cache.putIfAbsent(key, val) == null;
+                assert val.equals(cache.putIfAbsent(key, val));
+
+                assert cache.containsKey(key);
+
+                e = iter.next();
+
+                key = e.getKey();
+                val = e.getValue();
+
+                assert cache.putxIfAbsent(key, val);
+                assert !cache.putxIfAbsent(key, val);
+
+                assert cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPutIfAbsentTx() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator();
+
+                IgniteTx tx = cache.txStart();
+
+                Map.Entry<String, Integer> e = iter.next();
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                assert cache.putIfAbsent(key, val) == null;
+
+                assertEquals(val, cache.putIfAbsent(key, val));
+
+                assert cache.containsKey(key);
+
+                e = iter.next();
+
+                key = e.getKey();
+                val = e.getValue();
+
+                assert cache.putxIfAbsent(key, val);
+                assert !cache.putxIfAbsent(key, val);
+
+                assert cache.containsKey(key);
+
+                tx.commit();
+
+                assert cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPutIfAbsentAsync() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator();
+
+                Map.Entry<String, Integer> e = iter.next();
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                assert cache.putIfAbsentAsync(key, val).get() == null;
+                assert val.equals(cache.putIfAbsentAsync(key, val).get());
+
+                assert cache.containsKey(key);
+
+                e = iter.next();
+
+                key = e.getKey();
+                val = e.getValue();
+
+                assert cache.putxIfAbsentAsync(key, val).get();
+                assert !cache.putxIfAbsentAsync(key, val).get();
+
+                assert cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testPutIfAbsentAsyncTx() throws Exception {
+        IgniteBiTuple[] evts = new IgniteBiTuple[] {F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt), F.t(EVT_CACHE_OBJECT_READ, 1)};
+
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator();
+
+                // Optimistic transaction.
+                IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ);
+
+                Map.Entry<String, Integer> e = iter.next();
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                assert cache.putIfAbsentAsync(key, val).get() == null;
+                assert val.equals(cache.putIfAbsentAsync(key, val).get());
+
+                assert cache.containsKey(key);
+
+                e = iter.next();
+
+                key = e.getKey();
+                val = e.getValue();
+
+                assert cache.putxIfAbsentAsync(key, val).get();
+                assert !cache.putxIfAbsentAsync(key, val).get();
+
+                assert cache.containsKey(key);
+
+                tx.commit();
+
+                assert cache.containsKey(key);
+            }
+        }, evts);
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testFilteredPutRemovex() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                IgnitePredicate<GridCacheEntry<String, Integer>> noPeekVal = F.cacheNoPeekValue();
+                IgnitePredicate<GridCacheEntry<String, Integer>> hasPeekVal = F.cacheHasPeekValue();
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                assert !cache.putx(key, val, hasPeekVal);
+                assert cache.putx(key, val, noPeekVal);
+
+                assert cache.containsKey(key);
+
+                assert !cache.removex(key, noPeekVal);
+                assert cache.removex(key, hasPeekVal);
+
+                assert !cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt), F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testFilteredPutRemovexTx1() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                assert cache.keySet().isEmpty() : "Key set is not empty: " + cache().keySet();
+
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                IgnitePredicate<GridCacheEntry<String, Integer>> noPeekVal = F.cacheNoPeekValue();
+                IgnitePredicate<GridCacheEntry<String, Integer>> hasPeekVal = F.cacheHasPeekValue();
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                // Optimistic.
+                IgniteTx tx = cache.txStart();
+
+                assert !cache.putx(key, val, hasPeekVal);
+                assert cache.putx(key, val, noPeekVal);
+
+                assert cache.containsKey(key);
+
+                assert !cache.removex(key, noPeekVal);
+                assert cache.removex(key);
+
+                assert !cache.containsKey(key);
+
+                tx.commit();
+
+                assert !cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testFilteredPutRemovexTx2() throws Exception {
+        runTest(new TestCacheRunnable() {
+            @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
+                Map.Entry<String, Integer> e = F.first(pairs(1).entrySet());
+
+                assert e != null;
+
+                IgnitePredicate<GridCacheEntry<String, Integer>> noPeekVal = F.cacheNoPeekValue();
+                IgnitePredicate<GridCacheEntry<String, Integer>> hasPeekVal = F.cacheHasPeekValue();
+
+                String key = e.getKey();
+                Integer val = e.getValue();
+
+                IgniteTx tx = cache.txStart();
+
+                assert !cache.putx(key, val, hasPeekVal);
+                assert cache.putx(key, val, noPeekVal);
+
+                assert cache.containsKey(key);
+
+                assert !cache.removex(key, noPeekVal);
+                assert cache.removex(key, hasPeekVal);
+
+                assert !cache.containsKey(key);
+
+                assert !cache.putx(key, val, hasPeekVal);
+                assert cache.putx(key, val, noPeekVal);
+
+                assert cache.containsKey(key);
+
+                tx.commit();
+
+                assert cache.containsKey(key);
+            }
+        }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt));
+    }
+
+    /**
+     *
+     */
+    private static interface TestCacheRunnable {
+        /**
+         * @param cache Cache.
+         * @throws IgniteCheckedException If any exception occurs.
+         */
+        void run(GridCache<String, Integer> cache) throws IgniteCheckedException;
+    }
+
+    /**
+     * Local event listener.
+     */
+    private static class TestEventListener implements IgnitePredicate<IgniteEvent> {
+        /** Events count map. */
+        private static ConcurrentMap<Integer, AtomicInteger> cntrs = new ConcurrentHashMap<>();
+
+        /** Event futures. */
+        private static Collection<EventTypeFuture> futs = new GridConcurrentHashSet<>();
+
+        /** */
+        private static volatile boolean listen = true;
+
+        /** */
+        private static boolean partitioned;
+
+        /**
+         * @param p Partitioned flag.
+         */
+        private TestEventListener(boolean p) {
+            partitioned = p;
+        }
+
+        /**
+         *
+         */
+        private static void listen() {
+            listen = true;
+        }
+
+        /**
+         *
+         */
+        private static void stopListen() {
+            listen = false;
+        }
+
+        /**
+         * @param type Event type.
+         * @return Count.
+         */
+        static int eventCount(int type) {
+            assert type > 0;
+
+            AtomicInteger cntr = cntrs.get(type);
+
+            return cntr != null ? cntr.get() : 0;
+        }
+
+        /**
+         * Reset listener.
+         */
+        static void reset() {
+            cntrs.clear();
+
+            futs.clear();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(IgniteEvent evt) {
+            assert evt instanceof IgniteCacheEvent;
+
+            if (!listen)
+                return true;
+
+            if (TEST_INFO)
+                X.println("Cache event: " + evt.shortDisplay());
+
+            AtomicInteger cntr = F.addIfAbsent(cntrs, evt.type(), F.newAtomicInt());
+
+            assert cntr != null;
+
+            int cnt = cntr.incrementAndGet();
+
+            for (EventTypeFuture f : futs)
+                f.onEvent(evt.type(), cnt);
+
+            return true;
+        }
+
+        /**
+         * Waits for event count.
+         *
+         * @param ctx Kernal context.
+         * @param evtCnts Array of tuples with values: V1 - event type, V2 - expected event count.
+         * @throws IgniteCheckedException If failed to wait.
+         */
+        private static void waitForEventCount(GridKernalContext ctx,
+            IgniteBiTuple<Integer, Integer>... evtCnts) throws IgniteCheckedException {
+            if (F.isEmpty(evtCnts))
+                return;
+
+            // Create future that aggregates all required event types.
+            GridCompoundIdentityFuture<Object> cf = new GridCompoundIdentityFuture<>(ctx);
+
+            for (IgniteBiTuple<Integer, Integer> t : evtCnts) {
+                Integer evtType = t.get1();
+                Integer expCnt = t.get2();
+
+                assert expCnt != null && expCnt > 0;
+
+                EventTypeFuture fut = new EventTypeFuture(ctx, evtType, expCnt, partitioned);
+
+                futs.add(fut);
+
+                // We need to account the window.
+                AtomicInteger cntr = cntrs.get(evtType);
+
+                if (!fut.isDone())
+                    fut.onEvent(evtType, cntr != null ? cntr.get() : 0);
+
+                cf.add(fut);
+            }
+
+            cf.markInitialized();
+
+            try {
+                cf.get(WAIT_TIMEOUT);
+            }
+            catch (IgniteFutureTimeoutException e) {
+                throw new RuntimeException("Timed out waiting for events: " + cf, e);
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static class EventTypeFuture extends GridFutureAdapter<Object> {
+        /** */
+        private int evtType;
+
+        /** */
+        private int expCnt;
+
+        /** */
+        private int cnt;
+
+        /** Partitioned flag. */
+        private boolean partitioned;
+
+        /**
+         * For {@link Externalizable}.
+         */
+        public EventTypeFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Kernal context.
+         * @param evtType Event type.
+         * @param expCnt Expected count.
+         * @param partitioned Partitioned flag.
+         */
+        EventTypeFuture(GridKernalContext ctx, int evtType, int expCnt, boolean partitioned) {
+            super(ctx);
+
+            assert expCnt > 0;
+
+            this.evtType = evtType;
+            this.expCnt = expCnt;
+            this.partitioned = partitioned;
+        }
+
+        /**
+         * @return Count.
+         */
+        int count() {
+            return cnt;
+        }
+
+        /**
+         * @param evtType Event type.
+         * @param cnt Count.
+         */
+        void onEvent(int evtType, int cnt) {
+            if (isDone() || this.evtType != evtType)
+                return;
+
+            if (TEST_INFO)
+                X.println("EventTypeFuture.onEvent() [evtName=" + U.gridEventName(evtType) + ", evtType=" + evtType +
+                    ", cnt=" + cnt + ", expCnt=" + expCnt + ']');
+
+            this.cnt = cnt;
+
+
+            // For partitioned caches we allow extra event for reads.
+            if (expCnt < cnt && (!partitioned || evtType != EVT_CACHE_OBJECT_READ || expCnt + 1 < cnt))
+                onDone(new IgniteCheckedException("Wrong event count [evtName=" + U.gridEventName(evtType) + ", evtType=" +
+                    evtType + ", expCnt=" + expCnt + ", actCnt=" + cnt + ", partitioned=" + partitioned + "]"));
+
+            if (expCnt == cnt || (partitioned && expCnt + 1 == cnt))
+                onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EventTypeFuture.class, this, "evtName", U.gridEventName(evtType));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheExpiredEntriesPreloadAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheExpiredEntriesPreloadAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheExpiredEntriesPreloadAbstractSelfTest.java
new file mode 100644
index 0000000..9ecb5c4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheExpiredEntriesPreloadAbstractSelfTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.*;
+
+import javax.cache.expiry.*;
+import java.util.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Tests preloading of expired entries.
+ */
+public abstract class GridCacheExpiredEntriesPreloadAbstractSelfTest extends GridCacheAbstractSelfTest {
+    /** */
+    private static final int GRID_CNT = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return GRID_CNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+        cfg.setPreloadMode(SYNC);
+        cfg.setCacheStoreFactory(null);
+        cfg.setWriteThrough(false);
+        cfg.setReadThrough(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExpiredEntriesPreloading() throws Exception {
+        GridCache<String, Integer> cache0 = cache(0);
+
+        final int KEYS_NUM = 3;
+
+        for (int i = 0; i < KEYS_NUM; i++)
+            cache0.put(String.valueOf(i), 0);
+
+        final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, 100L));
+
+        IgniteCache cache = grid(0).jcache(null).withExpiryPolicy(expiry);
+
+        for (int i = 0; i < KEYS_NUM; i++)
+            cache.put(String.valueOf(i), i);
+
+        // Allow entries to expire.
+        U.sleep(1000);
+
+        // Ensure entries expiration.
+        for (int i = 0; i < KEYS_NUM; i++)
+            assert cache0.get(String.valueOf(i)) == null;
+
+        // Start another node.
+        Ignite g1 = startGrid(1);
+
+        final GridCacheAdapter<String, Integer> cache1 = ((GridKernal)g1).context().cache().internalCache();
+
+        cache1.preloader().syncFuture().get();
+
+        Collection<IgniteEvent> evts = g1.events().localQuery(F.<IgniteEvent>alwaysTrue(), EVT_CACHE_PRELOAD_OBJECT_LOADED);
+
+        assertEquals("Expected all entries are preloaded.", KEYS_NUM, evts.size());
+
+        boolean rmv = GridTestUtils.waitForCondition(new PAX() {
+            @Override public boolean applyx() {
+                return cache1.isEmpty();
+            }
+        }, 10_000);
+
+        assertTrue("Expired entries were not removed.", rmv);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
new file mode 100644
index 0000000..4845341
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Test cases for multi-threaded tests.
+ */
+@SuppressWarnings({"FieldCanBeLocal"})
+public abstract class GridCacheLockAbstractTest extends GridCommonAbstractTest {
+    /** Grid1. */
+    private static Ignite ignite1;
+
+    /** Grid2. */
+    private static Ignite ignite2;
+
+    /** (for convenience). */
+    private static IgniteCache<Integer, String> cache1;
+
+    /** (for convenience). */
+    private static IgniteCache<Integer, String> cache2;
+
+    /** Ip-finder. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     *
+     */
+    protected GridCacheLockAbstractTest() {
+        super(false /*start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(cacheMode());
+        cacheCfg.setWriteSynchronizationMode(FULL_ASYNC);
+        cacheCfg.setPreloadMode(SYNC);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        return cacheCfg;
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    protected abstract GridCacheMode cacheMode();
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        ignite1 = startGrid(1);
+        ignite2 = startGrid(2);
+
+        cache1 = ignite1.jcache(null);
+        cache2 = ignite2.jcache(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        info("Executing afterTest() callback...");
+
+        info("Before 1st removeAll().");
+
+        cache1.removeAll();
+
+        info("Before 2nd removeAll().");
+
+        cache2.removeAll();
+
+        assert cache1.size() == 0 : "Cache is not empty: " + cache1;
+        assert cache2.size() == 0 : "Cache is not empty: " + cache2;
+    }
+
+    /**
+     * @return Partitioned flag.
+     */
+    protected boolean isPartitioned() {
+        return false;
+    }
+
+    /**
+     * @param k Key to check.
+     * @param idx Grid index.
+     * @return {@code True} if locked.
+     */
+    private boolean locked(Integer k, int idx) {
+        if (isPartitioned())
+            return near(idx).isLockedNearOnly(k);
+
+        return cache(idx).isLocked(k);
+    }
+
+    /**
+     * @param keys Keys to check.
+     * @param idx Grid index.
+     * @return {@code True} if locked.
+     */
+    private boolean locked(Iterable<Integer> keys, int idx) {
+        if (isPartitioned())
+            return near(idx).isAllLockedNearOnly(keys);
+
+        for (Integer key : keys) {
+            if (!cache(idx).isLocked(key))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testLockSingleThread() throws Exception {
+        int k = 1;
+        String v = String.valueOf(k);
+
+        info("Before lock for key: " + k);
+
+        cache1.lock(k).lock();
+
+        info("After lock for key: " + k);
+
+        try {
+            assert cache1.isLocked(k);
+            assert cache1.isLockedByThread(k);
+
+            // Put to cache.
+            cache1.put(k, v);
+
+            info("Put " + k + '=' + k + " key pair into cache.");
+        }
+        finally {
+            cache1.lock(k).unlock();
+
+            info("Unlocked key: " + k);
+        }
+
+        assert !locked(k, 1);
+        assert !cache1.isLockedByThread(k);
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testLock() throws Exception {
+        final int kv = 1;
+
+        final CountDownLatch l1 = new CountDownLatch(1);
+        final CountDownLatch l2 = new CountDownLatch(1);
+
+        GridTestThread t1 = new GridTestThread(new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                info("Before lock for key: " + kv);
+
+                cache1.lock(kv).lock();
+
+                info("After lock for key: " + kv);
+
+                try {
+                    assert cache1.isLocked(kv);
+                    assert cache1.isLockedByThread(kv);
+
+                    l1.countDown();
+
+                    info("Let thread2 proceed.");
+
+                    cache1.put(kv, Integer.toString(kv));
+
+                    info("Put " + kv + '=' + Integer.toString(kv) + " key pair into cache.");
+                }
+                finally {
+                    Thread.sleep(1000);
+
+                    cache1.lockAll(Collections.singleton(kv)).unlock();
+
+                    info("Unlocked key in thread 1: " + kv);
+                }
+
+                l2.await();
+
+                assert !cache1.isLockedByThread(kv);
+                assert !locked(kv, 1);
+
+                return null;
+            }
+        });
+
+        GridTestThread t2 = new GridTestThread(new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                info("Waiting for latch1...");
+
+                l1.await();
+
+                cache2.lock(kv).lock();
+
+                try {
+                    String v = cache2.get(kv);
+
+                    assert v != null : "Value is null for key: " + kv;
+
+                    assertEquals(Integer.toString(kv), v);
+                }
+                finally {
+                    cache2.lockAll(Collections.singleton(kv)).unlock();
+
+                    info("Unlocked key in thread 2: " + kv);
+                }
+
+                assert !locked(kv, 2);
+                assert !cache2.isLockedByThread(kv);
+
+                Thread.sleep(1000);
+
+                l2.countDown();
+
+                return null;
+            }
+        });
+
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+
+        t1.checkError();
+        t2.checkError();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testLockAndPut() throws Exception {
+        final CountDownLatch l1 = new CountDownLatch(1);
+        final CountDownLatch l2 = new CountDownLatch(1);
+
+        GridTestThread t1 = new GridTestThread(new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache1.lock(1).lock();
+
+                info("Locked cache key: 1");
+
+                try {
+                    assert cache1.isLocked(1);
+                    assert cache1.isLockedByThread(1);
+
+                    info("Verified that cache key is locked: 1");
+
+                    cache1.put(1, "1");
+
+                    info("Put key value pair into cache: 1='1'");
+
+                    l1.countDown();
+
+                    info("Released latch1");
+
+                    // Hold lock for a bit.
+                    Thread.sleep(50);
+
+                    info("Woke up from sleep.");
+                }
+                finally {
+                    cache1.lockAll(Collections.singleton(1)).unlock();
+
+                    info("Unlocked cache key: 1");
+                }
+
+                l2.await();
+
+                assert !locked(1, 1);
+                assert !cache1.isLockedByThread(1);
+
+                return null;
+            }
+        });
+
+        GridTestThread t2 = new GridTestThread(new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                info("Beginning to await on latch 1");
+
+                l1.await();
+
+                info("Finished awaiting on latch 1");
+
+                assertEquals("1", cache1.get(1));
+
+                info("Retrieved value from cache for key: 1");
+
+                cache1.put(1, "2");
+
+                info("Put key-value pair into cache: 1='2'");
+
+                assertEquals("2", cache1.getAndRemove(1));
+
+                l2.countDown();
+
+                info("Removed key from cache: 1");
+
+                return null;
+            }
+        });
+
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+
+        t1.checkError();
+        t2.checkError();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testLockTimeoutTwoThreads() throws Exception {
+        int keyCnt = 1;
+
+        final Set<Integer> keys = new HashSet<>();
+
+        for (int i = 1; i <= keyCnt; i++)
+            keys.add(i);
+
+        final CountDownLatch l1 = new CountDownLatch(1);
+        final CountDownLatch l2 = new CountDownLatch(1);
+
+        IgniteFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                @Nullable @Override public Object call() throws Exception {
+                    info("Before lock for keys.");
+
+                    cache1.lockAll(keys).lock();
+
+                    info("After lock for keys.");
+
+                    try {
+                        for (Integer key : keys) {
+                            assert cache1.isLocked(key);
+                            assert cache1.isLockedByThread(key);
+                        }
+
+                        l1.countDown();
+
+                        info("Let thread2 proceed.");
+
+                        for (int i : keys) {
+                            info("Before put key: " + i);
+
+                            cache1.put(i, Integer.toString(i));
+
+                            if (i % 50 == 0)
+                                info("Stored key pairs in cache: " + i);
+                        }
+                    }
+                    finally {
+                        l2.await();
+
+                        info("Before unlock keys in thread 1: " + keys);
+
+                        cache1.lockAll(keys).unlock();
+
+                        info("Unlocked entry for keys.");
+                    }
+
+                    assert !locked(keys, 1);
+
+                    return null;
+                }
+            }, 1, "TEST-THREAD-1");
+
+        IgniteFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                @Nullable @Override public Object call() throws Exception {
+                    info("Waiting for latch1...");
+
+                    try {
+                        l1.await();
+
+                        // This call should not acquire the lock since
+                        // other thread is holding it.
+                        assert !cache1.lockAll(keys).tryLock();
+
+                        info("Before unlock keys in thread 2: " + keys);
+
+                        cache1.lockAll(keys).unlock();
+
+                        // The keys should still be locked.
+                        for (Integer key : keys)
+                            assert cache1.isLocked(key);
+                    }
+                    finally {
+                        l2.countDown();
+                    }
+
+                    return null;
+                }
+            }, 1, "TEST-THREAD-2");
+
+        fut1.get();
+        fut2.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java
new file mode 100644
index 0000000..42d4e8a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+
+/**
+ * Tests cache puts in mixed mode.
+ */
+public class GridCacheMixedModeSelfTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+        return cfg;
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String gridName) {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setCacheMode(GridCacheMode.PARTITIONED);
+
+        if (F.eq(gridName, getTestGridName(0)))
+            cfg.setDistributionMode(GridCacheDistributionMode.NEAR_ONLY);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBasicOps() throws Exception {
+        GridCache<Object, Object> cache = grid(0).cache(null);
+
+        for (int i = 0; i < 1000; i++)
+            cache.put(i, i);
+
+        for (int i = 0; i < 1000; i++)
+            assertEquals(i, cache.get(i));
+
+        for (int i = 0; i < 1000; i++)
+            assertEquals(i, cache.remove(i));
+
+        for (int i = 0; i < 1000; i++)
+            assertNull(cache.get(i));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheModuloAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheModuloAffinityFunction.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheModuloAffinityFunction.java
new file mode 100644
index 0000000..1cad96a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheModuloAffinityFunction.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Affinity which controls where nodes end up using mod operation.
+ */
+public class GridCacheModuloAffinityFunction implements GridCacheAffinityFunction {
+    /** Node attribute for index. */
+    public static final String IDX_ATTR = "nodeIndex";
+
+    /** Number of backups. */
+    private int backups = -1;
+
+    /** Number of partitions. */
+    private int parts = -1;
+
+    /**
+     * Empty constructor.
+     */
+    public GridCacheModuloAffinityFunction() {
+        // No-op.
+    }
+
+    /**
+     * @param parts Number of partitions.
+     * @param backups Number of backups.
+     */
+    public GridCacheModuloAffinityFunction(int parts, int backups) {
+        assert parts > 0;
+        assert backups >= 0;
+
+        this.parts = parts;
+        this.backups = backups;
+    }
+
+    /**
+     * @param parts Number of partitions.
+     */
+    public void partitions(int parts) {
+        assert parts > 0;
+
+        this.parts = parts;
+    }
+
+    /**
+     * @param backups Number of backups.
+     */
+    public void backups(int backups) {
+        assert backups >= 0;
+
+        this.backups = backups;
+    }
+
+    /**
+     * @return Number of backups.
+     */
+    public int backups() {
+        return backups;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public List<List<ClusterNode>> assignPartitions(GridCacheAffinityFunctionContext ctx) {
+        List<List<ClusterNode>> res = new ArrayList<>(parts);
+
+        Collection<ClusterNode> topSnapshot = ctx.currentTopologySnapshot();
+
+        for (int part = 0; part < parts; part++) {
+            res.add(F.isEmpty(topSnapshot) ?
+                Collections.<ClusterNode>emptyList() :
+                // Wrap affinity nodes with unmodifiable list since unmodifiable generic collection
+                // doesn't provide equals and hashCode implementations.
+                U.sealList(nodes(part, topSnapshot)));
+        }
+
+        return Collections.unmodifiableList(res);
+    }
+
+    /** {@inheritDoc} */
+    public Collection<ClusterNode> nodes(int part, Collection<ClusterNode> nodes) {
+        List<ClusterNode> sorted = new ArrayList<>(nodes);
+
+        Collections.sort(sorted, new Comparator<ClusterNode>() {
+            @Override public int compare(ClusterNode n1, ClusterNode n2) {
+                int idx1 = n1.<Integer>attribute(IDX_ATTR);
+                int idx2 = n2.<Integer>attribute(IDX_ATTR);
+
+                return idx1 < idx2 ? -1 : idx1 == idx2 ? 0 : 1;
+            }
+        });
+
+        int max = 1 + backups;
+
+        if (max > nodes.size())
+            max = nodes.size();
+
+        Collection<ClusterNode> ret = new ArrayList<>(max);
+
+        Iterator<ClusterNode> it = sorted.iterator();
+
+        for (int i = 0; i < max; i++) {
+            ClusterNode n = null;
+
+            if (i == 0) {
+                while (it.hasNext()) {
+                    n = it.next();
+
+                    int nodeIdx = n.<Integer>attribute(IDX_ATTR);
+
+                    if (part <= nodeIdx)
+                        break;
+                    else
+                        n = null;
+                }
+            }
+            else {
+                if (it.hasNext())
+                    n = it.next();
+                else {
+                    it = sorted.iterator();
+
+                    assert it.hasNext();
+
+                    n = it.next();
+                }
+            }
+
+            assert n != null || nodes.size() < parts;
+
+            if (n == null)
+                n = (it = sorted.iterator()).next();
+
+
+            ret.add(n);
+        }
+
+        return ret;
+    }
+
+    /**
+     * @param parts Number of partitions.
+     * @param backups Number of backups.
+     */
+    public void reset(int parts, int backups) {
+        this.parts = parts;
+        this.backups = backups;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset() {
+        parts = -1;
+        backups = -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partitions() {
+        return parts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition(Object key) {
+        if (key instanceof Number)
+            return ((Number)key).intValue() % parts;
+
+        return key == null ? 0 : U.safeAbs(key.hashCode() % parts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeNode(UUID nodeId) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheModuloAffinityFunction.class, this);
+    }
+}


Mime
View raw message