ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [01/50] incubator-ignite git commit: ignite-389 Avoid backups filtering in case of partition scan query
Date Thu, 11 Jun 2015 13:12:27 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-745 ed2360877 -> 01bcfd8a7 (forced update)


ignite-389 Avoid backups filtering in case of partition scan query


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5d6bb532
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5d6bb532
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5d6bb532

Branch: refs/heads/ignite-745
Commit: 5d6bb532c7de35cfea7674b5fc1446e72a5fa985
Parents: f00a9e9
Author: agura <agura@gridgain.com>
Authored: Thu May 28 18:30:08 2015 +0300
Committer: agura <agura@gridgain.com>
Committed: Thu May 28 18:30:08 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/query/ScanQuery.java    |  12 +-
 .../cache/query/GridCacheQueryAdapter.java      | 122 +++----------------
 .../cache/query/GridCacheQueryManager.java      |   9 +-
 3 files changed, 28 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
index f56b0c7..e6b69bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
@@ -46,6 +46,11 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
         this(null, null);
     }
 
+    /**
+     * Creates partition scan query returning all entries for given partition.
+     *
+     * @param part Partition.
+     */
     public ScanQuery(int part) {
         this(part, null);
     }
@@ -62,9 +67,10 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /**
      * Create scan query with filter.
      *
+     * @param part Partition.
      * @param filter Filter. If {@code null} then all entries will be returned.
      */
-    public ScanQuery(Integer part, @Nullable IgniteBiPredicate<K, V> filter) {
+    public ScanQuery(@Nullable Integer part, @Nullable IgniteBiPredicate<K, V> filter) {
         setPartition(part);
         setFilter(filter);
     }
@@ -96,7 +102,7 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
      *
      * @return Partition number or {@code null}.
      */
-    public Integer getPartition() {
+    @Nullable public Integer getPartition() {
         return part;
     }
 
@@ -106,7 +112,7 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
      *
      * @param part Partition number over which this query should iterate.
      */
-    public void setPartition(Integer part) {
+    public void setPartition(@Nullable Integer part) {
         this.part = part;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 6574f0a..2f32faa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -26,14 +26,15 @@ import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.query.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.security.*;
+
 import org.jetbrains.annotations.*;
 
 import java.util.*;
-import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
 
@@ -457,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
                 qryMgr.queryFieldsDistributed(bean, nodes));
         else if (type == SCAN && part != null && nodes.size() > 1)
-            return new CacheQueryFallbackFuture(nodes, bean, qryMgr);
+            return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr);
         else
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
     }
@@ -524,9 +525,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     /**
      * Wrapper for queries with fallback.
      */
-    private static class CacheQueryFallbackFuture<R> extends GridCacheQueryFutureAdapter<Object, Object, R> {
-        /** Target. */
-        private GridCacheQueryFutureAdapter<?, ?, R> fut;
+    private static class CacheQueryFallbackFuture<R> extends GridFutureAdapter<Collection<R>>
+        implements CacheQueryFuture<R> {
+        /** Query future. */
+        private volatile GridCacheQueryFutureAdapter<?, ?, R> fut;
 
         /** Backups. */
         private final Queue<ClusterNode> nodes;
@@ -559,13 +561,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
             ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE));
 
-            if (node != null) {
+            if (node != null)
                 fallbacks.add(node);
 
-                fallbacks.addAll(F.view(nodes, F.not(IS_LOC_NODE)));
-            }
-            else
-                fallbacks.addAll(nodes);
+            fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes);
 
             return fallbacks;
         }
@@ -576,10 +575,11 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         private void init() {
             ClusterNode node = nodes.poll();
 
-            fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
-                qryMgr.queryDistributed(bean, Collections.singleton(node)));
+            GridCacheQueryFutureAdapter<?, ?, R> fut0 =
+                (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
+                    qryMgr.queryDistributed(bean, Collections.singleton(node)));
 
-            fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
+            fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
                 @Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
                     try {
                         onDone(fut.get());
@@ -592,26 +592,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                     }
                 }
             });
-        }
-
-        /** {@inheritDoc} */
-        @Override protected boolean onPage(UUID nodeId, boolean last) {
-            return fut.onPage(nodeId, last);
-        }
 
-        /** {@inheritDoc} */
-        @Override protected void loadPage() {
-            fut.loadPage();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
-            fut.loadAllPages();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void cancelQuery() throws IgniteCheckedException {
-            fut.cancelQuery();
+            fut = fut0;
         }
 
         /** {@inheritDoc} */
@@ -625,84 +607,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         }
 
         /** {@inheritDoc} */
-        @Override void clear() {
-            fut.clear();
-        }
-
-        /** {@inheritDoc} */
-        @Override public long endTime() {
-            return fut.endTime();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void enqueue(Collection<?> col) {
-            fut.enqueue(col);
-        }
-
-        /** {@inheritDoc} */
-        @Override boolean fields() {
-            return fut.fields();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<R> get() throws IgniteCheckedException {
-            return fut.get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
-            return fut.get(timeout, unit);
-        }
-
-        /** {@inheritDoc} */
         @Override public R next() {
             return fut.next();
         }
-
-        /** {@inheritDoc} */
-        @Override public Collection<R> nextPage() throws IgniteCheckedException {
-            return fut.nextPage();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean onDone(Collection<R> res, Throwable err) {
-            return fut.onDone(res, err);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<R> nextPage(long timeout) throws IgniteCheckedException {
-            return fut.nextPage(timeout);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void onNodeLeft(UUID evtNodeId) {
-            fut.onNodeLeft(evtNodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data,
-            @Nullable Throwable err, boolean finished) {
-            fut.onPage(nodeId, data, err, finished);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
-            fut.onTimeout();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void printMemoryStats() {
-            fut.printMemoryStats();
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCacheQueryBean query() {
-            return fut.query();
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteUuid timeoutId() {
-            return fut.timeoutId();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index fac3d8f..652d62e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -795,7 +795,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             !locPart.reserve())
                             throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved");
 
-
                         iter = new Iterator<K>() {
                             private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
 
@@ -1329,9 +1328,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                     K key = row.getKey();
 
-                    // Filter backups for SCAN queries. Other types are filtered in indexing manager.
-                    if (!cctx.isReplicated() && cctx.config().getCacheMode() != LOCAL && qry.type() == SCAN &&
-                        !incBackups && !cctx.affinity().primary(cctx.localNode(), key, topVer)) {
+                    // Filter backups for SCAN queries, if it isn't partition scan.
+                    // Other types are filtered in indexing manager.
+                    if (!cctx.isReplicated() && qry.type() == SCAN && qry.partition() == null &&
+                        cctx.config().getCacheMode() != LOCAL && !incBackups &&
+                        !cctx.affinity().primary(cctx.localNode(), key, topVer)) {
                         if (log.isDebugEnabled())
                             log.debug("Ignoring backup element [row=" + row +
                                 ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups +


Mime
View raw message