ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [4/9] ignite git commit: ignite-2921: ScanQueries over local partitions performance optimisation
Date Tue, 17 May 2016 02:34:20 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index abfc325..06cbe79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -1119,35 +1120,41 @@ public class GridServiceProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings("unchecked")
     private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) {
-        if (!cache.context().affinityNode()) {
-            ClusterNode oldestSrvNode =
-                CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
+        try {
+            if (!cache.context().affinityNode()) {
+                ClusterNode oldestSrvNode =
+                    CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
 
-            if (oldestSrvNode == null)
-                return new GridEmptyIterator<>();
+                if (oldestSrvNode == null)
+                    return new GridEmptyIterator<>();
 
-            GridCacheQueryManager qryMgr = cache.context().queries();
+                GridCacheQueryManager qryMgr = cache.context().queries();
 
-            CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false);
+                CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false);
 
-            qry.keepAll(false);
+                qry.keepAll(false);
 
-            qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
+                qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
 
-            return cache.context().itHolder().iterator(qry.execute(),
-                new CacheIteratorConverter<Object, Map.Entry<Object,Object>>() {
-                    @Override protected Object convert(Map.Entry<Object, Object> e) {
-                        return new CacheEntryImpl<>(e.getKey(), e.getValue());
-                    }
+                GridCloseableIterator<Map.Entry<Object, Object>> iter = qry.executeScanQuery();
 
-                    @Override protected void remove(Object item) {
-                        throw new UnsupportedOperationException();
-                    }
-                }
-            );
+                return cache.context().itHolder().iterator(iter,
+                    new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object,Object>>() {
+                        @Override protected Cache.Entry<Object, Object> convert(Map.Entry<Object, Object> e) {
+                            return new CacheEntryImpl<>(e.getKey(), e.getValue());
+                        }
+
+                        @Override protected void remove(Cache.Entry<Object, Object> item) {
+                            throw new UnsupportedOperationException();
+                        }
+                    });
+            }
+            else
+                return cache.entrySetx().iterator();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
         }
-        else
-            return cache.entrySetx().iterator();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
index e434b49..0d45324 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
@@ -17,19 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
-import java.util.Map;
-import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePreloadLifecycleAbstractTest;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lifecycle.LifecycleBean;
 import org.apache.ignite.lifecycle.LifecycleEventType;
 import org.apache.ignite.resources.IgniteInstanceResource;
