ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [42/50] [abbrv] incubator-ignite git commit: IGNITE-112 Moving test on new query API
Date Thu, 29 Jan 2015 10:43:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
new file mode 100644
index 0000000..8763dd1
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
@@ -0,0 +1,748 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.eviction.lru.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.swapspace.file.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
+import org.apache.ignite.internal.processors.query.h2.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Multi-threaded tests for cache queries.
+ */
+@SuppressWarnings("StatementWithEmptyBody")
+public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTest {
+    /** When {@code true}, {@link #info(String)} forwards messages to the superclass. */
+    private static final boolean TEST_INFO = true;
+
+    /** Number of test grids (nodes). Should not be less than 2. */
+    private static final int GRID_CNT = 2;
+
+    /** IP finder set on the discovery SPI of every configuration built by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of {@code onSwap} calls seen by {@link FakeIndexing}; never reset in this class. */
+    private static AtomicInteger idxSwapCnt = new AtomicInteger();
+
+    /** Number of {@code onUnswap} calls seen by {@link FakeIndexing}; never reset in this class. */
+    private static AtomicInteger idxUnswapCnt = new AtomicInteger();
+
+    /** Time each test sleeps (via {@code Thread.sleep}) while its worker threads run: 30 seconds. */
+    private static final long DURATION = 30 * 1000;
+
+    /** Don't start a grid by default; grids are started explicitly in {@link #beforeTestsStarted()}. */
+    public IgniteCacheQueryMultiThreadedSelfTest() {
+        super(false);
+    }
+
+    /** {@inheritDoc} Adds discovery, file swap space, one partitioned transactional cache and query indexing. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder); // Same static finder for every grid.
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(CacheDistributionMode.NEAR_PARTITIONED);
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setSwapEnabled(true);
+        cacheCfg.setBackups(1);
+        cacheCfg.setEvictionPolicy(evictsEnabled() ? new CacheLruEvictionPolicy(100) : null); // No policy when evictions are off.
+
+        CacheQueryConfiguration qcfg = new CacheQueryConfiguration();
+
+        qcfg.setIndexPrimitiveKey(true);
+
+        cacheCfg.setQueryConfiguration(qcfg);
+
+        if (offheapEnabled() && evictsEnabled())
+            cacheCfg.setOffHeapMaxMemory(1000); // Small offheap for evictions.
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        GridQueryConfiguration indexing = new GridQueryConfiguration();
+
+        indexing.setMaxOffheapRowsCacheSize(128);
+
+        if (offheapEnabled())
+            indexing.setMaxOffHeapMemory(0); // NOTE(review): 0 presumably means "no limit" for off-heap indexes - confirm.
+
+        cfg.setQueryConfiguration(indexing);
+
+        GridQueryProcessor.idxCls = FakeIndexing.class; // Static hook: presumably makes the processor use the counting indexing.
+
+        return cfg;
+    }
+
+    /**
+     * {@link IgniteH2Indexing} subclass that delegates to the parent and counts swap/unswap callbacks.
+     */
+    private static class FakeIndexing extends IgniteH2Indexing {
+        @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException {
+            super.onSwap(spaceName, key);
+
+            idxSwapCnt.incrementAndGet(); // Counted only if the parent call did not throw.
+        }
+
+        @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes)
+        throws IgniteCheckedException {
+            super.onUnswap(spaceName, key, val, valBytes);
+
+            idxUnswapCnt.incrementAndGet(); // Counted only if the parent call did not throw.
+        }
+    }
+
+    /** @return {@code true} if off-heap settings should be applied in {@link #getConfiguration(String)}; {@code false} here. */
+    protected boolean offheapEnabled() {
+        return false;
+    }
+
+    /** @return {@code true} if the LRU eviction policy is configured and tests call {@code localEvict}; {@code true} here. */
+    protected boolean evictsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} Fails the test up front if any grid's cache is not empty. */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Verify (not clean): the cache returned by cache(null) must already be empty on every grid.
+        for (int i = 0; i < GRID_CNT; i++) {
+            GridCache<Object, Object> c = grid(i).cache(null);
+
+            assertEquals(0, c.size());
+        }
+    }
+
+    /** {@inheritDoc} Starts {@link #GRID_CNT} grids via {@code startGridsMultiThreaded}. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        assert GRID_CNT >= 2 : "Constant GRID_CNT must be greater than or equal to 2.";
+
+        startGridsMultiThreaded(GRID_CNT);
+    }
+
+    /** {@inheritDoc} Stops all grids, then checks that swap and unswap callbacks were observed when evictions are on. */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        if (evictsEnabled()) { // NOTE(review): counters are static and never reset here, so they may include earlier test classes.
+            assertTrue(idxSwapCnt.get() > 0);
+            assertTrue(idxUnswapCnt.get() > 0);
+        }
+    }
+
+    /** {@inheritDoc} Empties the cache, swap and off-heap storage on every grid and asserts nothing is left. */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        // Clean up all caches.
+        for (int i = 0; i < GRID_CNT; i++) {
+            GridCache<Object, Object> c = grid(i).cache(null);
+
+            c.removeAll(F.<CacheEntry<Object, Object>>alwaysTrue());
+
+            Iterator<Map.Entry<Object, Object>> it = c.swapIterator(); // Remove whatever the swap iterator still returns.
+
+            while (it.hasNext()) {
+                it.next();
+
+                it.remove();
+            }
+
+            it = c.offHeapIterator(); // Same drain for entries returned by the off-heap iterator.
+
+            while (it.hasNext()) {
+                it.next();
+
+                it.remove();
+            }
+
+            assertEquals("Swap keys: " + c.swapKeys(), 0, c.swapKeys());
+            assertEquals(0, c.offHeapEntriesCount());
+            assertEquals(0, c.size());
+        }
+    }
+
+    /** {@inheritDoc} Messages are dropped when {@link #TEST_INFO} is {@code false}. */
+    @Override protected void info(String msg) {
+        if (TEST_INFO)
+            super.info(msg);
+    }
+
+    /**
+     * @param entries Entries whose keys are mapped to nodes.
+     * @param g Grid whose {@code cache(null)} affinity is used for the mapping.
+     * @return IDs of the first node returned by {@code mapKeyToPrimaryAndBackups} for each key (presumably the primary).
+     */
+    private Set<UUID> affinityNodes(Iterable<Cache.Entry<Integer, Integer>> entries, Ignite g) {
+        Set<UUID> nodes = new HashSet<>();
+
+        for (Cache.Entry<Integer, Integer> entry : entries)
+            nodes.add(g.cache(null).affinity().mapKeyToPrimaryAndBackups(entry.getKey()).iterator().next().id());
+
+        return nodes;
+    }
+
+    /**
+     * Runs random put/evict/remove/get/SQL-query operations on {@code String} values from concurrent workers.
+     * Workers loop until the main thread has slept for {@link #DURATION}; query results are only iterated, not asserted.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSwapUnswapString() throws Exception {
+        int threadCnt = 150;
+        final int keyCnt = 2000;
+        final int valCnt = 10000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final IgniteCache<Integer, String> c = g.jcache(null);
+        final IgniteCache<Integer, Long> cl = g.jcache(null); // Same cache, typed as Long only for the emptiness query below.
+
+        assertEquals(0, g.cache(null).size());
+        assertEquals(0, c.query(new QuerySqlPredicate<Integer, String>("1 = 1")).getAll().size());
+        assertEquals(0, cl.query(new QuerySqlPredicate<Integer, Long>("1 = 1")).getAll().size());
+
+        Random rnd = new Random();
+
+        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { // Random stride of 1..3 leaves some keys unpopulated.
+            c.put(i, String.valueOf(rnd.nextInt(valCnt)));
+
+            if (evictsEnabled() && rnd.nextBoolean())
+                c.localEvict(Arrays.asList(i));
+        }
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                Random rnd = new Random();
+
+                while (!done.get()) {
+                    switch (rnd.nextInt(5)) {
+                        case 0:
+                            c.put(rnd.nextInt(keyCnt), String.valueOf(rnd.nextInt(valCnt)));
+
+                            break;
+                        case 1:
+                            if (evictsEnabled())
+                                c.localEvict(Arrays.asList(rnd.nextInt(keyCnt)));
+
+                            break;
+                        case 2:
+                            c.remove(rnd.nextInt(keyCnt));
+
+                            break;
+                        case 3:
+                            c.get(rnd.nextInt(keyCnt));
+
+                            break;
+                        case 4:
+                            int from = rnd.nextInt(valCnt);
+
+                            QueryCursor<Cache.Entry<Integer, String>> qry = c.query( // NOTE(review): bounds are strings, so the range is presumably lexicographic - confirm.
+                                new QuerySqlPredicate<Integer, String>("_val between ? and ?", String.valueOf(from),
+                                String.valueOf(from + 250)));
+
+                            Collection<Cache.Entry<Integer, String>> res = qry.getAll();
+
+                            for (Cache.Entry<Integer, String> ignored : res) {
+                                // No-op: results are only iterated.
+                            }
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get(); // Waits for the workers' future; presumably rethrows a worker failure - confirm.
+    }
+
+    /**
+     * Runs random put/evict/remove/get/SQL-query operations on {@code Long} values from concurrent workers.
+     * Workers loop until the main thread has slept for {@link #DURATION}; query results are only iterated, not asserted.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSwapUnswapLong() throws Exception {
+        int threadCnt = 150;
+        final int keyCnt = 2000;
+        final int valCnt = 10000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final IgniteCache<Integer, Long> c = g.jcache(null);
+        final IgniteCache<Integer, String> c1 = g.jcache(null); // Same cache, typed as String only for the emptiness query below.
+
+        assertEquals(0, g.cache(null).size());
+        assertEquals(0, c1.query(new QuerySqlPredicate<Integer, String>("1 = 1")).getAll().size());
+        assertEquals(0, c.query(new QuerySqlPredicate<Integer, Long>("1 = 1")).getAll().size());
+
+        Random rnd = new Random();
+
+        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { // Random stride of 1..3 leaves some keys unpopulated.
+            c.put(i, (long)rnd.nextInt(valCnt));
+
+            if (evictsEnabled() && rnd.nextBoolean())
+                c.localEvict(Arrays.asList(i));
+        }
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                Random rnd = new Random();
+
+                while (!done.get()) {
+                    int key = rnd.nextInt(keyCnt);
+
+                    switch (rnd.nextInt(5)) {
+                        case 0:
+                            c.put(key, (long)rnd.nextInt(valCnt));
+
+                            break;
+                        case 1:
+                            if (evictsEnabled())
+                                c.localEvict(Arrays.asList(key));
+
+                            break;
+                        case 2:
+                            c.remove(key);
+
+                            break;
+                        case 3:
+                            c.get(key);
+
+                            break;
+                        case 4:
+                            int from = rnd.nextInt(valCnt);
+
+                            Collection<Cache.Entry<Integer, Long>> res = c.query(new QuerySqlPredicate<Integer, Long>(
+                                "_val between ? and ?", from, from + 250)).getAll();
+
+                            for (Cache.Entry<Integer, Long> ignored : res) {
+                                // No-op: results are only iterated.
+                            }
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get(); // Waits for the workers' future; presumably rethrows a worker failure - confirm.
+    }
+
+    /**
+     * Runs random put/evict/remove/get/SQL-query operations on a mix of {@code Long} and {@code String} values.
+     * Workers loop until the main thread has slept for {@link #DURATION}; query results are only iterated, not asserted.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSwapUnswapLongString() throws Exception {
+        int threadCnt = 150;
+        final int keyCnt = 2000;
+        final int valCnt = 10000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final IgniteCache<Integer, Object> c = g.jcache(null);
+
+        assertEquals(0, g.jcache(null).size());
+        assertEquals(0, c.query(new QuerySqlPredicate<Integer, Object>("1 = 1")).getAll().size());
+
+        Random rnd = new Random();
+
+        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { // Random stride of 1..3 leaves some keys unpopulated.
+            c.put(i, rnd.nextBoolean() ? (long) rnd.nextInt(valCnt) : String.valueOf(rnd.nextInt(valCnt)));
+
+            if (evictsEnabled() && rnd.nextBoolean())
+                c.localEvict(Arrays.asList(i));
+        }
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                Random rnd = new Random();
+
+                while (!done.get()) {
+                    int key = rnd.nextInt(keyCnt);
+
+                    switch (rnd.nextInt(5)) {
+                        case 0:
+                            c.put(key, rnd.nextBoolean() ? (long) rnd.nextInt(valCnt) :
+                                String.valueOf(rnd.nextInt(valCnt)));
+
+                            break;
+                        case 1:
+                            if (evictsEnabled())
+                                c.localEvict(Arrays.asList(key));
+
+                            break;
+                        case 2:
+                            c.remove(key);
+
+                            break;
+                        case 3:
+                            c.get(key);
+
+                            break;
+                        case 4:
+                            int from = rnd.nextInt(valCnt);
+
+                            Collection<Cache.Entry<Integer, Object>> res = c.query(
+                                new QuerySqlPredicate<Integer, Object>("_val between ? and ?", from, from + 250))
+                                .getAll();
+
+                            for (Cache.Entry<Integer, Object> ignored : res) {
+                                // No-op: results are only iterated.
+                            }
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get(); // Waits for the workers' future; presumably rethrows a worker failure - confirm.
+    }
+
+    /** Runs random put/evict/remove/get/SQL-query operations on {@link TestValue} objects from concurrent workers.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSwapUnswapObject() throws Exception {
+        int threadCnt = 50;
+        final int keyCnt = 4000;
+        final int valCnt = 10000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final IgniteCache<Integer, TestValue> c = g.jcache(null);
+
+        assertEquals(0, g.cache(null).size());
+        assertEquals(0, c.query(new QuerySqlPredicate<Integer, TestValue>("1 = 1")).getAll().size());
+
+        Random rnd = new Random();
+
+        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { // Random stride of 1..3 leaves some keys unpopulated.
+            c.put(i, new TestValue(rnd.nextInt(valCnt)));
+
+            if (evictsEnabled() && rnd.nextBoolean())
+                c.localEvict(Arrays.asList(i));
+        }
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                Random rnd = new Random();
+
+                while (!done.get()) {
+                    int key = rnd.nextInt(keyCnt);
+
+                    switch (rnd.nextInt(5)) {
+                        case 0:
+                            c.put(key, new TestValue(rnd.nextInt(valCnt)));
+
+                            break;
+                        case 1:
+                            if (evictsEnabled())
+                                c.localEvict(Arrays.asList(key));
+
+                            break;
+                        case 2:
+                            c.remove(key);
+
+                            break;
+                        case 3:
+                            c.get(key);
+
+                            break;
+                        case 4:
+                            int from = rnd.nextInt(valCnt);
+
+                            Collection<Cache.Entry<Integer, TestValue>> res = // Range query on the annotated TestValue.val field.
+                                c.query(new QuerySqlPredicate<Integer, TestValue>("TestValue.val between ? and ?",
+                                    from, from + 250)).getAll();
+
+                            for (Cache.Entry<Integer, TestValue> ignored : res) {
+                                // No-op: results are only iterated.
+                            }
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get(); // Waits for the workers' future; presumably rethrows a worker failure - confirm.
+    }
+
+    /**
+     * Repeatedly runs the SQL query {@code _val >= 0} from concurrent workers and asserts each result has {@code keyCnt} entries.
+     * NOTE(review): despite the name, a new predicate is built on every iteration, as in testMultiThreadedNewQueries.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSameQuery() throws Exception {
+        int threadCnt = 50;
+        final int keyCnt = 10;
+        final int logMod = 5000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final IgniteCache<Integer, Integer> c = g.jcache(null);
+
+        for (int i = 0; i < keyCnt; i++) {
+            c.put(i, i);
+
+            c.localEvict(Arrays.asList(i)); // Evicts unconditionally, without checking evictsEnabled().
+        }
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(
+            new CAX() {
+                @Override public void applyx() throws IgniteCheckedException {
+                    int iter = 0;
+
+                    while (!done.get() && !Thread.currentThread().isInterrupted()) {
+                        iter++;
+
+                        Collection<Cache.Entry<Integer, Integer>> entries =
+                            c.query(new QuerySqlPredicate<Integer, Integer>("_val >= 0")).getAll();
+
+                        assert entries != null;
+
+                        assertEquals("Query results [entries=" + entries + ", aff=" + affinityNodes(entries, g) +
+                            ", iteration=" + iter + ']', keyCnt, entries.size());
+
+                        if (cnt.incrementAndGet() % logMod == 0) { // Every logMod-th query across all workers.
+                            GridCacheQueryManager<Object, Object> qryMgr =
+                                ((GridKernal)g).internalCache().context().queries();
+
+                            assert qryMgr != null;
+
+                            qryMgr.printMemoryStats();
+                        }
+                    }
+                }
+            }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        info("Finishing test...");
+
+        done.set(true);
+
+        fut.get(); // Waits for the workers' future; presumably rethrows a worker failure - confirm.
+    }
+
+    /**
+     * Repeatedly runs a freshly built SQL query {@code _val >= 0} from concurrent workers.
+     * Each result is asserted to contain exactly {@code keyCnt} entries.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedNewQueries() throws Exception {
+        int threadCnt = 50;
+        final int keyCnt = 10;
+        final int logMod = 5000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final IgniteCache<Integer, Integer> c = g.jcache(null);
+
+        for (int i = 0; i < keyCnt; i++) {
+            c.put(i, i);
+
+            c.localEvict(Arrays.asList(i)); // Evicts unconditionally, without checking evictsEnabled().
+        }
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                int iter = 0;
+
+                while (!done.get() && !Thread.currentThread().isInterrupted()) {
+                    iter++;
+
+                    Collection<Cache.Entry<Integer, Integer>> entries =
+                        c.query(new QuerySqlPredicate<Integer, Integer>("_val >= 0")).getAll();
+
+                    assert entries != null;
+
+                    assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size());
+
+                    if (cnt.incrementAndGet() % logMod == 0) { // Every logMod-th query across all workers.
+                        GridCacheQueryManager<Object, Object> qryMgr =
+                            ((GridKernal)g).internalCache().context().queries();
+
+                        assert qryMgr != null;
+
+                        qryMgr.printMemoryStats();
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get(); // Waits for the workers' future; presumably rethrows a worker failure - confirm.
+    }
+
+    /**
+     * Repeatedly runs a scan query whose predicate accepts every entry from concurrent workers.
+     * Each result is asserted to contain exactly {@code keyCnt} entries; no entries are evicted beforehand.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedScanQuery() throws Exception {
+        int threadCnt = 50;
+        final int keyCnt = 500;
+        final int logMod = 5000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final IgniteCache<Integer, Integer> c = g.jcache(null);
+
+        for (int i = 0; i < keyCnt; i++)
+            c.put(i, i);
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(
+            new CAX() {
+                @Override public void applyx() throws IgniteCheckedException {
+                    int iter = 0;
+
+                    while (!done.get() && !Thread.currentThread().isInterrupted()) {
+                        iter++;
+
+                        // Scan query with an accept-all predicate.
+                        Collection<Cache.Entry<Integer, Integer>> entries =
+                            c.query(new QueryPredicate<Integer, Integer>() {
+                                @Override public boolean apply(Cache.Entry<Integer, Integer> integerIntegerEntry) {
+                                    return true;
+                                }
+                            }).getAll();
+
+                        assert entries != null;
+
+                        assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size());
+
+                        if (cnt.incrementAndGet() % logMod == 0) { // Every logMod-th query across all workers.
+                            GridCacheQueryManager<Object, Object> qryMgr =
+                                ((GridKernal)g).internalCache().context().queries();
+
+                            assert qryMgr != null;
+
+                            qryMgr.printMemoryStats();
+                        }
+                    }
+                }
+            }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get(); // Waits for the workers' future; presumably rethrows a worker failure - confirm.
+    }
+
+    /**
+     * Serializable cache value holding one {@code int} field that is annotated as an SQL query field.
+     */
+    private static class TestValue implements Serializable {
+        /** Value; referenced as {@code TestValue.val} in the object test's SQL predicate. */
+        @CacheQuerySqlField
+        private int val;
+
+        /**
+         * @param val Value to store.
+         */
+        private TestValue(int val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Stored value.
+         */
+        public int value() {
+            return val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java
new file mode 100644
index 0000000..dc25af5
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Multi-threaded cache query tests from the off-heap parent class, with {@code evictsEnabled()} fixed to {@code true}.
+ */
+public class IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest extends IgniteCacheQueryOffheapMultiThreadedSelfTest {
+    /** {@inheritDoc} NOTE(review): the base test shown in this commit already returns {@code true}; confirm this override is needed. */
+    @Override protected boolean evictsEnabled() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapMultiThreadedSelfTest.java
new file mode 100644
index 0000000..cb4beaf
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapMultiThreadedSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Queries over off-heap indexes: reruns the parent's multi-threaded tests with {@code offheapEnabled()} returning {@code true}.
+ */
+public class IgniteCacheQueryOffheapMultiThreadedSelfTest extends IgniteCacheQueryMultiThreadedSelfTest {
+    /** {@inheritDoc} Always {@code true} in this subclass. */
+    @Override protected boolean offheapEnabled() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java
new file mode 100644
index 0000000..eacc4f3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Runs one SQL query concurrently from many threads against a two-node partitioned, transactional cache.
+ */
+public class IgniteCacheSqlQueryMultiThreadedSelfTest extends GridCommonAbstractTest {
+    /** IP finder shared by the discovery SPI of every grid started by this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setDistributionMode(PARTITIONED_ONLY);
+        ccfg.setQueryIndexEnabled(true);
+        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        c.setCacheConfiguration(ccfg);
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(2);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQuery() throws Exception {
+        final IgniteCache<Integer, Person> cache = grid(0).jcache(null);
+
+        for (int i = 0; i < 2000; i++)
+            cache.put(i, new Person(i));
+
+        GridTestUtils.runMultiThreaded(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                for (int i = 0; i < 100; i++) {
+                    Iterator<Cache.Entry<Integer, Person>> iter =
+                        cache.query(new QuerySqlPredicate<Integer, Person>("Person", "age >= 0", 50, new Object[0]))
+                        .iterator();
+
+                    int cnt = 0;
+                    // Guard with hasNext(): Iterator.next() throws once exhausted instead of returning null.
+                    while (iter.hasNext() && iter.next() != null)
+                        cnt++;
+
+                    assertEquals(2000, cnt);
+                }
+
+                return null;
+            }
+        }, 16, "test");
+    }
+
+    /**
+     * Cache value type whose only state is the {@code age} field used by the query in this test.
+     */
+    private static class Person implements Serializable {
+        /** Age; annotated as an SQL query field. */
+        @CacheQuerySqlField
+        private int age;
+
+        /**
+         * @param age Age.
+         */
+        Person(int age) {
+            this.age = age;
+        }
+
+        /**
+         * @return Age.
+         */
+        public int age() {
+            return age;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledQuerySelfTest.java
deleted file mode 100644
index be19ec2..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledQuerySelfTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.cache.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-
-/**
- * Tests for atomic cache with near cache enabled.
- */
-public class GridCacheAtomicNearEnabledQuerySelfTest extends GridCachePartitionedQuerySelfTest {
-    /** {@inheritDoc} */
-    @Override protected CacheAtomicityMode atomicityMode() {
-        return ATOMIC;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheDistributionMode distributionMode() {
-        return NEAR_PARTITIONED;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicQuerySelfTest.java
deleted file mode 100644
index ad0a7db..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicQuerySelfTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.cache.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-
-/**
- * Tests for partitioned cache queries.
- */
-public class GridCacheAtomicQuerySelfTest extends GridCachePartitionedQuerySelfTest {
-    /** {@inheritDoc} */
-    @Override protected CacheAtomicityMode atomicityMode() {
-        return ATOMIC;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheDistributionMode distributionMode() {
-        return PARTITIONED_ONLY;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQueryP2PDisabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQueryP2PDisabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQueryP2PDisabledSelfTest.java
deleted file mode 100644
index d19c4ba..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQueryP2PDisabledSelfTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.configuration.*;
-
-/**
- * Tests for partitioned cache queries.
- */
-public class GridCachePartitionedQueryP2PDisabledSelfTest extends GridCachePartitionedQuerySelfTest {
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration c = super.getConfiguration(gridName);
-
-        c.setPeerClassLoadingEnabled(false);
-
-        return c;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQuerySelfTest.java
deleted file mode 100644
index 01debfc..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQuerySelfTest.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- * Tests for partitioned cache queries.
- */
-public class GridCachePartitionedQuerySelfTest extends GridCacheAbstractQuerySelfTest {
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 3;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return PARTITIONED;
-    }
-
-    /**
-     * JUnit.
-     *
-     * @throws Exception If failed.
-     */
-    public void testSingleNodeQuery() throws Exception {
-        Person p1 = new Person("Jon", 1500);
-        Person p2 = new Person("Jane", 2000);
-        Person p3 = new Person("Mike", 1800);
-        Person p4 = new Person("Bob", 1900);
-
-        GridCache<UUID, Person> cache0 = grid(0).cache(null);
-
-        cache0.put(p1.id(), p1);
-        cache0.put(p2.id(), p2);
-        cache0.put(p3.id(), p3);
-        cache0.put(p4.id(), p4);
-
-        assertEquals(4, cache0.size());
-
-        CacheQuery<Map.Entry<UUID, Person>> qry = cache0.queries().createSqlQuery(Person.class,
-            "salary < 2000").projection(grid(0).forLocal());
-
-        // Include backup entries.
-        qry.includeBackups(true);
-
-        // In order to get accumulated result from all queried nodes.
-        qry.keepAll(true);
-
-        Collection<Map.Entry<UUID, Person>> entries = qry.execute().get();
-
-        assert entries != null;
-
-        info("Queried persons: " + F.viewReadOnly(entries, F.<Person>mapEntry2Value()));
-
-        assertEquals(3, entries.size());
-
-        checkResult(entries, p1, p3, p4);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testFieldsQuery() throws Exception {
-        Person p1 = new Person("Jon", 1500);
-        Person p2 = new Person("Jane", 2000);
-        Person p3 = new Person("Mike", 1800);
-        Person p4 = new Person("Bob", 1900);
-
-        Ignite ignite0 = grid(0);
-
-        GridCache<UUID, Person> cache0 = ignite0.cache(null);
-
-        cache0.put(p1.id(), p1);
-        cache0.put(p2.id(), p2);
-        cache0.put(p3.id(), p3);
-        cache0.put(p4.id(), p4);
-
-        assertEquals(4, cache0.size());
-
-        // Fields query
-        CacheQuery<List<?>> qry = cache0.queries().createSqlFieldsQuery("select name from Person where salary > ?").
-            projection(ignite0.cluster());
-
-        Collection<List<?>> res = qry.execute(1600).get();
-
-        assertEquals(3, res.size());
-
-        // Fields query count(*)
-        qry = cache0.queries().createSqlFieldsQuery("select count(*) from Person").projection(ignite0.cluster());
-
-        res = qry.execute().get();
-
-        int cnt = 0;
-
-        for (List<?> row : res)
-            cnt += (Long)row.get(0);
-
-        assertEquals(4, cnt);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMultipleNodesQuery() throws Exception {
-        Person p1 = new Person("Jon", 1500);
-        Person p2 = new Person("Jane", 2000);
-        Person p3 = new Person("Mike", 1800);
-        Person p4 = new Person("Bob", 1900);
-
-        GridCache<UUID, Person> cache0 = grid(0).cache(null);
-
-        cache0.put(p1.id(), p1);
-        cache0.put(p2.id(), p2);
-        cache0.put(p3.id(), p3);
-        cache0.put(p4.id(), p4);
-
-        assertEquals(4, cache0.size());
-
-        assert grid(0).nodes().size() == gridCount();
-
-        CacheQuery<Map.Entry<UUID, Person>> qry = cache0.queries().createSqlQuery(Person.class,
-            "salary < 2000");
-
-        // Include backup entries and disable de-duplication.
-        qry.includeBackups(true);
-        qry.enableDedup(false);
-
-        // In order to get accumulated result from all queried nodes.
-        qry.keepAll(true);
-
-        // Execute on full projection, duplicates are expected.
-        Collection<Map.Entry<UUID, Person>> entries = qry.execute().get();
-
-        assert entries != null;
-
-        info("Queried entries: " + entries);
-
-        info("Queried persons: " + F.viewReadOnly(entries, F.<Person>mapEntry2Value()));
-
-        // Expect result including backup persons.
-        assertEquals(3 * gridCount(), entries.size());
-
-        checkResult(entries, p1, p3, p4);
-
-        // Now do the same filtering but using projection.
-        qry = cache0.projection(F.<UUID, Person>cachePrimary()).queries().createSqlQuery(Person.class,
-            "salary < 2000");
-
-        qry.keepAll(true);
-
-        entries = qry.execute().get();
-
-        assert entries != null;
-
-        info("Queried persons: " + F.viewReadOnly(entries, F.<Person>mapEntry2Value()));
-
-        // Expect result including backup persons.
-        assertEquals(3, entries.size());
-
-        checkResult(entries, p1, p3, p4);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testIncludeBackupsAndEnableDedup() throws Exception {
-        Person p1 = new Person("Jon", 1500);
-        Person p2 = new Person("Jane", 2000);
-        Person p3 = new Person("Mike", 1800);
-        Person p4 = new Person("Bob", 1900);
-
-        GridCache<UUID, Person> cache0 = grid(0).cache(null);
-
-        cache0.put(p1.id(), p1);
-        cache0.put(p2.id(), p2);
-        cache0.put(p3.id(), p3);
-        cache0.put(p4.id(), p4);
-
-        // Retry several times.
-        for (int i = 0; i < 10; i++) {
-            CacheQuery<Map.Entry<UUID, Person>> qry = cache0.queries().createSqlQuery(Person.class,
-                "salary < 2000");
-
-            // Include backup entries and disable de-duplication.
-            qry.includeBackups(true);
-            qry.enableDedup(false);
-
-            Collection<Map.Entry<UUID, Person>> entries = qry.execute().get();
-
-            info("Entries: " + entries);
-
-            assertEquals(gridCount() * 3, entries.size());
-
-            // Recreate query since we cannot use the old one.
-            qry = cache0.queries().createSqlQuery(Person.class, "salary < 2000");
-
-            // Exclude backup entries and enable de-duplication.
-            qry.includeBackups(false);
-            qry.enableDedup(true);
-
-            entries = qry.execute().get();
-
-            assertEquals(3, entries.size());
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("FloatingPointEquality")
-    public void testScanReduceQuery() throws Exception {
-        GridCache<UUID, Person> c = ignite.cache(null);
-
-        Person p1 = new Person("Bob White", 1000);
-        Person p2 = new Person("Tom White", 2000);
-        Person p3 = new Person("Mike Green", 20000);
-
-        c.put(p1.id(), p1);
-        c.put(p2.id(), p2);
-        c.put(p3.id(), p3);
-
-        CacheQuery<Map.Entry<UUID, Person>> q = c.queries().createScanQuery(new P2<UUID, Person>() {
-            @Override public boolean apply(UUID k, Person p) {
-                return p.salary() < 20000;
-            }
-        });
-
-        R1<IgnitePair<Integer>, Double> locRdc = new R1<IgnitePair<Integer>, Double>() {
-            private double sum;
-
-            private int cnt;
-
-            @Override public boolean collect(IgnitePair<Integer> p) {
-                sum += p.get1();
-                cnt += p.get2();
-
-                return true;
-            }
-
-            @Override public Double reduce() {
-                return sum / cnt;
-            }
-        };
-
-        Collection<IgnitePair<Integer>> res = q.execute(new R1<Map.Entry<UUID, Person>, IgnitePair<Integer>>() {
-            private int sum;
-
-            private int cnt;
-
-            @Override public boolean collect(Map.Entry<UUID, Person> e) {
-                sum += e.getValue().salary();
-                cnt++;
-
-                return true;
-            }
-
-            @Override public IgnitePair<Integer> reduce() {
-                return new IgnitePair<>(sum, cnt);
-            }
-        }).get();
-
-        assertEquals(1500., F.reduce(res, locRdc));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("FloatingPointEquality")
-    public void testSqlReduceQuery() throws Exception {
-        GridCache<UUID, Person> c = ignite.cache(null);
-
-        Person p1 = new Person("Bob White", 1000);
-        Person p2 = new Person("Tom White", 2000);
-        Person p3 = new Person("Mike Green", 20000);
-
-        c.put(p1.id(), p1);
-        c.put(p2.id(), p2);
-        c.put(p3.id(), p3);
-
-        CacheQuery<Map.Entry<UUID, Person>> q = c.queries().createSqlQuery(Person.class, "salary < 20000");
-
-        R1<IgnitePair<Integer>, Double> locRdc = new R1<IgnitePair<Integer>, Double>() {
-            private double sum;
-
-            private int cnt;
-
-            @Override public boolean collect(IgnitePair<Integer> p) {
-                sum += p.get1();
-                cnt += p.get2();
-
-                return true;
-            }
-
-            @Override public Double reduce() {
-                return sum / cnt;
-            }
-        };
-
-        Collection<IgnitePair<Integer>> res = q.execute(new R1<Map.Entry<UUID, Person>, IgnitePair<Integer>>() {
-            private int sum;
-
-            private int cnt;
-
-            @Override public boolean collect(Map.Entry<UUID, Person> e) {
-                sum += e.getValue().salary();
-                cnt++;
-
-                return true;
-            }
-
-            @Override public IgnitePair<Integer> reduce() {
-                return new IgnitePair<>(sum, cnt);
-            }
-        }).get();
-
-        assert F.reduce(res, locRdc) == 1500;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("FloatingPointEquality")
-    public void testLuceneReduceQuery() throws Exception {
-        GridCache<UUID, Person> c = ignite.cache(null);
-
-        Person p1 = new Person("Bob White", 1000);
-        Person p2 = new Person("Tom White", 2000);
-        Person p3 = new Person("Mike Green", 20000);
-
-        c.put(p1.id(), p1);
-        c.put(p2.id(), p2);
-        c.put(p3.id(), p3);
-
-        CacheQuery<Map.Entry<UUID, Person>> q = c.queries().createFullTextQuery(Person.class, "White");
-
-        R1<IgnitePair<Integer>, Double> locRdc = new R1<IgnitePair<Integer>, Double>() {
-            private double sum;
-
-            private int cnt;
-
-            @Override public boolean collect(IgnitePair<Integer> p) {
-                sum += p.get1();
-                cnt += p.get2();
-
-                return true;
-            }
-
-            @Override public Double reduce() {
-                return sum / cnt;
-            }
-        };
-
-        Collection<IgnitePair<Integer>> res = q.execute(new R1<Map.Entry<UUID, Person>, IgnitePair<Integer>>() {
-            private int sum;
-
-            private int cnt;
-
-            @Override public boolean collect(Map.Entry<UUID, Person> e) {
-                sum += e.getValue().salary();
-                cnt++;
-
-                return true;
-            }
-
-            @Override public IgnitePair<Integer> reduce() {
-                return new IgnitePair<>(sum, cnt);
-            }
-        }).get();
-
-        assert F.reduce(res, locRdc) == 1500;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPaginationGet0() throws Exception {
-        int key = 0;
-
-        for (int i = 0; i < gridCount(); i++) {
-            int cnt = 0;
-
-            while (true) {
-                if (grid(i).cache(null).affinity().mapKeyToNode(key).equals(grid(i).localNode())) {
-                    assertTrue(grid(i).cache(null).putx(key, key));
-
-                    cnt++;
-                }
-
-                key++;
-
-                if (cnt == (i == 1 ? 2 : 3))
-                    break;
-            }
-        }
-
-        for (int i = 0; i < gridCount(); i++)
-            assertEquals(i == 1 ? 2 : 3, grid(i).cache(null).primarySize());
-
-        GridCache<Integer, Integer> cache = ignite.cache(null);
-
-        CacheQuery<Map.Entry<Integer, Integer>> q = cache.queries().createSqlQuery(Integer.class, "_key >= 0");
-
-        q.pageSize(2);
-        q.includeBackups(false);
-        q.enableDedup(true);
-
-        Collection<Map.Entry<Integer, Integer>> res = q.execute().get();
-
-        assertEquals(gridCount() * 3 - 1, res.size());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReduceWithPagination() throws Exception {
-        GridCache<Integer, Integer> c = grid(0).cache(null);
-
-        for (int i = 0; i < 50; i++)
-            assertTrue(c.putx(i, 10));
-
-        CacheQuery<Map.Entry<Integer, Integer>> q = c.queries().createSqlQuery(Integer.class, "_key >= 0");
-
-        q.pageSize(10);
-
-        int res = F.sumInt(q.execute(new R1<Map.Entry<Integer, Integer>, Integer>() {
-            private int sum;
-
-            @Override public boolean collect(@Nullable Map.Entry<Integer, Integer> e) {
-                sum += e.getValue();
-
-                return true;
-            }
-
-            @Override public Integer reduce() {
-                return sum;
-            }
-        }).get());
-
-        assertEquals(500, res);
-    }
-
-    /**
-     * @param entries Queried result.
-     * @param persons Persons that should be in the result.
-     */
-    private void checkResult(Iterable<Map.Entry<UUID, Person>> entries, Person... persons) {
-        for (Map.Entry<UUID, Person> entry : entries) {
-            assertEquals(entry.getKey(), entry.getValue().id());
-
-            assert F.<Person>asList(persons).contains(entry.getValue());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheQueryNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheQueryNodeRestartSelfTest.java
deleted file mode 100644
index 33a5001..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheQueryNodeRestartSelfTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- * Test for distributed queries with node restarts.
- */
-public class GridCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTest {
-    /** */
-    private static final int GRID_CNT = 3;
-
-    /** */
-    private static final int KEY_CNT = 1000;
-
-    /** */
-    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return GRID_CNT;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 90 * 1000;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration c = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(ipFinder);
-
-        c.setDiscoverySpi(disco);
-
-        CacheConfiguration cc = defaultCacheConfiguration();
-
-        cc.setCacheMode(PARTITIONED);
-        cc.setBackups(1);
-        cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        cc.setAtomicityMode(TRANSACTIONAL);
-        cc.setDistributionMode(NEAR_PARTITIONED);
-
-        CacheQueryConfiguration qcfg = new CacheQueryConfiguration();
-
-        qcfg.setIndexPrimitiveKey(true);
-
-        cc.setQueryConfiguration(qcfg);
-
-        c.setCacheConfiguration(cc);
-
-        return c;
-    }
-
-    /**
-     * JUnit.
-     *
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings({"TooBroadScope"})
-    public void testRestarts() throws Exception {
-        int duration = 60 * 1000;
-        int qryThreadNum = 10;
-        final long nodeLifeTime = 2 * 1000;
-        final int logFreq = 20;
-
-        final GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        assert cache != null;
-
-        for (int i = 0; i < KEY_CNT; i++)
-            cache.put(i, i);
-
-        assertEquals(KEY_CNT, cache.size());
-
-        final AtomicInteger qryCnt = new AtomicInteger();
-
-        final AtomicBoolean done = new AtomicBoolean();
-
-        IgniteFuture<?> fut1 = multithreadedAsync(new CAX() {
-            @Override public void applyx() throws IgniteCheckedException {
-                while (!done.get()) {
-                    CacheQuery<Map.Entry<Integer, Integer>> qry =
-                        cache.queries().createSqlQuery(Integer.class, "_val >= 0");
-
-                    qry.includeBackups(true);
-                    qry.keepAll(true);
-
-                    assertFalse(qry.execute().get().isEmpty());
-
-                    int c = qryCnt.incrementAndGet();
-
-                    if (c % logFreq == 0)
-                        info("Executed queries: " + c);
-                }
-            }
-        }, qryThreadNum);
-
-        final AtomicInteger restartCnt = new AtomicInteger();
-
-        CollectingEventListener lsnr = new CollectingEventListener();
-
-        for (int i = 0; i < GRID_CNT; i++)
-            grid(i).events().localListen(lsnr, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED);
-
-        IgniteFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
-            @SuppressWarnings({"BusyWait"})
-            @Override public Object call() throws Exception {
-                while (!done.get()) {
-                    int idx = GRID_CNT;
-
-                    startGrid(idx);
-
-                    Thread.sleep(nodeLifeTime);
-
-                    stopGrid(idx);
-
-                    int c = restartCnt.incrementAndGet();
-
-                    if (c % logFreq == 0)
-                        info("Node restarts: " + c);
-                }
-
-                return true;
-            }
-        }, 1);
-
-        Thread.sleep(duration);
-
-        done.set(true);
-
-        fut1.get();
-        fut2.get();
-
-        info("Awaiting preload events [restartCnt=" + restartCnt.get() + ']');
-
-        boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000);
-
-        for (int i = 0; i < GRID_CNT; i++)
-            grid(i).events().stopLocalListen(lsnr, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED);
-
-        assert success;
-    }
-
-    /** Listener that will wait for specified number of events received. */
-    private class CollectingEventListener implements IgnitePredicate<IgniteEvent> {
-        /** Registered events count. */
-        private int evtCnt;
-
-        /** {@inheritDoc} */
-        @Override public synchronized boolean apply(IgniteEvent evt) {
-            evtCnt++;
-
-            info("Processed event [evt=" + evt + ", evtCnt=" + evtCnt + ']');
-
-            notifyAll();
-
-            return true;
-        }
-
-        /**
-         * Waits until total number of events processed is equal or greater then argument passed.
-         *
-         * @param cnt Number of events to wait.
-         * @param timeout Timeout to wait.
-         * @return {@code True} if successfully waited, {@code false} if timeout happened.
-         * @throws InterruptedException If thread is interrupted.
-         */
-        public synchronized boolean awaitEvents(int cnt, long timeout) throws InterruptedException {
-            long start = U.currentTimeMillis();
-
-            long now = start;
-
-            while (start + timeout > now) {
-                if (evtCnt >= cnt)
-                    return true;
-
-                wait(start + timeout - now);
-
-                now = U.currentTimeMillis();
-            }
-
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledQuerySelfTest.java
new file mode 100644
index 0000000..f0d8f77
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledQuerySelfTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+
+/**
+ * Tests for atomic cache with near cache enabled.
+ */
+public class IgniteCacheAtomicNearEnabledQuerySelfTest extends IgniteCachePartitionedQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheDistributionMode distributionMode() {
+        return NEAR_PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicQuerySelfTest.java
new file mode 100644
index 0000000..8ff44f5
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicQuerySelfTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+
+/**
+ * Tests for partitioned cache queries in atomic mode with {@code PARTITIONED_ONLY} distribution.
+ */
+public class IgniteCacheAtomicQuerySelfTest extends IgniteCachePartitionedQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryP2PDisabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryP2PDisabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryP2PDisabledSelfTest.java
new file mode 100644
index 0000000..4779a98
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryP2PDisabledSelfTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Tests for partitioned cache queries with peer class loading disabled.
+ */
+public class IgniteCachePartitionedQueryP2PDisabledSelfTest extends IgniteCachePartitionedQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        c.setPeerClassLoadingEnabled(false);
+
+        return c;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
new file mode 100644
index 0000000..fa62019
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import javax.cache.Cache;
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Tests for partitioned cache queries.
+ */
+public class IgniteCachePartitionedQuerySelfTest extends IgniteCacheAbstractQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleNodeQuery() throws Exception {
+        Person p1 = new Person("Jon", 1500);
+        Person p2 = new Person("Jane", 2000);
+        Person p3 = new Person("Mike", 1800);
+        Person p4 = new Person("Bob", 1900);
+
+        IgniteCache<UUID, Person> cache0 = grid(0).jcache(null);
+
+        cache0.put(p1.id(), p1);
+        cache0.put(p2.id(), p2);
+        cache0.put(p3.id(), p3);
+        cache0.put(p4.id(), p4);
+
+        assertEquals(4, cache0.size());
+
+        QueryCursor<Cache.Entry<UUID, Person>> qry =
+            cache0.localQuery(new QuerySqlPredicate<UUID, Person>("salary < 2000"));
+
+        Collection<Cache.Entry<UUID, Person>> entries = qry.getAll();
+
+        assert entries != null;
+
+        assertEquals(3, entries.size());
+
+        checkResult(entries, p1, p3, p4);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFieldsQuery() throws Exception {
+        Person p1 = new Person("Jon", 1500);
+        Person p2 = new Person("Jane", 2000);
+        Person p3 = new Person("Mike", 1800);
+        Person p4 = new Person("Bob", 1900);
+
+        Ignite ignite0 = grid(0);
+
+        IgniteCache<UUID, Person> cache0 = ignite0.jcache(null);
+
+        cache0.put(p1.id(), p1);
+        cache0.put(p2.id(), p2);
+        cache0.put(p3.id(), p3);
+        cache0.put(p4.id(), p4);
+
+        assertEquals(4, cache0.size());
+
+        // Fields query
+        QueryCursor<List<?>> qry = cache0
+            .queryFields(new QuerySqlPredicate<UUID, Person>("select name from Person where salary > ?", 1600));
+
+        Collection<List<?>> res = qry.getAll();
+
+        assertEquals(3, res.size());
+
+        // Fields query count(*)
+        qry = cache0.queryFields(new QuerySqlPredicate<UUID, Person>("select count(*) from Person"));
+
+        res = qry.getAll();
+
+        int cnt = 0;
+
+        for (List<?> row : res)
+            cnt += (Long)row.get(0);
+
+        assertEquals(4, cnt);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultipleNodesQuery() throws Exception {
+        Person p1 = new Person("Jon", 1500);
+        Person p2 = new Person("Jane", 2000);
+        Person p3 = new Person("Mike", 1800);
+        Person p4 = new Person("Bob", 1900);
+
+        IgniteCache<UUID, Person> cache0 = grid(0).jcache(null);
+
+        cache0.put(p1.id(), p1);
+        cache0.put(p2.id(), p2);
+        cache0.put(p3.id(), p3);
+        cache0.put(p4.id(), p4);
+
+        assertEquals(4, cache0.size());
+
+        assert grid(0).nodes().size() == gridCount();
+
+        QueryCursor<Cache.Entry<UUID, Person>> qry =
+            cache0.query(new QuerySqlPredicate<UUID, Person>("salary < 2000"));
+
+        // Execute on full projection, duplicates are expected.
+        Collection<Cache.Entry<UUID, Person>> entries = qry.getAll();
+
+        assert entries != null;
+
+        info("Queried entries: " + entries);
+
+        // Expect result including backup persons.
+        assertEquals(3 * gridCount(), entries.size());
+
+        checkResult(entries, p1, p3, p4);
+    }
+
+    /**
+     * @param entries Queried result.
+     * @param persons Persons that should be in the result.
+     */
+    private void checkResult(Iterable<Cache.Entry<UUID, Person>> entries, Person... persons) {
+        for (Cache.Entry<UUID, Person> entry : entries) {
+            assertEquals(entry.getKey(), entry.getValue().id());
+
+            assert F.<Person>asList(persons).contains(entry.getValue());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
new file mode 100644
index 0000000..a8da960
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import javax.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Test for distributed queries with node restarts.
+ */
+public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTest {
+    /** */
+    private static final int GRID_CNT = 3;
+
+    /** */
+    private static final int KEY_CNT = 1000;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return GRID_CNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 90 * 1000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(PARTITIONED);
+        cc.setBackups(1);
+        cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cc.setAtomicityMode(TRANSACTIONAL);
+        cc.setDistributionMode(NEAR_PARTITIONED);
+
+        CacheQueryConfiguration qcfg = new CacheQueryConfiguration();
+
+        qcfg.setIndexPrimitiveKey(true);
+
+        cc.setQueryConfiguration(qcfg);
+
+        c.setCacheConfiguration(cc);
+
+        return c;
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testRestarts() throws Exception {
+        int duration = 60 * 1000;
+        int qryThreadNum = 10;
+        final long nodeLifeTime = 2 * 1000;
+        final int logFreq = 20;
+
+        final IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
+
+        assert cache != null;
+
+        for (int i = 0; i < KEY_CNT; i++)
+            cache.put(i, i);
+
+        assertEquals(KEY_CNT, cache.size());
+
+        final AtomicInteger qryCnt = new AtomicInteger();
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut1 = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                while (!done.get()) {
+                    Collection<Cache.Entry<Integer, Integer>> res =
+                        cache.query(new QuerySqlPredicate<Integer, Integer>("_val >= 0")).getAll();
+
+                    assertFalse(res.isEmpty());
+
+                    int c = qryCnt.incrementAndGet();
+
+                    if (c % logFreq == 0)
+                        info("Executed queries: " + c);
+                }
+            }
+        }, qryThreadNum);
+
+        final AtomicInteger restartCnt = new AtomicInteger();
+
+        CollectingEventListener lsnr = new CollectingEventListener();
+
+        for (int i = 0; i < GRID_CNT; i++)
+            grid(i).events().localListen(lsnr, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED);
+
+        IgniteFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
+            @SuppressWarnings({"BusyWait"})
+            @Override public Object call() throws Exception {
+                while (!done.get()) {
+                    int idx = GRID_CNT;
+
+                    startGrid(idx);
+
+                    Thread.sleep(nodeLifeTime);
+
+                    stopGrid(idx);
+
+                    int c = restartCnt.incrementAndGet();
+
+                    if (c % logFreq == 0)
+                        info("Node restarts: " + c);
+                }
+
+                return true;
+            }
+        }, 1);
+
+        Thread.sleep(duration);
+
+        done.set(true);
+
+        fut1.get();
+        fut2.get();
+
+        info("Awaiting preload events [restartCnt=" + restartCnt.get() + ']');
+
+        boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            grid(i).events().stopLocalListen(lsnr, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED);
+
+        assert success;
+    }
+
+    /** Listener that will wait for specified number of events received. */
+    private class CollectingEventListener implements IgnitePredicate<IgniteEvent> {
+        /** Registered events count. */
+        private int evtCnt;
+
+        /** {@inheritDoc} */
+        @Override public synchronized boolean apply(IgniteEvent evt) {
+            evtCnt++;
+
+            info("Processed event [evt=" + evt + ", evtCnt=" + evtCnt + ']');
+
+            notifyAll();
+
+            return true;
+        }
+
+        /**
+         * Waits until total number of events processed is equal to or greater than the argument passed.
+         *
+         * @param cnt Number of events to wait.
+         * @param timeout Timeout to wait.
+         * @return {@code True} if successfully waited, {@code false} if timeout happened.
+         * @throws InterruptedException If thread is interrupted.
+         */
+        public synchronized boolean awaitEvents(int cnt, long timeout) throws InterruptedException {
+            long start = U.currentTimeMillis();
+
+            long now = start;
+
+            while (start + timeout > now) {
+                if (evtCnt >= cnt)
+                    return true;
+
+                wait(start + timeout - now);
+
+                now = U.currentTimeMillis();
+            }
+
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQueryP2PDisabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQueryP2PDisabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQueryP2PDisabledSelfTest.java
deleted file mode 100644
index 9b59faa..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQueryP2PDisabledSelfTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.replicated;
-
-import org.apache.ignite.configuration.*;
-
-/**
- * Tests replicated query.
- */
-public class GridCacheReplicatedQueryP2PDisabledSelfTest extends GridCacheReplicatedQuerySelfTest {
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration c = super.getConfiguration(gridName);
-
-        c.setPeerClassLoadingEnabled(false);
-
-        return c;
-    }
-}


Mime
View raw message