ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [15/50] [abbrv] ignite git commit: IGNITE-8286: ScanQuery ignore setLocal with non local partition. - Fixes #3871.
Date Fri, 07 Sep 2018 13:49:18 GMT
IGNITE-8286: ScanQuery ignore setLocal with non local partition. - Fixes #3871.

Signed-off-by: shroman <rshtykh@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e8aa0388
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e8aa0388
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e8aa0388

Branch: refs/heads/ignite-5960
Commit: e8aa038835c235b76bb22580ca479dc679ba5365
Parents: c7aaee7
Author: shroman <rshtykh@yahoo.com>
Authored: Tue Sep 4 12:32:32 2018 +0900
Committer: shroman <rshtykh@yahoo.com>
Committed: Tue Sep 4 12:32:32 2018 +0900

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxyImpl.java  |  3 +-
 .../cache/query/GridCacheQueryAdapter.java      | 30 ++++++++--
 .../cache/query/GridCacheQueryManager.java      |  8 ++-
 ...CacheScanPartitionQueryFallbackSelfTest.java | 62 ++++++++++++++++++++
 4 files changed, 95 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e8aa0388/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index c21ad0b..69ea562 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -379,7 +379,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
         IgniteBiPredicate<K, V> p = scanQry.getFilter();
 
-        final CacheQuery<R> qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(),
isKeepBinary);
+        final CacheQuery<R> qry = ctx.queries().createScanQuery(
+            p, transformer, scanQry.getPartition(), isKeepBinary, scanQry.isLocal());
 
         if (scanQry.getPageSize() > 0)
             qry.pageSize(scanQry.getPageSize());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8aa0388/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 07aea4c..0e3ab43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -119,6 +119,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T>
{
     /** */
     private volatile boolean incBackups;
 
+    /** Local query. */
+    private boolean forceLocal;
+
     /** */
     private volatile boolean dedup;
 
@@ -143,13 +146,15 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T>
{
      * @param filter Scan filter.
      * @param part Partition.
      * @param keepBinary Keep binary flag.
+     * @param forceLocal Flag to force local query.
      */
     public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx,
         GridCacheQueryType type,
         @Nullable IgniteBiPredicate<Object, Object> filter,
         @Nullable IgniteClosure<Map.Entry, Object> transform,
         @Nullable Integer part,
-        boolean keepBinary) {
+        boolean keepBinary,
+        boolean forceLocal) {
         assert cctx != null;
         assert type != null;
         assert part == null || part >= 0;
@@ -160,6 +165,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T>
{
         this.transform = transform;
         this.part = part;
         this.keepBinary = keepBinary;
+        this.forceLocal = forceLocal;
 
         log = cctx.logger(getClass());
 
@@ -313,6 +319,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T>
{
     }
 
     /**
+     * @return {@code True} if the query is forced local.
+     */
+    public boolean forceLocal() {
+        return forceLocal;
+    }
+
+    /**
      * @return Security subject ID.
      */
     public UUID subjectId() {
@@ -516,15 +529,20 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T>
{
     /** {@inheritDoc} */
     @SuppressWarnings({"IfMayBeConditional", "unchecked"})
     @Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException
{
-        assert type == SCAN : "Wrong processing of qyery: " + type;
+        assert type == SCAN : "Wrong processing of query: " + type;
 
         // Affinity nodes snapshot.
         Collection<ClusterNode> nodes = new ArrayList<>(nodes());
 
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
-        if (nodes.isEmpty() && part == null)
+        if (nodes.isEmpty()) {
+            if (part != null && forceLocal)
+                throw new IgniteCheckedException("No queryable nodes for partition " + part
+                    + " [forced local query=" + this + "]");
+
             return new GridEmptyCloseableIterator();
+        }
 
         if (log.isDebugEnabled())
             log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']');
@@ -612,9 +630,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T>
{
      * @param cctx Cache context.
      * @param prj Projection (optional).
      * @return Collection of data nodes in provided projection (if any).
+     * @throws IgniteCheckedException If partition number is invalid.
      */
     private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?>
cctx,
-        @Nullable final ClusterGroup prj, @Nullable final Integer part) {
+        @Nullable final ClusterGroup prj, @Nullable final Integer part) throws IgniteCheckedException
{
         assert cctx != null;
 
         final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
@@ -624,6 +643,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T>
{
         if (prj == null && part == null)
             return affNodes;
 
+        if (part != null && part >= cctx.affinity().partitions())
+            throw new IgniteCheckedException("Invalid partition number: " + part);
+
         final Set<ClusterNode> owners =
             part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part,
topVer));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8aa0388/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index c209602..281400e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -2723,7 +2723,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      */
     public <R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K,
V> filter,
         @Nullable Integer part, boolean keepBinary) {
-        return createScanQuery(filter, null, part, keepBinary);
+        return createScanQuery(filter, null, part, keepBinary, false);
     }
 
     /**
@@ -2733,18 +2733,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param trans Transformer.
      * @param part Partition.
      * @param keepBinary Keep binary flag.
+     * @param forceLocal Flag to force local scan.
      * @return Created query.
      */
     public <T, R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K,
V> filter,
         @Nullable IgniteClosure<T, R> trans,
-        @Nullable Integer part, boolean keepBinary) {
+        @Nullable Integer part, boolean keepBinary, boolean forceLocal) {
 
         return new GridCacheQueryAdapter(cctx,
             SCAN,
             filter,
             trans,
             part,
-            keepBinary);
+            keepBinary,
+            forceLocal);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8aa0388/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index 999b1ad..3afcad8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
@@ -57,6 +58,7 @@ import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -149,6 +151,66 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     }
 
     /**
+     * Scan (with explicit {@code setLocal(true)}) should perform on the local node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testScanLocalExplicit() throws Exception {
+        cacheMode = CacheMode.PARTITIONED;
+        backups = 0;
+        commSpiFactory = new TestLocalCommunicationSpiFactory();
+
+        try {
+            Ignite ignite = startGrids(GRID_CNT);
+
+            IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite);
+
+            int part = anyLocalPartition(cache.context());
+
+            QueryCursor<Cache.Entry<Integer, Integer>> qry =
+                cache.query(new ScanQuery<Integer, Integer>().setPartition(part).setLocal(true));
+
+            doTestScanQuery(qry, part);
+
+            GridTestUtils.assertThrows(log, (Callable<Void>)() -> {
+                int remPart = remotePartition(cache.context()).getKey();
+
+                cache.query(new ScanQuery<Integer, Integer>().setPartition(remPart).setLocal(true));
+
+                return null;
+            }, IgniteCheckedException.class, null);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Scan (with explicit {@code setLocal(true)}, no partition specified) should perform
on the local node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testScanLocalExplicitNoPart() throws Exception {
+        cacheMode = CacheMode.PARTITIONED;
+        backups = 0;
+        commSpiFactory = new TestLocalCommunicationSpiFactory();
+
+        try {
+            Ignite ignite = startGrids(GRID_CNT);
+
+            IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite);
+
+            QueryCursor<Cache.Entry<Integer, Integer>> qry =
+                cache.query(new ScanQuery<Integer, Integer>().setLocal(true));
+
+            assertFalse(qry.getAll().isEmpty());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      * Scan should perform on the remote node.
      *
      * @throws Exception If failed.


Mime
View raw message