@@ -164,34 +157,6 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo
     }
 
     /**
-     * @param keys Keys.
-     * @throws Exception If failed.
-     */
-    public void checkScanQuery(Object[] keys) throws Exception {
-        preloadMode = SYNC;
-
-        lifecycleBean = lifecycleBean(keys);
-
-        for (int i = 0; i < gridCnt; i++) {
-            startGrid(i);
-
-            info("Checking '" + (i + 1) + "' nodes...");
-
-            for (int j = 0; j < G.allGrids().size(); j++) {
-                GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
-
-                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
-
-                int totalCnt = F.sumInt(qry.execute(new EntryIntegerIgniteReducer()).get());
-
-                info("Total entry count [grid=" + j + ", totalCnt=" + totalCnt + ']');
-
-                assertEquals(keys.length / 2, totalCnt);
-            }
-        }
-    }
-
-    /**
      * @throws Exception If failed.
      */
     public void testLifecycleBean1() throws Exception {
@@ -218,69 +183,4 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo
     public void testLifecycleBean4() throws Exception {
         checkCache(keys(false, 500));
     }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQuery1() throws Exception {
-        checkScanQuery(keys(true, DFLT_KEYS.length, DFLT_KEYS));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQuery2() throws Exception {
-        checkScanQuery(keys(false, DFLT_KEYS.length, DFLT_KEYS));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQuery3() throws Exception {
-        checkScanQuery(keys(true, 500));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQuery4() throws Exception {
-        checkScanQuery(keys(false, 500));
-    }
-
-    /**
-     *
-     */
-    private static class EntryIntegerIgniteReducer implements IgniteReducer<Map.Entry<Object, MyValue>, Integer> {
-        @IgniteInstanceResource
-        private Ignite grid;
-
-        private int cnt;
-
-        @Override public boolean collect(Map.Entry<Object, MyValue> e) {
-            Object key = e.getKey();
-
-            assertNotNull(e.getValue());
-
-            try {
-                Object v1 = e.getValue();
-                Object v2 = grid.cache("one").get(key);
-
-                assertNotNull(v2);
-                assertEquals(v1, v2);
-            }
-            catch (CacheException e1) {
-                e1.printStackTrace();
-
-                assert false;
-            }
-
-            cnt++;
-
-            return true;
-        }
-
-        @Override public Integer reduce() {
-            return cnt;
-        }
-    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
index 6a7a68b..f6799d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
@@ -17,26 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.replicated.preloader;
 
-import java.util.Map;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePreloadLifecycleAbstractTest;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lifecycle.LifecycleBean;
 import org.apache.ignite.lifecycle.LifecycleEventType;
 import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
 
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
@@ -167,40 +158,6 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
         }
     }
 
-
-    /**
-     * @param keys Keys.
-     * @throws Exception If failed.
-     */
-    public void checkScanQuery(Object[] keys) throws Exception {
-        preloadMode = SYNC;
-
-        lifecycleBean = lifecycleBean(keys);
-
-        for (int i = 0; i < gridCnt; i++) {
-            startGrid(i);
-
-            info("Checking '" + (i + 1) + "' nodes...");
-
-            for (int j = 0; j < G.allGrids().size(); j++) {
-                GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
-
-                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
-
-                final int i0 = j;
-                final int j0 = i;
-
-                qry = qry.projection(grid(j).cluster());
-
-                int totalCnt = F.sumInt(qry.execute(new EntryReducer(j0, i0)).get());
-
-                info("Total entry count [grid=" + j + ", totalCnt=" + totalCnt + ']');
-
-                assertEquals(keys.length * (i + 1) / 2, totalCnt);
-            }
-        }
-    }
-
     /**
      * @throws Exception If failed.
      */
@@ -228,91 +185,4 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
     public void testLifecycleBean4() throws Exception {
         checkCache(keys(false, 500));
     }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQuery1() throws Exception {
-        checkScanQuery(keys(true, DFLT_KEYS.length, DFLT_KEYS));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQuery2() throws Exception {
-        checkScanQuery(keys(false, DFLT_KEYS.length, DFLT_KEYS));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQuery3() throws Exception {
-        checkScanQuery(keys(true, 500));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQuery4() throws Exception {
-        checkScanQuery(keys(false, 500));
-    }
-
-    private static class EntryReducer implements IgniteReducer<Map.Entry<Object, MyValue>, Integer> {
-        /** */
-        private final int j0;
-
-        /** */
-        private final int i0;
-
-        /** */
-        @IgniteInstanceResource
-        private Ignite grid;
-
-        /** */
-        @LoggerResource
-        private IgniteLogger log0;
-
-        /** */
-        private int cnt;
-
-        /**
-         */
-        public EntryReducer(int j0, int i0) {
-            this.j0 = j0;
-            this.i0 = i0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean collect(Map.Entry<Object, MyValue> e) {
-            if (!quiet && log0.isInfoEnabled())
-                log0.info("Collecting entry: " + e);
-
-            Object key = e.getKey();
-
-            assertNotNull(e.getValue());
-
-            try {
-                Object v1 = e.getValue();
-                Object v2 = ((IgniteKernal)grid).getCache("one").get(key);
-
-                assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + ", missedKey=" +
-                    key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
-                assertEquals(v1, v2);
-            }
-            catch (IgniteCheckedException e1) {
-                e1.printStackTrace();
-
-                assert false;
-            }
-
-            cnt++;
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer reduce() {
-            return cnt;
-        }
-    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java
index 5c88f4e..b20cb0e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java
@@ -197,9 +197,9 @@ public class ConfigVariationsTestSuiteBuilder {
         TestSuite addedSuite;
 
         if (testedNodeCnt > 1)
-            addedSuite = createMultiNodeTestSuite((Class<? extends IgniteCacheConfigVariationsAbstractTest>)cls,

+            addedSuite = createMultiNodeTestSuite((Class<? extends IgniteCacheConfigVariationsAbstractTest>)cls,
                 testCfg, testedNodeCnt, withClients, skipWaitPartMapExchange);
-        else
+       else
             addedSuite = new IgniteConfigVariationsTestSuite(cls, testCfg);
 
         return addedSuite;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java
index acf4a05..6b52578 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java
@@ -132,7 +132,7 @@ public abstract class IgniteCacheConfigVariationsAbstractTest extends IgniteConf
 
             info(">>> Starting set of tests [testedNodeIdx=" + testedNodeIdx
                 + ", id=" + grid(testedNodeIdx).localNode().id()
-                + ", isClient=" + grid(testedNodeIdx).configuration().isClientMode()
+                + ", isClient=" + isClientMode()
                 + ", nearEnabled=" + testedNodeNearEnabled + "]");
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
index 3370421..d70b606 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
@@ -58,7 +58,7 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr
     protected VariationsTestsConfig testsCfg;
 
     /** */
-    protected volatile DataMode dataMode;
+    protected volatile DataMode dataMode = DataMode.PLANE_OBJECT;
 
     /**
      * @param testsCfg Tests configuration.
@@ -198,6 +198,27 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr
     }
 
     /**
+     * @return Tested grid in client mode or not.
+     */
+    protected boolean isClientMode() {
+        return grid(testedNodeIdx).configuration().isClientMode();
+    }
+
+    /**
+     * @return Count of server nodes at topology.
+     */
+    protected int serversGridCount() {
+        int cnt = 0;
+
+        for (int i = 0; i < gridCount(); i++) {
+            if (!grid(i).configuration().isClientMode())
+                cnt++;
+        }
+
+        return cnt;
+    }
+
+    /**
      * Runs in all data modes.
      */
     protected void runInAllDataModes(TestRunnable call) throws Exception {
@@ -228,6 +249,7 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr
     /**
      * @param keyId Key Id.
      * @return Key.
+     * @see #valueOf(Object)
      */
     public Object key(int keyId) {
         return key(keyId, dataMode);
@@ -236,6 +258,7 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr
     /**
      * @param valId Key Id.
      * @return Value.
+     * @see #valueOf(Object)
      */
     public Object value(int valId) {
         return value(valId, dataMode);
@@ -272,7 +295,7 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr
         if (obj instanceof TestObject)
             return ((TestObject)obj).value();
         else
-            throw new IllegalStateException();
+            throw new IllegalArgumentException("Unknown tested object type: " + obj);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/modules/indexing/pom.xml b/modules/indexing/pom.xml
index 22a52b2..899d4a7 100644
--- a/modules/indexing/pom.xml
+++ b/modules/indexing/pom.xml
@@ -68,6 +68,13 @@
         </dependency>
 
         <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.4</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-core</artifactId>
             <version>${spring.version}</version>

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
index 28eef90..d2d8c4d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
@@ -84,6 +84,7 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
         cacheCfg1.setCacheMode(cacheMode);
         cacheCfg1.setWriteSynchronizationMode(FULL_SYNC);
         cacheCfg1.setIndexedTypes(String.class, Integer.class);
+        cacheCfg1.setStatisticsEnabled(true);
 
         CacheConfiguration<String, Integer> cacheCfg2 = defaultCacheConfiguration();
 
@@ -91,6 +92,7 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
         cacheCfg2.setCacheMode(cacheMode);
         cacheCfg2.setWriteSynchronizationMode(FULL_SYNC);
         cacheCfg2.setIndexedTypes(String.class, Integer.class);
+        cacheCfg2.setStatisticsEnabled(true);
 
         cfg.setCacheConfiguration(cacheCfg1, cacheCfg2);
 
@@ -347,4 +349,4 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
             }
         }, 5000);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 2b2020d..65d479d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -867,7 +867,13 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
     public void testScanQuery() throws Exception {
         IgniteCache<Integer, String> c1 = ignite().cache(null);
 
-        c1.put(777, "value");
+        Map<Integer, String> map = new HashMap(){{
+            for (int i = 0; i < 5000; i++)
+                put(i, "str" + i);
+        }};
+
+        for (Map.Entry<Integer, String> e : map.entrySet())
+            c1.put(e.getKey(), e.getValue());
 
         // Scan query.
         QueryCursor<Cache.Entry<Integer, String>> qry = c1.query(new ScanQuery<Integer, String>());
@@ -876,16 +882,21 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
 
         assert iter != null;
 
-        int expCnt = 1;
+        int cnt = 0;
+
+        while (iter.hasNext()) {
+            Cache.Entry<Integer, String> e = iter.next();
+
+            String expVal = map.get(e.getKey());
+
+            assertNotNull(expVal);
 
-        for (int i = 0; i < expCnt; i++) {
-            Cache.Entry<Integer, String> e1 = iter.next();
+            assertEquals(expVal, e.getValue());
 
-            assertEquals(777, e1.getKey().intValue());
-            assertEquals("value", e1.getValue());
+            cnt++;
         }
 
-        assert !iter.hasNext();
+        assertEquals(map.size(), cnt);
     }
 
     /**
@@ -1308,13 +1319,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testScanQueryEvents() throws Exception {
-        checkScanQueryEvents();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkScanQueryEvents() throws Exception {
         final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
         final CountDownLatch latch = new CountDownLatch(10);
         final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount());
@@ -1980,4 +1984,4 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
         /** */
         TYPE_B
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsQueryTest.java
new file mode 100644
index 0000000..4e6af25
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsQueryTest.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheQueryExecutedEvent;
+import org.apache.ignite.events.CacheQueryReadEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.SCAN;
+
+/**
+ * Config Variations query tests.
+ */
+public class IgniteCacheConfigVariationsQueryTest extends IgniteCacheConfigVariationsAbstractTest
{
+    /** */
+    public static final int CNT = 50;
+
+    /** */
+    private Map<Object, Object> evtMap;
+
+    /** */
+    private CountDownLatch readEvtLatch;
+
+    /** */
+    private CountDownLatch execEvtLatch;
+
+    /** */
+    private IgnitePredicate[] objReadLsnrs;
+
+    /** */
+    private IgnitePredicate[] qryExecLsnrs;
+
+    /** */
+    private Map<Object, Object> expMap;
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("serial")
+    public void testScanQuery() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                try {
+                    IgniteCache<Object, Object> cache = jcache();
+
+                    Map<Object, Object> map = new HashMap<Object, Object>() {{
+                        for (int i = 0; i < CNT; i++)
+                            put(key(i), value(i));
+                    }};
+
+                    registerEventListeners(map);
+
+                    for (Map.Entry<Object, Object> e : map.entrySet())
+                        cache.put(e.getKey(), e.getValue());
+
+                    // Scan query.
+                    QueryCursor<Cache.Entry<Object, Object>> qry = cache.query(new
ScanQuery());
+
+                    checkQueryResults(map, qry);
+                }
+                finally {
+                    stopListeners();
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanPartitionQuery() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                IgniteCache<Object, Object> cache = jcache();
+
+                GridCacheContext cctx = ((IgniteCacheProxy)cache).context();
+
+                Map<Integer, Map<Object, Object>> entries = new HashMap<>();
+
+                for (int i = 0; i < CNT; i++) {
+                    Object key = key(i);
+                    Object val = value(i);
+
+                    cache.put(key, val);
+
+                    int part = cctx.affinity().partition(key);
+
+                    Map<Object, Object> partEntries = entries.get(part);
+
+                    if (partEntries == null)
+                        entries.put(part, partEntries = new HashMap<>());
+
+                    partEntries.put(key, val);
+                }
+
+                for (int i = 0; i < cctx.affinity().partitions(); i++) {
+                    try {
+                        Map<Object, Object> exp = entries.get(i);
+
+                        if (exp == null)
+                            System.out.println();
+
+                        registerEventListeners(exp);
+
+                        ScanQuery<Object, Object> scan = new ScanQuery<>(i);
+
+                        Collection<Cache.Entry<Object, Object>> actual = cache.query(scan).getAll();
+
+                        assertEquals("Failed for partition: " + i, exp == null ? 0 : exp.size(),
actual.size());
+
+                        if (exp != null) {
+                            for (Cache.Entry<Object, Object> entry : actual)
+                                assertTrue(entry.getValue().equals(exp.get(entry.getKey())));
+                        }
+
+                        checkEvents();
+                    }
+                    finally {
+                        stopListeners();
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("SubtractionInCompareTo")
+    public void testScanFilters() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                try {
+                    IgniteCache<Object, Object> cache = jcache();
+
+                    IgniteBiPredicate<Object, Object> p = new IgniteBiPredicate<Object,
Object>() {
+                        @Override public boolean apply(Object k, Object v) {
+                            assertNotNull(k);
+                            assertNotNull(v);
+
+                            return valueOf(k) >= 20 && valueOf(v) < 40;
+                        }
+                    };
+
+                    Map<Object, Object> exp = new HashMap<>();
+
+                    for (int i = 0; i < CNT; i++) {
+                        Object key = key(i);
+                        Object val = value(i);
+
+                        cache.put(key, val);
+
+                        if (p.apply(key, val))
+                            exp.put(key, val);
+                    }
+
+                    registerEventListeners(exp, true);
+
+                    QueryCursor<Cache.Entry<Object, Object>> q = cache.query(new
ScanQuery<>(p));
+
+                    checkQueryResults(exp, q);
+                }
+                finally {
+                    stopListeners();
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("SubtractionInCompareTo")
+    public void testLocalScanQuery() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                try {
+                    IgniteCache<Object, Object> cache = jcache();
+
+                    ClusterNode locNode = testedGrid().cluster().localNode();
+                    Affinity<Object> affinity = testedGrid().affinity(cacheName());
+
+                    Map<Object, Object> map = new HashMap<>();
+
+                    for (int i = 0; i < CNT; i++) {
+                        Object key = key(i);
+                        Object val = value(i);
+
+                        cache.put(key, val);
+
+                        if (!isClientMode() && (cacheMode() == REPLICATED || affinity.isPrimary(locNode,
key)))
+                            map.put(key, val);
+                    }
+
+                    registerEventListeners(map);
+
+                    QueryCursor<Cache.Entry<Object, Object>> q = cache.query(new
ScanQuery<>().setLocal(true));
+
+                    checkQueryResults(map, q);
+                }
+                finally {
+                    stopListeners();
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("SubtractionInCompareTo")
+    public void testScanQueryLocalFilter() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                try {
+                    IgniteCache<Object, Object> cache = jcache();
+
+                    ClusterNode locNode = testedGrid().cluster().localNode();
+
+                    Map<Object, Object> map = new HashMap<>();
+
+                    IgniteBiPredicate<Object, Object> filter = new IgniteBiPredicate<Object,
Object>() {
+                        @Override public boolean apply(Object k, Object v) {
+                            assertNotNull(k);
+                            assertNotNull(v);
+
+                            return valueOf(k) >= 20 && valueOf(v) < 40;
+                        }
+                    };
+
+                    for (int i = 0; i < CNT; i++) {
+                        Object key = key(i);
+                        Object val = value(i);
+
+                        cache.put(key, val);
+
+                        if (!isClientMode() && (cacheMode() == REPLICATED
+                            || testedGrid().affinity(cacheName()).isPrimary(locNode, key))
&& filter.apply(key, val))
+                            map.put(key, val);
+                    }
+
+                    registerEventListeners(map, true);
+
+                    QueryCursor<Cache.Entry<Object, Object>> q = cache.query(new
ScanQuery<>(filter).setLocal(true));
+
+                    checkQueryResults(map, q);
+                }
+                finally {
+                    stopListeners();
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("SubtractionInCompareTo")
+    public void testScanQueryPartitionFilter() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                IgniteCache<Object, Object> cache = jcache();
+
+                Affinity<Object> affinity = testedGrid().affinity(cacheName());
+
+                Map<Integer, Map<Object, Object>> partMap = new HashMap<>();
+
+                IgniteBiPredicate<Object, Object> filter = new IgniteBiPredicate<Object,
Object>() {
+                    @Override public boolean apply(Object k, Object v) {
+                        assertNotNull(k);
+                        assertNotNull(v);
+
+                        return valueOf(k) >= 20 && valueOf(v) < 40;
+                    }
+                };
+
+                for (int i = 0; i < CNT; i++) {
+                    Object key = key(i);
+                    Object val = value(i);
+
+                    cache.put(key, val);
+
+                    if (filter.apply(key, val)) {
+                        int part = affinity.partition(key);
+
+                        Map<Object, Object> map = partMap.get(part);
+
+                        if (map == null)
+                            partMap.put(part, map = new HashMap<>());
+
+                        map.put(key, val);
+                    }
+                }
+
+                for (int part = 0; part < affinity.partitions(); part++) {
+                    try {
+                        Map<Object, Object> expMap = partMap.get(part);
+
+                        expMap = expMap == null ? Collections.emptyMap() : expMap;
+
+                        registerEventListeners(expMap, true);
+
+                        QueryCursor<Cache.Entry<Object, Object>> q = cache.query(new
ScanQuery<>(part, filter));
+
+                        checkQueryResults(expMap, q);
+                    }
+                    finally {
+                        stopListeners();
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * @param expMap Expected map.
+     * @param cursor Query cursor.
+     */
+    private void checkQueryResults(Map<Object, Object> expMap, QueryCursor<Cache.Entry<Object,
Object>> cursor)
+        throws InterruptedException {
+        Iterator<Cache.Entry<Object, Object>> iter = cursor.iterator();
+
+        try {
+            assertNotNull(iter);
+
+            int cnt = 0;
+
+            while (iter.hasNext()) {
+                Cache.Entry<Object, Object> e = iter.next();
+
+                assertNotNull(e.getKey());
+                assertNotNull(e.getValue());
+
+                Object expVal = expMap.get(e.getKey());
+
+                assertNotNull("Failed to resolve expected value for key: " + e.getKey(),
expVal);
+
+                assertEquals(expVal, e.getValue());
+
+                cnt++;
+            }
+
+            assertEquals(expMap.size(), cnt);
+        }
+        finally {
+            cursor.close();
+        }
+
+        checkEvents();
+    }
+
+    /**
+     * Registers event listeners.
+     * @param expMap Expected entries; one read event is expected for each entry.
+     */
+    private void registerEventListeners(Map<Object, Object> expMap) {
+        registerEventListeners(expMap, false);
+    }
+
+    /**
+     * Registers event listeners.
+     * @param expMap Expected entries; one read event is expected for each entry.
+     * @param filterExp Scan query uses filter.
+     */
+    private void registerEventListeners(Map<Object, Object> expMap, final boolean filterExp)
{
+        this.expMap = expMap != null ? expMap : Collections.emptyMap();
+
+        Set<ClusterNode> affNodes= new HashSet<>();
+
+        if (cacheMode() != REPLICATED) {
+            Affinity<Object> aff = testedGrid().affinity(cacheName());
+
+            for (Object key : this.expMap.keySet())
+                affNodes.add(aff.mapKeyToNode(key));
+        }
+
+        int execEvtCnt = cacheMode() == REPLICATED || (cacheMode() == PARTITIONED &&
affNodes.isEmpty()) ? 1 : affNodes.size();
+
+        evtMap = new ConcurrentHashMap<>();
+        readEvtLatch = new CountDownLatch(this.expMap.size());
+        execEvtLatch = new CountDownLatch(execEvtCnt);
+
+        objReadLsnrs = new IgnitePredicate[gridCount()];
+        qryExecLsnrs = new IgnitePredicate[gridCount()];
+
+        for (int i = 0; i < gridCount(); i++) {
+            IgnitePredicate<Event> pred = new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    assertTrue("Event: " + evt, evt instanceof CacheQueryReadEvent);
+
+                    CacheQueryReadEvent<Object, Object> qe = (CacheQueryReadEvent<Object,
Object>)evt;
+
+                    assertEquals(SCAN.name(), qe.queryType());
+                    assertEquals(cacheName(), qe.cacheName());
+
+                    assertNull(qe.className());
+                    assertNull(qe.clause());
+                    assertEquals(filterExp, qe.scanQueryFilter() != null);
+                    assertNull(qe.continuousQueryFilter());
+                    assertNull(qe.arguments());
+
+                    evtMap.put(qe.key(), qe.value());
+
+                    assertFalse(readEvtLatch.getCount() == 0);
+
+                    readEvtLatch.countDown();
+
+                    return true;
+                }
+            };
+
+            grid(i).events().localListen(pred, EVT_CACHE_QUERY_OBJECT_READ);
+
+            objReadLsnrs[i] = pred;
+
+            IgnitePredicate<Event> execPred = new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    assertTrue("Event: " + evt, evt instanceof CacheQueryExecutedEvent);
+
+                    CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
+
+                    assertEquals(SCAN.name(), qe.queryType());
+                    assertEquals(cacheName(), qe.cacheName());
+
+                    assertNull(qe.className());
+                    assertNull(qe.clause());
+                    assertEquals(filterExp, qe.scanQueryFilter() != null);
+                    assertNull(qe.continuousQueryFilter());
+                    assertNull(qe.arguments());
+
+                    assertFalse("Too many events.", execEvtLatch.getCount() == 0);
+
+                    execEvtLatch.countDown();
+
+                    return true;
+                }
+            };
+
+            grid(i).events().localListen(execPred, EVT_CACHE_QUERY_EXECUTED);
+
+            qryExecLsnrs[i] = execPred;
+        }
+    }
+
+    /**
+     * Stops listening.
+     */
+    private void stopListeners() {
+        for (int i = 0; i < gridCount(); i++) {
+            grid(i).events().stopLocalListen(objReadLsnrs[i]);
+            grid(i).events().stopLocalListen(qryExecLsnrs[i]);
+        }
+    }
+
+    /**
+     * @throws InterruptedException If interrupted while waiting for events.
+     */
+    private void checkEvents() throws InterruptedException {
+        assertTrue(execEvtLatch.await(1000, MILLISECONDS));
+        assertTrue(readEvtLatch.await(1000, MILLISECONDS));
+
+        assertEquals(expMap.size(), evtMap.size());
+
+        for (Map.Entry<Object, Object> e : expMap.entrySet())
+            assertEquals(e.getValue(), evtMap.get(e.getKey()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheConfigVariationQueryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheConfigVariationQueryTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheConfigVariationQueryTestSuite.java
new file mode 100644
index 0000000..83ae27f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheConfigVariationQueryTestSuite.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.IgniteCacheConfigVariationsQueryTest;
+import org.apache.ignite.testframework.configvariations.ConfigVariationsTestSuiteBuilder;
+
+/**
+ * Test suite for cache queries.
+ */
+public class IgniteCacheConfigVariationQueryTestSuite extends TestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception If failed.
+     */
+    public static TestSuite suite() throws Exception {
+        return new ConfigVariationsTestSuiteBuilder(
+            "Cache Config Variations Query Test Suite",
+            IgniteCacheConfigVariationsQueryTest.class)
+            .withBasicCacheParams()
+            .gridsCount(5).backups(1)
+            .testedNodesCount(3).withClients()
+            .build();
+    }
+}


Mime
View raw message