ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [13/50] [abbrv] incubator-ignite git commit: # ignite-63
Date Fri, 23 Jan 2015 09:02:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java
new file mode 100644
index 0000000..db421c0
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Test that entries are indexed on load/reload methods.
+ */
+public class GridCacheQueryLoadSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Puts count. */
+    private static final int PUT_CNT = 10;
+
+    /** Store map. */
+    private static final Map<Integer, ValueObject> STORE_MAP = new HashMap<>();
+
+    /** */
+    public GridCacheQueryLoadSelfTest() {
+        super(true);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setCacheMode(REPLICATED);
+        ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore()));
+        ccfg.setReadThrough(true);
+        ccfg.setWriteThrough(true);
+        ccfg.setLoadPreviousValue(true);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        cache().removeAll();
+
+        assert cache().isEmpty();
+        assert size(ValueObject.class) == 0;
+
+        STORE_MAP.clear();
+    }
+
+    /**
+     * Number of objects of given type in index.
+     *
+     * @param cls Value type.
+     * @return Objects number.
+     * @throws IgniteCheckedException If failed.
+     */
+    private long size(Class<?> cls) throws IgniteCheckedException {
+        GridCacheQueryManager<Object, Object> qryMgr = ((GridKernal)grid()).internalCache().context().queries();
+
+        assert qryMgr != null;
+
+        return qryMgr.size(cls);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCache() throws Exception {
+        GridCache<Integer, ValueObject> cache = cache();
+
+        cache.loadCache(null, 0);
+
+        assert cache.size() == PUT_CNT;
+
+        Collection<Map.Entry<Integer, ValueObject>> res =
+            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assertNotNull(res);
+        assertEquals(PUT_CNT, res.size());
+        assertEquals(PUT_CNT, size(ValueObject.class));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCacheAsync() throws Exception {
+        GridCache<Integer, ValueObject> cache = cache();
+
+        cache.loadCacheAsync(null, 0).get();
+
+        assert cache.size() == PUT_CNT;
+
+        Collection<Map.Entry<Integer, ValueObject>> res =
+            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == PUT_CNT;
+        assert size(ValueObject.class) == PUT_CNT;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCacheFiltered() throws Exception {
+        GridCache<Integer, ValueObject> cache = cache();
+
+        cache.loadCache(new P2<Integer, ValueObject>() {
+            @Override public boolean apply(Integer key, ValueObject val) {
+                return key >= 5;
+            }
+        }, 0);
+
+        assert cache.size() == PUT_CNT - 5;
+
+        Collection<Map.Entry<Integer, ValueObject>> res =
+            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == PUT_CNT - 5;
+        assert size(ValueObject.class) == PUT_CNT - 5;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCacheAsyncFiltered() throws Exception {
+        GridCache<Integer, ValueObject> cache = cache();
+
+        cache.loadCacheAsync(new P2<Integer, ValueObject>() {
+            @Override public boolean apply(Integer key, ValueObject val) {
+                return key >= 5;
+            }
+        }, 0).get();
+
+        assert cache.size() == PUT_CNT - 5;
+
+        Collection<Map.Entry<Integer, ValueObject>> res =
+            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == PUT_CNT - 5;
+        assert size(ValueObject.class) == PUT_CNT - 5;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReload() throws Exception {
+        STORE_MAP.put(1, new ValueObject(1));
+
+        GridCache<Integer, ValueObject> cache = cache();
+
+        ValueObject vo = cache.reload(1);
+
+        assertNotNull(vo);
+
+        assertEquals(1, vo.value());
+        assertEquals(1, cache.size());
+
+        Collection<Map.Entry<Integer, ValueObject>> res =
+            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == 1;
+        assert size(ValueObject.class) == 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReloadAsync() throws Exception {
+        STORE_MAP.put(1, new ValueObject(1));
+
+        GridCache<Integer, ValueObject> cache = cache();
+
+        assert cache.reloadAsync(1).get().value() == 1;
+
+        assert cache.size() == 1;
+
+        Collection<Map.Entry<Integer, ValueObject>> res =
+            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == 1;
+        assert size(ValueObject.class) == 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReloadAll() throws Exception {
+        for (int i = 0; i < PUT_CNT; i++)
+            STORE_MAP.put(i, new ValueObject(i));
+
+        GridCache<Integer, ValueObject> cache = cache();
+
+        Integer[] keys = new Integer[PUT_CNT - 5];
+
+        for (int i = 0; i < PUT_CNT - 5; i++)
+            keys[i] = i + 5;
+
+        cache.reloadAll(F.asList(keys));
+
+        assert cache.size() == PUT_CNT - 5;
+
+        Collection<Map.Entry<Integer, ValueObject>> res =
+            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == PUT_CNT - 5;
+        assert size(ValueObject.class) == PUT_CNT - 5;
+
+        for (Integer key : keys)
+            cache.clear(key);
+
+        assert cache.isEmpty();
+        assertEquals(0, cache.size());
+
+        cache.reloadAll(Arrays.asList(keys));
+
+        assertEquals(PUT_CNT - 5, cache.size());
+
+        res = cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == PUT_CNT - 5;
+        assert size(ValueObject.class) == PUT_CNT - 5;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReloadAllAsync() throws Exception {
+        for (int i = 0; i < PUT_CNT; i++)
+            STORE_MAP.put(i, new ValueObject(i));
+
+        GridCache<Integer, ValueObject> cache = cache();
+
+        Integer[] keys = new Integer[PUT_CNT - 5];
+
+        for (int i = 0; i < PUT_CNT - 5; i++)
+            keys[i] = i + 5;
+
+        cache.reloadAllAsync(F.asList(keys)).get();
+
+        assert cache.size() == PUT_CNT - 5;
+
+        Collection<Map.Entry<Integer, ValueObject>> res =
+            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == PUT_CNT - 5;
+        assert size(ValueObject.class) == PUT_CNT - 5;
+
+        // Invalidate will remove entries.
+        for (Integer key : keys)
+            cache.clear(key);
+
+        assert cache.isEmpty();
+        assertEquals(0, cache.size());
+
+        cache.reloadAllAsync(Arrays.asList(keys)).get();
+
+        assertEquals(PUT_CNT - 5, cache.size());
+
+        res = cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == PUT_CNT - 5;
+        assert size(ValueObject.class) == PUT_CNT - 5;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReloadAllFiltered() throws Exception {
+        GridCache<Integer, ValueObject> cache = cache();
+
+        for (int i = 0; i < PUT_CNT; i++)
+            assert cache.putx(i, new ValueObject(i));
+
+        assert cache.size() == PUT_CNT;
+
+        Integer[] keys = new Integer[PUT_CNT];
+
+        for (int i = 0; i < PUT_CNT; i++)
+            keys[i] = i;
+
+        for (Integer key : keys)
+            cache.clear(key);
+
+        assert cache.isEmpty();
+        assertEquals(0, cache.size());
+
+        cache.projection(new P1<GridCacheEntry<Integer, ValueObject>>() {
+            @Override public boolean apply(GridCacheEntry<Integer, ValueObject> e) {
+                return e.getKey() >= 5;
+            }
+        }).reloadAll(Arrays.asList(keys));
+
+        assert cache.size() == PUT_CNT - 5;
+
+        Collection<Map.Entry<Integer, ValueObject>> res =
+            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == PUT_CNT - 5;
+        assert size(ValueObject.class) == PUT_CNT - 5;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReloadAllAsyncFiltered() throws Exception {
+        GridCache<Integer, ValueObject> cache = cache();
+
+        for (int i = 0; i < PUT_CNT; i++)
+            assert cache.putx(i, new ValueObject(i));
+
+        assert cache.size() == PUT_CNT;
+
+        Integer[] keys = new Integer[PUT_CNT];
+
+        for (int i = 0; i < PUT_CNT; i++)
+            keys[i] = i;
+
+        for (Integer key : keys)
+            cache.clear(key);
+
+        assert cache.isEmpty();
+        assertEquals(0, cache.size());
+
+        cache.projection(new P1<GridCacheEntry<Integer, ValueObject>>() {
+            @Override public boolean apply(GridCacheEntry<Integer, ValueObject> e) {
+                return e.getKey() >= 5;
+            }
+        }).reloadAllAsync(Arrays.asList(keys)).get();
+
+        assertEquals(5, cache.size());
+
+        Collection<Map.Entry<Integer, ValueObject>> res =
+            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
+
+        assert res != null;
+        assert res.size() == PUT_CNT - 5;
+        assert size(ValueObject.class) == PUT_CNT - 5;
+    }
+
+    /**
+     * Test store.
+     */
+    private static class TestStore extends CacheStoreAdapter<Integer, ValueObject> {
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Integer, ValueObject> clo, @Nullable Object... args) {
+            assert clo != null;
+
+            for (int i = 0; i < PUT_CNT; i++)
+                clo.apply(i, new ValueObject(i));
+        }
+
+        /** {@inheritDoc} */
+        @Override public ValueObject load(Integer key) {
+            assert key != null;
+
+            return STORE_MAP.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends ValueObject> e) {
+            assert e != null;
+            assert e.getKey() != null;
+            assert e.getValue() != null;
+
+            STORE_MAP.put(e.getKey(), e.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) {
+            assert key != null;
+
+            STORE_MAP.remove(key);
+        }
+    }
+
+    /**
+     * Value object class.
+     */
+    private static class ValueObject {
+        /** Value. */
+        @GridCacheQuerySqlField
+        private final int val;
+
+        /**
+         * @param val Value.
+         */
+        ValueObject(int val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        int value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ValueObject.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java
new file mode 100644
index 0000000..3f7e6b0
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Tests for cache query metrics.
+ */
+public class GridCacheQueryMetricsSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final int GRID_CNT = 2;
+
+    /** */
+    private static final GridCacheMode CACHE_MODE = REPLICATED;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGridsMultiThreaded(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(CACHE_MODE);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        GridCacheQueryConfiguration qcfg = new GridCacheQueryConfiguration();
+
+        qcfg.setIndexPrimitiveKey(true);
+
+        cacheCfg.setQueryConfiguration(qcfg);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testAccumulativeMetrics() throws Exception {
+        GridCache<String, Integer> cache = cache(0);
+
+        GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "_val >= 0")
+            .projection(grid(0));
+
+        // Execute query.
+        qry.execute().get();
+
+        GridCacheQueryMetrics m = cache.queries().metrics();
+
+        assert m != null;
+
+        info("Metrics: " + m);
+
+        assertEquals(1, m.executions());
+        assertEquals(0, m.fails());
+        assertTrue(m.averageTime() >= 0);
+        assertTrue(m.maximumTime() >= 0);
+        assertTrue(m.minimumTime() >= 0);
+
+        // Execute again with the same parameters.
+        qry.execute().get();
+
+        m = cache.queries().metrics();
+
+        assert m != null;
+
+        info("Metrics: " + m);
+
+        assertEquals(2, m.executions());
+        assertEquals(0, m.fails());
+        assertTrue(m.averageTime() >= 0);
+        assertTrue(m.maximumTime() >= 0);
+        assertTrue(m.minimumTime() >= 0);
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testSingleQueryMetrics() throws Exception {
+        GridCache<String, Integer> cache = cache(0);
+
+        GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "_val >= 0")
+            .projection(grid(0));
+
+        // Execute.
+        qry.execute().get();
+
+        GridCacheQueryMetrics m = qry.metrics();
+
+        info("Metrics: " + m);
+
+        assertEquals(1, m.executions());
+        assertEquals(0, m.fails());
+        assertTrue(m.averageTime() >= 0);
+        assertTrue(m.maximumTime() >= 0);
+        assertTrue(m.minimumTime() >= 0);
+
+        // Execute.
+        qry.execute().get();
+
+        m = qry.metrics();
+
+        info("Metrics: " + m);
+
+        assertEquals(2, m.executions());
+        assertEquals(0, m.fails());
+        assertTrue(m.averageTime() >= 0);
+        assertTrue(m.maximumTime() >= 0);
+        assertTrue(m.minimumTime() >= 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java
new file mode 100644
index 0000000..8b4185d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.eviction.lru.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.swapspace.file.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
+import org.apache.ignite.internal.processors.query.h2.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+
+/**
+ * Multi-threaded tests for cache queries.
+ */
+@SuppressWarnings("StatementWithEmptyBody")
+public class GridCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final boolean TEST_INFO = true;
+
+    /** Number of test grids (nodes). Should not be less than 2. */
+    private static final int GRID_CNT = 2;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static AtomicInteger idxSwapCnt = new AtomicInteger();
+
+    /** */
+    private static AtomicInteger idxUnswapCnt = new AtomicInteger();
+
+    /** */
+    private static final long DURATION = 30 * 1000;
+
+    /** Don't start grid by default. */
+    public GridCacheQueryMultiThreadedSelfTest() {
+        super(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(GridCacheDistributionMode.NEAR_PARTITIONED);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setSwapEnabled(true);
+        cacheCfg.setBackups(1);
+        cacheCfg.setEvictionPolicy(evictsEnabled() ? new GridCacheLruEvictionPolicy(100) : null);
+
+        GridCacheQueryConfiguration qcfg = new GridCacheQueryConfiguration();
+
+        qcfg.setIndexPrimitiveKey(true);
+
+        cacheCfg.setQueryConfiguration(qcfg);
+
+        if (offheapEnabled() && evictsEnabled())
+            cacheCfg.setOffHeapMaxMemory(1000); // Small offheap for evictions.
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        GridQueryConfiguration indexing = new GridQueryConfiguration();
+
+        indexing.setMaxOffheapRowsCacheSize(128);
+
+        if (offheapEnabled())
+            indexing.setMaxOffHeapMemory(0);
+
+        cfg.setQueryConfiguration(indexing);
+
+        GridQueryProcessor.idxCls = FakeIndexing.class;
+
+        return cfg;
+    }
+
+    /**
+     *
+     */
+    private static class FakeIndexing extends GridH2Indexing {
+        @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException {
+            super.onSwap(spaceName, key);
+
+            idxSwapCnt.incrementAndGet();
+        }
+
+        @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes)
+        throws IgniteCheckedException {
+            super.onUnswap(spaceName, key, val, valBytes);
+
+            idxUnswapCnt.incrementAndGet();
+        }
+    }
+
+    /** @return {@code true} If offheap enabled. */
+    protected boolean offheapEnabled() {
+        return false;
+    }
+
+    /** @return {@code true} If evictions enabled. */
+    protected boolean evictsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Clean up all caches.
+        for (int i = 0; i < GRID_CNT; i++) {
+            GridCache<Object, Object> c = grid(i).cache(null);
+
+            assertEquals(0, c.size());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        assert GRID_CNT >= 2 : "Constant GRID_CNT must be greater than or equal to 2.";
+
+        startGridsMultiThreaded(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        if (evictsEnabled()) {
+            assertTrue(idxSwapCnt.get() > 0);
+            assertTrue(idxUnswapCnt.get() > 0);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        // Clean up all caches.
+        for (int i = 0; i < GRID_CNT; i++) {
+            GridCache<Object, Object> c = grid(i).cache(null);
+
+            c.removeAll(F.<GridCacheEntry<Object, Object>>alwaysTrue());
+
+            Iterator<Map.Entry<Object, Object>> it = c.swapIterator();
+
+            while (it.hasNext()) {
+                it.next();
+
+                it.remove();
+            }
+
+            it = c.offHeapIterator();
+
+            while (it.hasNext()) {
+                it.next();
+
+                it.remove();
+            }
+
+            assertEquals("Swap keys: " + c.swapKeys(), 0, c.swapKeys());
+            assertEquals(0, c.offHeapEntriesCount());
+            assertEquals(0, c.size());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void info(String msg) {
+        if (TEST_INFO)
+            super.info(msg);
+    }
+
+    /**
+     * @param entries Entries.
+     * @param g Grid.
+     * @return Affinity nodes.
+     */
+    private Set<UUID> affinityNodes(Iterable<Map.Entry<Integer, Integer>> entries, Ignite g) {
+        Set<UUID> nodes = new HashSet<>();
+
+        for (Map.Entry<Integer, Integer> entry : entries)
+            nodes.add(g.cache(null).affinity().mapKeyToPrimaryAndBackups(entry.getKey()).iterator().next().id());
+
+        return nodes;
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSwapUnswapString() throws Exception {
+        int threadCnt = 150;
+        final int keyCnt = 2000;
+        final int valCnt = 10000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final GridCache<Integer, String> c = g.cache(null);
+
+        assertEquals(0, g.cache(null).size());
+        assertEquals(0, c.queries().createSqlQuery(String.class, "1 = 1").execute().get().size());
+        assertEquals(0, c.queries().createSqlQuery(Long.class, "1 = 1").execute().get().size());
+
+        Random rnd = new Random();
+
+        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
+            c.putx(i, String.valueOf(rnd.nextInt(valCnt)));
+
+            if (evictsEnabled() && rnd.nextBoolean())
+                assertTrue(c.evict(i));
+        }
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                Random rnd = new Random();
+
+                while (!done.get()) {
+                    switch (rnd.nextInt(5)) {
+                        case 0:
+                            c.putx(rnd.nextInt(keyCnt), String.valueOf(rnd.nextInt(valCnt)));
+
+                            break;
+                        case 1:
+                            if (evictsEnabled())
+                                c.evict(rnd.nextInt(keyCnt));
+
+                            break;
+                        case 2:
+                            c.remove(rnd.nextInt(keyCnt));
+
+                            break;
+                        case 3:
+                            c.get(rnd.nextInt(keyCnt));
+
+                            break;
+                        case 4:
+                            GridCacheQuery<Map.Entry<Integer, String>> qry = c.queries().createSqlQuery(
+                                String.class, "_val between ? and ?");
+
+                            int from = rnd.nextInt(valCnt);
+
+                            GridCacheQueryFuture<Map.Entry<Integer, String>> fut =
+                                qry.execute(String.valueOf(from), String.valueOf(from + 250));
+
+                            Collection<Map.Entry<Integer, String>> res = fut.get();
+
+                            for (Map.Entry<Integer, String> ignored : res) {
+                                //No-op.
+                            }
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get();
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSwapUnswapLong() throws Exception {
+        int threadCnt = 150;
+        final int keyCnt = 2000;
+        final int valCnt = 10000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final GridCache<Integer, Long> c = g.cache(null);
+
+        assertEquals(0, g.cache(null).size());
+        assertEquals(0, c.queries().createSqlQuery(String.class, "1 = 1").execute().get().size());
+        assertEquals(0, c.queries().createSqlQuery(Long.class, "1 = 1").execute().get().size());
+
+        Random rnd = new Random();
+
+        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
+            c.putx(i, (long)rnd.nextInt(valCnt));
+
+            if (evictsEnabled() && rnd.nextBoolean())
+                assertTrue(c.evict(i));
+        }
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                Random rnd = new Random();
+
+                while (!done.get()) {
+                    int key = rnd.nextInt(keyCnt);
+
+                    switch (rnd.nextInt(5)) {
+                        case 0:
+                            c.putx(key, (long)rnd.nextInt(valCnt));
+
+                            break;
+                        case 1:
+                            if (evictsEnabled())
+                                c.evict(key);
+
+                            break;
+                        case 2:
+                            c.remove(key);
+
+                            break;
+                        case 3:
+                            c.get(key);
+
+                            break;
+                        case 4:
+                            GridCacheQuery<Map.Entry<Integer, Long>> qry = c.queries().createSqlQuery(
+                                Long.class,
+                                "_val between ? and ?");
+
+                            int from = rnd.nextInt(valCnt);
+
+                            GridCacheQueryFuture<Map.Entry<Integer, Long>> f = qry.execute(from, from + 250);
+
+                            Collection<Map.Entry<Integer, Long>> res = f.get();
+
+                            for (Map.Entry<Integer, Long> ignored : res) {
+                                //No-op.
+                            }
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get();
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSwapUnswapLongString() throws Exception {
+        int threadCnt = 150;
+        final int keyCnt = 2000;
+        final int valCnt = 10000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final GridCache<Integer, Object> c = g.cache(null);
+
+        assertEquals(0, g.cache(null).size());
+        assertEquals(0, c.offHeapEntriesCount());
+//        assertEquals(0, c.swapKeys());
+        assertEquals(0, c.queries().createSqlQuery(String.class, "1 = 1").execute().get().size());
+        assertEquals(0, c.queries().createSqlQuery(Long.class, "1 = 1").execute().get().size());
+
+        Random rnd = new Random();
+
+        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
+            c.putx(i, rnd.nextBoolean() ? (long)rnd.nextInt(valCnt) : String.valueOf(rnd.nextInt(valCnt)));
+
+            if (evictsEnabled() && rnd.nextBoolean())
+                assertTrue(c.evict(i));
+        }
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                Random rnd = new Random();
+
+                while (!done.get()) {
+                    int key = rnd.nextInt(keyCnt);
+
+                    switch (rnd.nextInt(5)) {
+                        case 0:
+                            c.putx(key, rnd.nextBoolean() ? (long)rnd.nextInt(valCnt) :
+                                String.valueOf(rnd.nextInt(valCnt)));
+
+                            break;
+                        case 1:
+                            if (evictsEnabled())
+                                c.evict(key);
+
+                            break;
+                        case 2:
+                            c.remove(key);
+
+                            break;
+                        case 3:
+                            c.get(key);
+
+                            break;
+                        case 4:
+                            GridCacheQuery<Map.Entry<Integer, Object>> qry = c.queries().createSqlQuery(
+                                rnd.nextBoolean() ? Long.class : String.class,
+                                "_val between ? and ?");
+
+                            int from = rnd.nextInt(valCnt);
+
+                            GridCacheQueryFuture<Map.Entry<Integer, Object>> f = qry.execute(from, from + 250);
+
+                            Collection<Map.Entry<Integer, Object>> res = f.get();
+
+                            for (Map.Entry<Integer, Object> ignored : res) {
+                                //No-op.
+                            }
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSwapUnswapObject() throws Exception {
+        int threadCnt = 50;
+        final int keyCnt = 4000;
+        final int valCnt = 10000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final GridCache<Integer, TestValue> c = g.cache(null);
+
+        assertEquals(0, g.cache(null).size());
+        assertEquals(0, c.queries().createSqlQuery(String.class, "1 = 1").execute().get().size());
+        assertEquals(0, c.queries().createSqlQuery(Long.class, "1 = 1").execute().get().size());
+
+        Random rnd = new Random();
+
+        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
+            c.putx(i, new TestValue(rnd.nextInt(valCnt)));
+
+            if (evictsEnabled() && rnd.nextBoolean())
+                assertTrue(c.evict(i));
+        }
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                Random rnd = new Random();
+
+                while (!done.get()) {
+                    int key = rnd.nextInt(keyCnt);
+
+                    switch (rnd.nextInt(5)) {
+                        case 0:
+                            c.putx(key, new TestValue(rnd.nextInt(valCnt)));
+
+                            break;
+                        case 1:
+                            if (evictsEnabled())
+                                c.evict(key);
+
+                            break;
+                        case 2:
+                            c.remove(key);
+
+                            break;
+                        case 3:
+                            c.get(key);
+
+                            break;
+                        case 4:
+                            GridCacheQuery<Map.Entry<Integer, TestValue>> qry = c.queries().createSqlQuery(
+                                Long.class, "TestValue.val between ? and ?");
+
+                            int from = rnd.nextInt(valCnt);
+
+                            GridCacheQueryFuture<Map.Entry<Integer, TestValue>> f = qry.execute(from, from + 250);
+
+                            Collection<Map.Entry<Integer, TestValue>> res = f.get();
+
+                            for (Map.Entry<Integer, TestValue> ignored : res) {
+                                //No-op.
+                            }
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get();
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSameQuery() throws Exception {
+        int threadCnt = 50;
+        final int keyCnt = 10;
+        final int logMod = 5000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        GridCache<Integer, Integer> c = g.cache(null);
+
+        for (int i = 0; i < keyCnt; i++) {
+            c.putx(i, i);
+
+            info("Affinity [key=" + i + ", aff=" + c.affinity().mapKeyToPrimaryAndBackups(i).iterator().next().id() + ']');
+
+            assertTrue(c.evict(i));
+        }
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        final GridCacheQuery<Map.Entry<Integer, Integer>> qry = c.queries().createSqlQuery(Integer.class, "_val >= 0");
+
+        IgniteFuture<?> fut = multithreadedAsync(
+            new CAX() {
+                @Override public void applyx() throws IgniteCheckedException {
+                    int iter = 0;
+
+                    while (!done.get() && !Thread.currentThread().isInterrupted()) {
+                        iter++;
+
+                        GridCacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+                        Collection<Map.Entry<Integer, Integer>> entries = fut.get();
+
+                        assert entries != null;
+
+                        assertEquals("Query results [entries=" + entries + ", aff=" + affinityNodes(entries, g) +
+                            ", iteration=" + iter + ']', keyCnt, entries.size());
+
+                        if (cnt.incrementAndGet() % logMod == 0) {
+                            GridCacheQueryManager<Object, Object> qryMgr =
+                                ((GridKernal)g).internalCache().context().queries();
+
+                            assert qryMgr != null;
+
+                            qryMgr.printMemoryStats();
+                        }
+                    }
+                }
+            }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        info("Finishing test...");
+
+        done.set(true);
+
+        fut.get();
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedNewQueries() throws Exception {
+        int threadCnt = 50;
+        final int keyCnt = 10;
+        final int logMod = 5000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final GridCache<Integer, Integer> c = g.cache(null);
+
+        for (int i = 0; i < keyCnt; i++) {
+            c.putx(i, i);
+
+            assertTrue(c.evict(i));
+        }
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                int iter = 0;
+
+                while (!done.get() && !Thread.currentThread().isInterrupted()) {
+                    iter++;
+
+                    GridCacheQuery<Map.Entry<Integer, Integer>> qry =
+                        c.queries().createSqlQuery(Integer.class, "_val >= 0");
+
+                    GridCacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+                    Collection<Map.Entry<Integer, Integer>> entries = fut.get();
+
+                    assert entries != null;
+
+                    assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size());
+
+                    if (cnt.incrementAndGet() % logMod == 0) {
+                        GridCacheQueryManager<Object, Object> qryMgr =
+                            ((GridKernal)g).internalCache().context().queries();
+
+                        assert qryMgr != null;
+
+                        qryMgr.printMemoryStats();
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get();
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedReduceQuery() throws Exception {
+        int threadCnt = 50;
+        int keyCnt = 10;
+        final int logMod = 5000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        GridCache<Integer, Integer> c = g.cache(null);
+
+        for (int i = 0; i < keyCnt; i++)
+            c.putx(i, i);
+
+        final GridCacheQuery<Map.Entry<Integer, Integer>> rdcQry =
+            c.queries().createSqlQuery(Integer.class, "_val > 1 and _val < 4");
+
+        rdcQry.includeBackups(true);
+        rdcQry.keepAll(true);
+
+        final IgniteReducer<Map.Entry<Integer, Integer>, Integer> rmtRdc =
+            new IgniteReducer<Map.Entry<Integer, Integer>, Integer>() {
+                /** Reducer result. */
+                private int res;
+
+                @Override public boolean collect(Map.Entry<Integer, Integer> e) {
+                    res += e.getKey();
+
+                    return true;
+                }
+
+                @Override public Integer reduce() {
+                    return res;
+                }
+            };
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteFuture<?> fut = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                while (!stop.get()) {
+                    Collection<Integer> rmtVals = rdcQry.execute(rmtRdc).get();
+
+                    assertEquals(GRID_CNT, rmtVals.size());
+
+                    Iterator<Integer> reduceIter = rmtVals.iterator();
+
+                    assert reduceIter != null;
+
+                    for (int i = 0; i < GRID_CNT; i++) {
+                        assert reduceIter.hasNext();
+
+                        assertEquals(Integer.valueOf(5), reduceIter.next());
+                    }
+
+                    Collection<Integer> res = rdcQry.execute(rmtRdc).get();
+
+                    int val = F.sumInt(res);
+
+                    int expVal = 5 * GRID_CNT;
+
+                    assertEquals(expVal, val);
+
+                    if (cnt.incrementAndGet() % logMod == 0) {
+                        GridCacheQueryManager<Object, Object> qryMgr =
+                            ((GridKernal)g).internalCache().context().queries();
+
+                        assert qryMgr != null;
+
+                        qryMgr.printMemoryStats();
+                    }
+                }
+            }
+        }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        stop.set(true);
+
+        fut.get();
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedScanQuery() throws Exception {
+        int threadCnt = 50;
+        final int keyCnt = 500;
+        final int logMod = 5000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        GridCache<Integer, Integer> c = g.cache(null);
+
+        for (int i = 0; i < keyCnt; i++)
+            c.putx(i, i);
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        final GridCacheQuery<Map.Entry<Integer, Integer>> qry = c.queries().createScanQuery(null);
+
+        IgniteFuture<?> fut = multithreadedAsync(
+            new CAX() {
+                @Override public void applyx() throws IgniteCheckedException {
+                    int iter = 0;
+
+                    while (!done.get() && !Thread.currentThread().isInterrupted()) {
+                        iter++;
+
+                        GridCacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+                        Collection<Map.Entry<Integer, Integer>> entries = fut.get();
+
+                        assert entries != null;
+
+                        assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size());
+
+                        if (cnt.incrementAndGet() % logMod == 0) {
+                            GridCacheQueryManager<Object, Object> qryMgr =
+                                ((GridKernal)g).internalCache().context().queries();
+
+                            assert qryMgr != null;
+
+                            qryMgr.printMemoryStats();
+                        }
+                    }
+                }
+            }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get();
+    }
+
+    /**
+     * Test value.
+     */
+    private static class TestValue implements Serializable {
+        /** Value. */
+        @GridCacheQuerySqlField
+        private int val;
+
+        /**
+         * @param val Value.
+         */
+        private TestValue(int val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public int value() {
+            return val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapEvictsMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapEvictsMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapEvictsMultiThreadedSelfTest.java
new file mode 100644
index 0000000..e815ae5
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapEvictsMultiThreadedSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Multi-threaded tests for cache queries.
+ */
+public class GridCacheQueryOffheapEvictsMultiThreadedSelfTest extends GridCacheQueryOffheapMultiThreadedSelfTest {
+    /** {@inheritDoc} */
+    @Override protected boolean evictsEnabled() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapMultiThreadedSelfTest.java
new file mode 100644
index 0000000..35bf001
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapMultiThreadedSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Queries over off-heap indexes.
+ */
+public class GridCacheQueryOffheapMultiThreadedSelfTest extends GridCacheQueryMultiThreadedSelfTest {
+    /** {@inheritDoc} */
+    @Override protected boolean offheapEnabled() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryTestValue.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryTestValue.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryTestValue.java
new file mode 100644
index 0000000..55c2fd9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryTestValue.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.query.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+
+import java.io.*;
+
+/**
+ * Query test value.
+ */
+@SuppressWarnings("unused")
+public class GridCacheQueryTestValue implements Serializable {
+    /** */
+    @GridCacheQueryTextField
+    @GridCacheQuerySqlField(name = "fieldname")
+    private String field1;
+
+    /** */
+    private int field2;
+
+    /** */
+    @GridCacheQuerySqlField(unique = true)
+    private long field3;
+
+    /** */
+    @GridCacheQuerySqlField(orderedGroups = {
+        @GridCacheQuerySqlField.Group(name = "grp1", order = 1),
+        @GridCacheQuerySqlField.Group(name = "grp2", order = 2)})
+    private long field4;
+
+    /** */
+    @GridCacheQuerySqlField(orderedGroups = {@GridCacheQuerySqlField.Group(name = "grp1", order = 2)})
+    private long field5;
+
+    /** */
+    @GridCacheQuerySqlField(orderedGroups = {@GridCacheQuerySqlField.Group(name = "grp1", order = 3)})
+    private GridCacheQueryEmbeddedValue field6 = new GridCacheQueryEmbeddedValue();
+
+    /**
+     *
+     * @return Field.
+     */
+    public String getField1() {
+        return field1;
+    }
+
+    /**
+     *
+     * @param field1 Field.
+     */
+    public void setField1(String field1) {
+        this.field1 = field1;
+    }
+
+    /**
+     *
+     * @return Field.
+     */
+    @GridCacheQuerySqlField
+    public int getField2() {
+        return field2;
+    }
+
+    /**
+     *
+     * @param field2 Field.
+     */
+    public void setField2(int field2) {
+        this.field2 = field2;
+    }
+
+    /**
+     *
+     * @return Field.
+     */
+    public long getField3() {
+        return field3;
+    }
+
+    /**
+     *
+     * @param field3 Field.
+     */
+    public void setField3(long field3) {
+        this.field3 = field3;
+    }
+
+    /**
+     *
+     * @return Field.
+     */
+    public long getField4() {
+        return field4;
+    }
+
+    /**
+     *
+     * @param field4 Field.
+     */
+    public void setField4(long field4) {
+        this.field4 = field4;
+    }
+
+    /**
+     * @return Field.
+     */
+    public long getField5() {
+        return field5;
+    }
+
+    /**
+     * @param field5 Field.
+     */
+    public void setField5(long field5) {
+        this.field5 = field5;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"RedundantIfStatement"})
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        GridCacheQueryTestValue that = (GridCacheQueryTestValue)o;
+
+        if (field2 != that.field2)
+            return false;
+
+        if (field3 != that.field3)
+            return false;
+
+        if (field1 != null ? !field1.equals(that.field1) : that.field1 != null)
+            return false;
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = (field1 != null ? field1.hashCode() : 0);
+
+        res = 31 * res + field2;
+        res = 31 * res + (int)(field3 ^ (field3 >>> 32));
+
+        return res;
+    }
+
+    /**
+     * @param field6 Embedded value.
+     */
+    public void setField6(GridCacheQueryEmbeddedValue field6) {
+        this.field6 = field6;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
new file mode 100644
index 0000000..5238b4f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Multithreaded reduce query tests with lots of data.
+ */
+public class GridCacheReduceQueryMultithreadedSelfTest extends GridCacheAbstractSelfTest {
+    /** */
+    private static final int GRID_CNT = 5;
+
+    /** */
+    private static final int TEST_TIMEOUT = 2 * 60 * 1000;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return GRID_CNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIMEOUT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        c.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setBackups(1);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        GridCacheQueryConfiguration qcfg = new GridCacheQueryConfiguration();
+
+        qcfg.setIndexPrimitiveKey(true);
+
+        cfg.setQueryConfiguration(qcfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testReduceQuery() throws Exception {
+        final int keyCnt = 5000;
+        final int logFreq = 500;
+
+        final GridCache<String, Integer> c = cache();
+
+        final CountDownLatch startLatch = new CountDownLatch(1);
+
+        IgniteFuture<?> fut1 = multithreadedAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                for (int i = 1; i < keyCnt; i++) {
+                    assertTrue(c.putx(String.valueOf(i), i));
+
+                    startLatch.countDown();
+
+                    if (i % logFreq == 0)
+                        info("Stored entries: " + i);
+                }
+
+                return null;
+            }
+        }, 1);
+
+        // Create query.
+        final GridCacheQuery<Map.Entry<String, Integer>> sumQry =
+            c.queries().createSqlQuery(Integer.class, "_val > 0").timeout(TEST_TIMEOUT);
+
+        final R1<Map.Entry<String, Integer>, Integer> rmtRdc = new R1<Map.Entry<String, Integer>, Integer>() {
+            /** */
+            private AtomicInteger sum = new AtomicInteger();
+
+            @Override public boolean collect(Map.Entry<String, Integer> e) {
+                sum.addAndGet(e.getValue());
+
+                return true;
+            }
+
+            @Override public Integer reduce() {
+                return sum.get();
+            }
+        };
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        startLatch.await();
+
+        IgniteFuture<?> fut2 = multithreadedAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                int cnt = 0;
+
+                while (!stop.get()) {
+                    Collection<Integer> res = sumQry.execute(rmtRdc).get();
+
+                    int sum = F.sumInt(res);
+
+                    cnt++;
+
+                    assertTrue(sum > 0);
+
+                    if (cnt % logFreq == 0) {
+                        info("Reduced value: " + sum);
+                        info("Executed queries: " + cnt);
+                    }
+                }
+
+                return null;
+            }
+        }, 1);
+
+        fut1.get();
+
+        stop.set(true);
+
+        fut2.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSqlQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSqlQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSqlQueryMultiThreadedSelfTest.java
new file mode 100644
index 0000000..a1acfd5
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSqlQueryMultiThreadedSelfTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class GridCacheSqlQueryMultiThreadedSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setDistributionMode(PARTITIONED_ONLY);
+        ccfg.setQueryIndexEnabled(true);
+        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        c.setCacheConfiguration(ccfg);
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(2);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQuery() throws Exception {
+        final GridCache<Integer, Person> cache = grid(0).cache(null);
+
+        for (int i = 0; i < 2000; i++)
+            cache.put(i, new Person(i));
+
+        GridTestUtils.runMultiThreaded(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                for (int i = 0; i < 100; i++) {
+                    GridCacheQuery<Map.Entry<Integer, Person>> qry =
+                        cache.queries().createSqlQuery("Person", "age >= 0");
+
+                    qry.includeBackups(false);
+                    qry.enableDedup(true);
+                    qry.keepAll(true);
+                    qry.pageSize(50);
+
+                    GridCacheQueryFuture<Map.Entry<Integer, Person>> fut = qry.execute();
+
+                    int cnt = 0;
+
+                    while (fut.next() != null)
+                        cnt++;
+
+                    assertEquals(2000, cnt);
+                }
+
+                return null;
+            }
+        }, 16, "test");
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        @GridCacheQuerySqlField
+        private int age;
+
+        /**
+         * @param age Age.
+         */
+        Person(int age) {
+            this.age = age;
+        }
+
+        /**
+         * @return Age/
+         */
+        public int age() {
+            return age;
+        }
+    }
+}


Mime
View raw message