Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 677182009F8 for ; Thu, 19 May 2016 11:37:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 663FE160A00; Thu, 19 May 2016 09:37:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 62C84160A45 for ; Thu, 19 May 2016 11:37:36 +0200 (CEST) Received: (qmail 40308 invoked by uid 500); 19 May 2016 09:37:35 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 38955 invoked by uid 99); 19 May 2016 09:37:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 May 2016 09:37:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A7463E95C2; Thu, 19 May 2016 09:37:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 19 May 2016 09:38:05 -0000 Message-Id: <07871034cb1744f7adb6d7fd72c5e27b@git.apache.org> In-Reply-To: <5f92691f09ae4a6b98f1a9e14dc4ff17@git.apache.org> References: <5f92691f09ae4a6b98f1a9e14dc4ff17@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [32/43] ignite git commit: ignite-2921: ScanQueries over local partitions performance optimisation archived-at: Thu, 19 May 2016 09:37:40 -0000 ignite-2921: ScanQueries over local partitions performance optimisation Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bcb3e104 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bcb3e104 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bcb3e104 Branch: refs/heads/ignite-3163 Commit: bcb3e104bfc8c8fbc882a475feecf9efef4e17d8 Parents: 40fc2ec Author: ashutak Authored: Mon May 16 16:27:20 2016 +0300 Committer: ashutak Committed: Mon May 16 16:27:20 2016 +0300 ---------------------------------------------------------------------- .../cache/CacheWeakQueryIteratorsHolder.java | 169 ++++++- .../processors/cache/GridCacheAdapter.java | 10 +- .../processors/cache/GridCacheSwapManager.java | 12 +- .../processors/cache/IgniteCacheProxy.java | 36 +- .../binary/CacheObjectBinaryProcessorImpl.java | 16 +- .../processors/cache/query/CacheQuery.java | 10 +- .../cache/query/CacheQueryFuture.java | 13 +- .../query/GridCacheDistributedQueryManager.java | 100 +++- .../cache/query/GridCacheLocalQueryManager.java | 34 +- .../cache/query/GridCacheQueryAdapter.java | 175 +++++-- .../cache/query/GridCacheQueryErrorFuture.java | 12 +- .../query/GridCacheQueryFutureAdapter.java | 10 +- .../cache/query/GridCacheQueryManager.java | 502 ++++++++++++++----- .../datastructures/GridCacheSetImpl.java | 4 +- .../service/GridServiceProcessor.java | 45 +- ...achePartitionedPreloadLifecycleSelfTest.java | 102 +--- ...CacheReplicatedPreloadLifecycleSelfTest.java | 132 +---- .../CacheAbstractQueryMetricsSelfTest.java | 4 +- .../cache/IgniteCacheAbstractQuerySelfTest.java | 34 +- 19 files changed, 860 insertions(+), 560 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java index 4c48e74..2e03b53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.internal.U; import org.jsr166.ConcurrentHashMap8; @@ -34,11 +35,10 @@ import org.jsr166.ConcurrentHashMap8; */ public class CacheWeakQueryIteratorsHolder { /** Iterators weak references queue. */ - private final ReferenceQueue refQueue = new ReferenceQueue<>(); + private final ReferenceQueue refQueue = new ReferenceQueue(); /** Iterators futures. */ - private final Map,CacheQueryFuture> futs = - new ConcurrentHashMap8<>(); + private final Map refs = new ConcurrentHashMap8<>(); /** Logger. */ private final IgniteLogger log; @@ -56,10 +56,27 @@ public class CacheWeakQueryIteratorsHolder { * @param Type for the iterator. * @return Iterator over the cache. */ - public WeakQueryFutureIterator iterator(CacheQueryFuture fut, CacheIteratorConverter convert) { + public WeakReferenceCloseableIterator iterator(final CacheQueryFuture fut, + CacheIteratorConverter convert) { WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert); - CacheQueryFuture old = futs.put(it.weakReference(), fut); + AutoCloseable old = refs.put(it.weakReference(), fut); + + assert old == null; + + return it; + } + + /** + * @param iter Closeable iterator. + * @param Type for the iterator. + * @return Iterator over the cache. + */ + public WeakReferenceCloseableIterator iterator(final GridCloseableIterator iter, + CacheIteratorConverter convert) { + WeakQueryCloseableIterator it = new WeakQueryCloseableIterator(iter, convert); + + AutoCloseable old = refs.put(it.weakReference(), iter); assert old == null; @@ -71,8 +88,8 @@ public class CacheWeakQueryIteratorsHolder { * * @throws IgniteCheckedException If failed. */ - public void removeIterator(WeakQueryFutureIterator it) throws IgniteCheckedException { - futs.remove(it.weakReference()); + public void removeIterator(WeakReferenceCloseableIterator it) throws IgniteCheckedException { + refs.remove(it.weakReference()); it.close(); } @@ -81,17 +98,17 @@ public class CacheWeakQueryIteratorsHolder { * Closes unreachable iterators. */ public void checkWeakQueue() { - for (Reference itRef = refQueue.poll(); itRef != null; + for (Reference itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) { try { - WeakReference weakRef = (WeakReference)itRef; + WeakReference weakRef = (WeakReference)itRef; - CacheQueryFuture fut = futs.remove(weakRef); + AutoCloseable rsrc = refs.remove(weakRef); - if (fut != null) - fut.cancel(); + if (rsrc != null) + rsrc.close(); } - catch (IgniteCheckedException e) { + catch (Exception e) { U.error(log, "Failed to close iterator.", e); } } @@ -101,16 +118,16 @@ public class CacheWeakQueryIteratorsHolder { * Cancel all cache queries. */ public void clearQueries(){ - for (CacheQueryFuture fut : futs.values()) { + for (AutoCloseable rsrc : refs.values()) { try { - fut.cancel(); + rsrc.close(); } - catch (IgniteCheckedException e) { + catch (Exception e) { U.error(log, "Failed to close iterator.", e); } } - futs.clear(); + refs.clear(); } @@ -119,7 +136,8 @@ public class CacheWeakQueryIteratorsHolder { * * @param Type for iterator. */ - public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter { + private class WeakQueryFutureIterator extends GridCloseableIteratorAdapter + implements WeakReferenceCloseableIterator { /** */ private static final long serialVersionUID = 0L; @@ -204,10 +222,8 @@ public class CacheWeakQueryIteratorsHolder { cur = null; } - /** - * @return Iterator weak reference. - */ - private WeakReference> weakReference() { + /** {@inheritDoc} */ + @Override public WeakReference> weakReference() { return weakRef; } @@ -217,7 +233,7 @@ public class CacheWeakQueryIteratorsHolder { private void clearWeakReference() { weakRef.clear(); - futs.remove(weakRef); + refs.remove(weakRef); } /** @@ -233,4 +249,109 @@ public class CacheWeakQueryIteratorsHolder { } } } -} \ No newline at end of file + + /** + * @param Type. + */ + public class WeakQueryCloseableIterator extends GridCloseableIteratorAdapter + implements WeakReferenceCloseableIterator { + /** */ + private static final long serialVersionUID = 0; + + /** */ + private final GridCloseableIterator iter; + + /** */ + private final CacheIteratorConverter convert; + + /** */ + private final WeakReference weakRef; + + /** */ + private T cur; + + /** + * @param iter Iterator. + * @param convert Converter. + */ + WeakQueryCloseableIterator(GridCloseableIterator iter, CacheIteratorConverter convert) { + this.iter = iter; + this.convert = convert; + + weakRef = new WeakReference(this, refQueue); + } + + + /** {@inheritDoc} */ + @Override protected T onNext() throws IgniteCheckedException { + V next; + + try { + next = iter.nextX(); + } + catch (NoSuchElementException e){ + clearWeakReference(); + + throw e; + } + + if (next == null) + clearWeakReference(); + + cur = next != null ? convert.convert(next) : null; + + return cur; + } + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + boolean hasNextX = iter.hasNextX(); + + if (!hasNextX) + clearWeakReference(); + + return hasNextX; + } + + /** {@inheritDoc} */ + @Override protected void onRemove() throws IgniteCheckedException { + if (cur == null) + throw new IllegalStateException(); + + convert.remove(cur); + + cur = null; + } + + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + iter.close(); + + clearWeakReference(); + } + + /** + * Clears weak reference. + */ + private void clearWeakReference() { + weakRef.clear(); + + refs.remove(weakRef); + } + + /** {@inheritDoc} */ + @Override public WeakReference weakReference() { + return weakRef; + } + } + + /** + * + */ + public static interface WeakReferenceCloseableIterator extends GridCloseableIterator { + /** + * @return Iterator weak reference. + */ + public WeakReference weakReference(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index fbba82e..dd06ef8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -86,7 +86,6 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; -import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; @@ -107,6 +106,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridTriple; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -4088,7 +4088,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> igniteIterator() { + public Iterator> igniteIterator() throws IgniteCheckedException { GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx; final CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -4096,11 +4096,11 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> fut = ctx0.queries().createScanQuery(null, null, ctx.keepBinary()) + final GridCloseableIterator> iter = ctx0.queries().createScanQuery(null, null, ctx.keepBinary()) .keepAll(false) - .execute(); + .executeScanQuery(); - return ctx.itHolder().iterator(fut, new CacheIteratorConverter, Map.Entry>() { + return ctx.itHolder().iterator(iter, new CacheIteratorConverter, Map.Entry>() { @Override protected Cache.Entry convert(Map.Entry e) { return new CacheEntryImpl<>(e.getKey(), e.getValue()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index d50bf0b..127f1be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1848,7 +1848,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @return Off-heap iterator. */ public GridCloseableIterator rawOffHeapIterator(final CX2, T2, T> c, - Integer part, + @Nullable Integer part, boolean primary, boolean backup) { @@ -1859,8 +1859,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { checkIteratorQueue(); - if (primary && backup) - return offheap.iterator(spaceName, c); + if (primary && backup) { + if (part == null) + return offheap.iterator(spaceName, c); + else + return offheap.iterator(spaceName, c, part); + } AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); @@ -1894,7 +1898,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (!offheapEnabled || (!primary && !backup)) return new GridEmptyCloseableIterator<>(); - if (primary && backup) + if (primary && backup && part == null) return new GridCloseableIteratorAdapter>() { private GridCloseableIterator> it = offheap.iterator(spaceName); http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 9b7ac4c..12ec8b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyIterator; import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -455,7 +456,6 @@ public class IgniteCacheProxy extends AsyncSupportAdapter> query(final Query filter, @Nullable ClusterGroup grp) throws IgniteCheckedException { final CacheQuery> qry; - final CacheQueryFuture> fut; boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary(); @@ -467,14 +467,35 @@ public class IgniteCacheProxy extends AsyncSupportAdapter>>() { - @Override public CacheQueryFuture> applyx() throws IgniteCheckedException { - return qry.execute(); + final GridCloseableIterator> iter = ctx.kernalContext().query().executeQuery(ctx, + new IgniteOutClosureX>>() { + @Override public GridCloseableIterator> applyx() throws IgniteCheckedException { + final GridCloseableIterator iter0 = qry.executeScanQuery(); + + return new GridCloseableIteratorAdapter>() { + @Override protected Cache.Entry onNext() throws IgniteCheckedException { + Map.Entry next = iter0.nextX(); + + return new CacheEntryImpl<>(next.getKey(), next.getValue()); + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + return iter0.hasNextX(); + } + + @Override protected void onClose() throws IgniteCheckedException { + iter0.close(); + } + }; } }, false); + + return new QueryCursorImpl<>(iter); } - else if (filter instanceof TextQuery) { + + final CacheQueryFuture> fut; + + if (filter instanceof TextQuery) { TextQuery p = (TextQuery)filter; qry = ctx.queries().createFullTextQuery(p.getType(), p.getText(), isKeepBinary); @@ -1797,6 +1818,9 @@ public class IgniteCacheProxy extends AsyncSupportAdapter> fut = qry.execute(); - - Map.Entry next; - - while ((next = fut.next()) != null) { - assert next.getKey() != null : next; - assert next.getValue() != null : next; + try (GridCloseableIterator> entries = qry.executeScanQuery()) { + for (Map.Entry e : entries) { + assert e.getKey() != null : e; + assert e.getValue() != null : e; - addClientCacheMetaData(next.getKey(), next.getValue()); + addClientCacheMetaData(e.getKey(), e.getValue()); } } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 5f9dc61..47c6e89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.query; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.affinity.AffinityKey; import org.apache.ignite.cache.query.Query; import org.apache.ignite.cache.query.QueryMetrics; @@ -24,6 +26,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.cache.query.annotations.QueryTextField; import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteReducer; import org.jetbrains.annotations.Nullable; @@ -289,4 +292,9 @@ public interface CacheQuery { * Resets metrics for this query. */ public void resetMetrics(); -} \ No newline at end of file + + /** + * @return Scan query iterator. + */ + public GridCloseableIterator executeScanQuery() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java index bb342b3..a0244d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java @@ -26,16 +26,7 @@ import org.jetbrains.annotations.Nullable; * Cache query future returned by query execution. * Refer to {@link CacheQuery} documentation for more information. */ -public interface CacheQueryFuture extends IgniteInternalFuture> { - /** - * Returns number of elements that are already fetched and can - * be returned from {@link #next()} method without blocking. - * - * @return Number of fetched elements which are available immediately. - * @throws IgniteCheckedException In case of error. - */ - public int available() throws IgniteCheckedException; - +public interface CacheQueryFuture extends IgniteInternalFuture>, AutoCloseable { /** * Returns next element from result set. *

@@ -62,4 +53,4 @@ public interface CacheQueryFuture extends IgniteInternalFuture> * @throws IgniteCheckedException {@inheritDoc} */ @Override public boolean cancel() throws IgniteCheckedException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 353fbd3..5f6cb8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; @@ -35,6 +36,8 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; @@ -55,7 +58,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; /** - * Distributed query manager. + * Distributed query manager (for cache in REPLICATED / PARTITIONED cache mode). */ public class GridCacheDistributedQueryManager extends GridCacheQueryManager { /** */ @@ -512,29 +515,8 @@ public class GridCacheDistributedQueryManager extends GridCacheQueryManage } /** {@inheritDoc} */ - @Override public CacheQueryFuture queryLocal(GridCacheQueryBean qry) { - assert cctx.config().getCacheMode() != LOCAL; - - if (log.isDebugEnabled()) - log.debug("Executing query on local node: " + qry); - - GridCacheLocalQueryFuture fut = new GridCacheLocalQueryFuture<>(cctx, qry); - - try { - qry.query().validate(); - - fut.execute(); - } - catch (IgniteCheckedException e) { - fut.onDone(e); - } - - return fut; - } - - /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public CacheQueryFuture queryDistributed(GridCacheQueryBean qry, Collection nodes) { + @Override public CacheQueryFuture queryDistributed(GridCacheQueryBean qry, final Collection nodes) { assert cctx.config().getCacheMode() != LOCAL; if (log.isDebugEnabled()) @@ -550,7 +532,7 @@ public class GridCacheDistributedQueryManager extends GridCacheQueryManage String clsName = qry.query().queryClassName(); - GridCacheQueryRequest req = new GridCacheQueryRequest( + final GridCacheQueryRequest req = new GridCacheQueryRequest( cctx.cacheId(), reqId, cctx.name(), @@ -595,6 +577,76 @@ public class GridCacheDistributedQueryManager extends GridCacheQueryManage } /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "serial"}) + @Override public GridCloseableIterator> scanQueryDistributed(final GridCacheQueryAdapter qry, + Collection nodes) throws IgniteCheckedException { + assert !cctx.isLocal() : cctx.name(); + assert qry.type() == GridCacheQueryType.SCAN: qry; + + GridCloseableIterator> locIter0 = null; + + for (ClusterNode node : nodes) { + if (node.isLocal()) { + locIter0 = (GridCloseableIterator)scanQueryLocal(qry, false); + + Collection rmtNodes = new ArrayList<>(nodes.size() - 1); + + for (ClusterNode n : nodes) { + // Equals by reference can be used here. + if (n != node) + rmtNodes.add(n); + } + + nodes = rmtNodes; + + break; + } + } + + final GridCloseableIterator> locIter = locIter0; + + final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null); + + final CacheQueryFuture> fut = (CacheQueryFuture>)queryDistributed(bean, nodes); + + return new GridCloseableIteratorAdapter>() { + /** */ + private Map.Entry cur; + + @Override protected Map.Entry onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); + + Map.Entry e = cur; + + cur = null; + + return e; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (cur != null) + return true; + + if (locIter != null && locIter.hasNextX()) + cur = locIter.nextX(); + + return cur != null || (cur = fut.next()) != null; + } + + @Override protected void onClose() throws IgniteCheckedException { + super.onClose(); + + if (locIter != null) + locIter.close(); + + if (fut != null) + fut.cancel(); + } + }; + } + + /** {@inheritDoc} */ @Override public void loadPage(long id, GridCacheQueryAdapter qry, Collection nodes, boolean all) { assert cctx.config().getCacheMode() != LOCAL; assert qry != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java index 4e72f97..183abde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java @@ -19,16 +19,18 @@ package org.apache.ignite.internal.processors.cache.query; import java.util.Collection; import java.util.List; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; /** - * Local query manager. + * Local query manager (for cache in LOCAL cache mode). */ public class GridCacheLocalQueryManager extends GridCacheQueryManager { /** {@inheritDoc} */ @@ -80,32 +82,20 @@ public class GridCacheLocalQueryManager extends GridCacheQueryManager queryLocal(GridCacheQueryBean qry) { + @Override public CacheQueryFuture queryDistributed(GridCacheQueryBean qry, Collection nodes) { assert cctx.config().getCacheMode() == LOCAL; - if (log.isDebugEnabled()) - log.debug("Executing query on local node: " + qry); - - GridCacheLocalQueryFuture fut = new GridCacheLocalQueryFuture<>(cctx, qry); - - try { - qry.query().validate(); - - fut.execute(); - } - catch (IgniteCheckedException e) { - fut.onDone(e); - } - - return fut; + throw new IgniteException("Distributed queries are not available for local cache " + + "(use 'CacheQuery.execute(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']'); } /** {@inheritDoc} */ - @Override public CacheQueryFuture queryDistributed(GridCacheQueryBean qry, Collection nodes) { - assert cctx.config().getCacheMode() == LOCAL; + @Override public GridCloseableIterator> scanQueryDistributed(GridCacheQueryAdapter qry, + Collection nodes) throws IgniteCheckedException { + assert cctx.isLocal() : cctx.name(); - throw new IgniteException("Distributed queries are not available for local cache " + - "(use 'CacheQuery.execute(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']'); + throw new IgniteException("Distributed scan query are not available for local cache " + + "(use 'CacheQuery.executeScanQuery(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']'); } /** {@inheritDoc} */ @@ -142,4 +132,4 @@ public class GridCacheLocalQueryManager extends GridCacheQueryManager CacheQueryFuture execute(@Nullable IgniteReducer rmtReducer, @Nullable IgniteClosure rmtTransform, @Nullable Object... args) { + assert type != SCAN : this; + Collection nodes; try { @@ -440,7 +447,7 @@ public class GridCacheQueryAdapter implements CacheQuery { cctx.checkSecurity(SecurityPermission.CACHE_READ); - if (nodes.isEmpty() && (type != SCAN || part == null)) + if (nodes.isEmpty()) return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException()); if (log.isDebugEnabled()) @@ -471,12 +478,44 @@ public class GridCacheQueryAdapter implements CacheQuery { if (type == SQL_FIELDS || type == SPI) return (CacheQueryFuture)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); - else if (type == SCAN && part != null && !cctx.isLocal()) - return new CacheQueryFallbackFuture<>(part, bean, qryMgr, cctx); else return (CacheQueryFuture)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "unchecked"}) + @Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException { + assert type == SCAN: "Wrong processing of qyery: " + type; + + Collection nodes = nodes(); + + cctx.checkSecurity(SecurityPermission.CACHE_READ); + + if (nodes.isEmpty() && part == null) + return new GridEmptyCloseableIterator(); + + if (log.isDebugEnabled()) + log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']'); + + if (cctx.deploymentEnabled()) + cctx.deploy().registerClasses(filter); + + if (subjId == null) + subjId = cctx.localNodeId(); + + taskHash = cctx.kernalContext().job().currentTaskNameHash(); + + final GridCacheQueryManager qryMgr = cctx.queries(); + + if (part != null && !cctx.isLocal()) + return (GridCloseableIterator)new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx); + else { + boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); + + return loc ? qryMgr.scanQueryLocal(this, true) : qryMgr.scanQueryDistributed(this, nodes); + } + } + /** * @return Nodes to execute on. */ @@ -549,10 +588,12 @@ public class GridCacheQueryAdapter implements CacheQuery { /** * Wrapper for queries with fallback. */ - private static class CacheQueryFallbackFuture extends GridFutureAdapter> - implements CacheQueryFuture { + private static class ScanQueryFallbackClosableIterator extends GridCloseableIteratorAdapter { + /** */ + private static final long serialVersionUID = 0L; + /** Query future. */ - private volatile GridCacheQueryFutureAdapter fut; + private volatile T2, GridCacheQueryFutureAdapter> tuple; /** Backups. */ private volatile Queue nodes; @@ -564,7 +605,7 @@ public class GridCacheQueryAdapter implements CacheQuery { private volatile int unreservedNodesRetryCnt = 5; /** Bean. */ - private final GridCacheQueryBean bean; + private final GridCacheQueryAdapter qry; /** Query manager. */ private final GridCacheQueryManager qryMgr; @@ -578,15 +619,18 @@ public class GridCacheQueryAdapter implements CacheQuery { /** Flag indicating that a first item has been returned to a user. */ private boolean firstItemReturned; + /** */ + private Map.Entry cur; + /** * @param part Partition. - * @param bean Bean. + * @param qry Query. * @param qryMgr Query manager. * @param cctx Cache context. */ - private CacheQueryFallbackFuture(int part, GridCacheQueryBean bean, + private ScanQueryFallbackClosableIterator(int part, GridCacheQueryAdapter qry, GridCacheQueryManager qryMgr, GridCacheContext cctx) { - this.bean = bean; + this.qry = qry; this.qryMgr = qryMgr; this.cctx = cctx; this.part = part; @@ -628,46 +672,82 @@ public class GridCacheQueryAdapter implements CacheQuery { private void init() { final ClusterNode node = nodes.poll(); - fut = (GridCacheQueryFutureAdapter)(node.isLocal() ? - qryMgr.queryLocal(bean) : - qryMgr.queryDistributed(bean, Collections.singleton(node))); - } + if (node.isLocal()) { + try { + GridCloseableIterator it = qryMgr.scanQueryLocal(qry, true); - /** {@inheritDoc} */ - @Override public int available() { - return fut.available(); - } + tuple= new T2(it, null); + } + catch (IgniteClientDisconnectedCheckedException e) { + throw CU.convertToCacheException(e); + } + catch (IgniteCheckedException e) { + retryIfPossible(e); + } + } + else { + final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null); - /** {@inheritDoc} */ - @Override public boolean cancel() throws IgniteCheckedException { - return fut.cancel(); + GridCacheQueryFutureAdapter fut = + (GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean, Collections.singleton(node)); + + tuple = new T2(null, fut); + } } /** {@inheritDoc} */ - @Override public Collection get() throws IgniteCheckedException { - assert false; + @Override protected Map.Entry onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); - return super.get(); + assert cur != null; + + Map.Entry e = cur; + + cur = null; + + return e; } /** {@inheritDoc} */ - @Override public R next() { - if (firstItemReturned) - return fut.next(); - + @Override protected boolean onHasNext() throws IgniteCheckedException { while (true) { - try { - fut.awaitFirstPage(); + if (cur != null) + return true; - firstItemReturned = true; + T2, GridCacheQueryFutureAdapter> t = tuple; - return fut.next(); - } - catch (IgniteClientDisconnectedCheckedException e) { - throw CU.convertToCacheException(e); + GridCloseableIterator iter = t.get1(); + + if (iter != null) { + boolean hasNext = iter.hasNext(); + + if (hasNext) + cur = iter.next(); + + return hasNext; } - catch (IgniteCheckedException e) { - retryIfPossible(e); + else { + GridCacheQueryFutureAdapter fut = t.get2(); + + assert fut != null; + + if (firstItemReturned) + return (cur = (Map.Entry)fut.next()) != null; + + try { + fut.awaitFirstPage(); + + firstItemReturned = true; + + return (cur = (Map.Entry)fut.next()) != null; + } + catch (IgniteClientDisconnectedCheckedException e) { + throw CU.convertToCacheException(e); + } + catch (IgniteCheckedException e) { + retryIfPossible(e); + } } } } @@ -679,8 +759,10 @@ public class GridCacheQueryAdapter implements CacheQuery { try { IgniteInternalFuture retryFut; - if (e.hasCause(GridDhtUnreservedPartitionException.class)) { - AffinityTopologyVersion waitVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion(); + GridDhtUnreservedPartitionException partErr = X.cause(e, GridDhtUnreservedPartitionException.class); + + if (partErr != null) { + AffinityTopologyVersion waitVer = partErr.topologyVersion(); assert waitVer != null; @@ -720,5 +802,18 @@ public class GridCacheQueryAdapter implements CacheQuery { throw CU.convertToCacheException(ex); } } + + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + super.onClose(); + + T2, GridCacheQueryFutureAdapter> t = tuple; + + if (t != null && t.get1() != null) + t.get1().close(); + + if (t != null && t.get2() != null) + t.get2().cancel(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java index fd8c4d8..ac14ae6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java @@ -36,14 +36,14 @@ public class GridCacheQueryErrorFuture extends GridFinishedFuture extends GridFutureAda } /** {@inheritDoc} */ - @Override public int available() { - return cnt.get(); - } - - /** {@inheritDoc} */ @Override public R next() { try { R next = unmaskNull(internalIterator().next()); @@ -571,6 +566,11 @@ public abstract class GridCacheQueryFutureAdapter extends GridFutureAda } /** {@inheritDoc} */ + @Override public void close() throws Exception { + cancel(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheQueryFutureAdapter.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index df95e2e..3b3c5f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -38,6 +38,7 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; @@ -465,7 +466,27 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * @param qry Query. * @return Query future. */ - public abstract CacheQueryFuture queryLocal(GridCacheQueryBean qry); + @SuppressWarnings("unchecked") + public CacheQueryFuture queryLocal(GridCacheQueryBean qry) { + assert qry.query().type() != GridCacheQueryType.SCAN : qry; + + if (log.isDebugEnabled()) + log.debug("Executing query on local node: " + qry); + + GridCacheLocalQueryFuture fut = new GridCacheLocalQueryFuture<>(cctx, qry); + + try { + qry.query().validate(); + + fut.execute(); + } + catch (IgniteCheckedException e) { + if (fut != null) + fut.onDone(e); + } + + return fut; + } /** * Executes distributed query. @@ -477,6 +498,17 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte public abstract CacheQueryFuture queryDistributed(GridCacheQueryBean qry, Collection nodes); /** + * Executes distributed SCAN query. + * + * @param qry Query. + * @param nodes Nodes. + * @return Iterator. + * @throws IgniteCheckedException If failed. + */ + public abstract GridCloseableIterator> scanQueryDistributed(GridCacheQueryAdapter qry, + Collection nodes) throws IgniteCheckedException; + + /** * Loads page. * * @param id Query ID. @@ -590,7 +622,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte taskName)); } - iter = scanIterator(qry); + iter = scanIterator(qry, false); break; @@ -799,18 +831,13 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte /** * @param qry Query. + * @param locNode Local node. * @return Full-scan row iterator. * @throws IgniteCheckedException If failed to get iterator. */ @SuppressWarnings({"unchecked"}) - private GridCloseableIterator> scanIterator(final GridCacheQueryAdapter qry) + private GridCloseableIterator> scanIterator(final GridCacheQueryAdapter qry, boolean locNode) throws IgniteCheckedException { - IgniteInternalCache prj0 = cctx.cache(); - - prj0 = prj0.keepBinary(); - - final IgniteInternalCache prj = prj0; - final IgniteBiPredicate keyValFilter = qry.scanFilter(); try { @@ -822,56 +849,12 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte final boolean backups = qry.includeBackups() || cctx.isReplicated(); - Iterator keyIter; - - GridDhtLocalPartition locPart = null; - - Integer part = qry.partition(); - - if (part == null || cctx.isLocal()) - keyIter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); - else if (part < 0 || part >= cctx.affinity().partitions()) - keyIter = F.emptyIterator(); - else { - final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht(); - - locPart = dht.topology().localPartition(part, topVer, false); - - // double check for owning state - if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING) - throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(), - "Partition can not be reserved"); - - final GridDhtLocalPartition locPart0 = locPart; - - keyIter = new Iterator() { - private Iterator iter0 = locPart0.keySet().iterator(); - - @Override public boolean hasNext() { - return iter0.hasNext(); - } - - @Override public K next() { - return (K)iter0.next(); - } - - @Override public void remove() { - iter0.remove(); - } - }; - } - - final GridDhtLocalPartition locPart0 = locPart; - - final GridCloseableIteratorAdapter> heapIt = - new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), true) { - @Override protected void onClose() { - super.onClose(); - - if (locPart0 != null) - locPart0.release(); - } - }; + final GridIterator> heapIt = onheapIterator(qry, + topVer, + keyValFilter, + backups, + plc, + locNode); final GridIterator> it; @@ -881,10 +864,10 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte iters.add(heapIt); if (cctx.isOffHeapEnabled()) - iters.add(offheapIterator(qry, topVer, backups, plc)); + iters.add(offheapIterator(qry, topVer, backups, plc, locNode)); if (cctx.swap().swapEnabled()) - iters.add(swapIterator(qry, topVer, backups, plc)); + iters.add(swapIterator(qry, topVer, backups, plc, locNode)); it = new CompoundIterator<>(iters); } @@ -906,7 +889,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte @Override protected void onClose() throws IgniteCheckedException { try { - heapIt.close(); + if (heapIt instanceof IgniteSpiCloseableIterator) + ((IgniteSpiCloseableIterator)heapIt).close(); } finally { closeScanFilter(keyValFilter); @@ -914,8 +898,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } }; } - catch (IgniteCheckedException | RuntimeException e) - { + catch (IgniteCheckedException | RuntimeException e) { closeScanFilter(keyValFilter); throw e; @@ -934,7 +917,10 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte /** * @param qry Query. + * @param topVer Topology version. * @param backups Include backups. + * @param expPlc Expiry policy. + * @param locNode Local node. * @return Swap iterator. * @throws IgniteCheckedException If failed. */ @@ -942,8 +928,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte GridCacheQueryAdapter qry, AffinityTopologyVersion topVer, boolean backups, - ExpiryPolicy expPlc - ) throws IgniteCheckedException { + ExpiryPolicy expPlc, + boolean locNode) throws IgniteCheckedException { IgniteBiPredicate filter = qry.scanFilter(); Integer part = qry.partition(); @@ -957,22 +943,146 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte topVer, filter, expPlc, - qry.keepBinary()); + qry.keepBinary(), locNode); - return scanIterator(it, filter, qry.keepBinary()); + return scanIterator(it, filter, qry.keepBinary(), locNode); } /** * @param qry Query. + * @param topVer Topology version. + * @param keyValFilter Filter. * @param backups Include backups. + * @param plc Expiry policy. + * @param locNode Local node. + * @return Offheap iterator. + * @throws GridDhtUnreservedPartitionException If failed to reserve partition. + */ + private GridIterator> onheapIterator( + GridCacheQueryAdapter qry, + AffinityTopologyVersion topVer, + final IgniteBiPredicate keyValFilter, + boolean backups, + final ExpiryPolicy plc, + final boolean locNode) throws GridDhtUnreservedPartitionException { + Iterator keyIter; + + GridDhtLocalPartition locPart = null; + + Integer part = qry.partition(); + + if (part == null || cctx.isLocal()) { + // Performance optimization. + if (locNode && plc == null && !cctx.isLocal()) { + GridDhtCacheAdapter cache = cctx.isNear() ? cctx.near().dht() : cctx.dht(); + + final Iterator> iter = cache.localEntriesIterator(true, backups); + + return new GridIteratorAdapter>() { + /** */ + private IgniteBiTuple next; + + { + advance(); + } + + @Override public boolean hasNextX() throws IgniteCheckedException { + return next != null; + } + + @Override public IgniteBiTuple nextX() throws IgniteCheckedException { + if (next == null) + throw new NoSuchElementException(); + + IgniteBiTuple next0 = next; + + advance(); + + return next0; + } + + @Override public void removeX() throws IgniteCheckedException { + // No-op. + } + + private void advance() { + IgniteBiTuple next0 = null; + + while (iter.hasNext()) { + Cache.Entry cacheEntry = iter.next(); + + if (keyValFilter != null && !keyValFilter.apply(cacheEntry.getKey(), cacheEntry.getValue())) + continue; + + next0 = new IgniteBiTuple<>(cacheEntry.getKey(), cacheEntry.getValue()); + + break; + } + + next = next0; + } + }; + } + + IgniteInternalCache keepBinaryCache = cctx.cache().keepBinary(); + + keyIter = backups ? keepBinaryCache.keySetx().iterator() : keepBinaryCache.primaryKeySet().iterator(); + } + else if (part < 0 || part >= cctx.affinity().partitions()) + keyIter = F.emptyIterator(); + else { + final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht(); + + locPart = dht.topology().localPartition(part, topVer, false); + + // Double check for owning state. + if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING) + throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(), + "Partition can not be reserved."); + + final GridDhtLocalPartition locPart0 = locPart; + + keyIter = new Iterator() { + private Iterator iter0 = locPart0.keySet().iterator(); + + @Override public boolean hasNext() { + return iter0.hasNext(); + } + + @Override public K next() { + return (K)iter0.next(); + } + + @Override public void remove() { + iter0.remove(); + } + }; + } + + final GridDhtLocalPartition locPart0 = locPart; + + return new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) { + @Override protected void onClose() { + super.onClose(); + + if (locPart0 != null) + locPart0.release(); + } + }; + } + + /** + * @param qry Query. + * @param backups Include backups. + * @param locNode Local node. * @return Offheap iterator. */ private GridIterator> offheapIterator( GridCacheQueryAdapter qry, AffinityTopologyVersion topVer, boolean backups, - ExpiryPolicy expPlc - ) { + ExpiryPolicy expPlc, + boolean locNode) { IgniteBiPredicate filter = qry.scanFilter(); if (expPlc != null) { @@ -981,18 +1091,18 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte topVer, filter, expPlc, - qry.keepBinary()); + qry.keepBinary(), locNode); } if (cctx.offheapTiered() && filter != null) { - OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepBinary()); + OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepBinary(), locNode); return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups); } else { Iterator> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups); - return scanIterator(it, filter, qry.keepBinary()); + return scanIterator(it, filter, qry.keepBinary(), locNode); } } @@ -1000,12 +1110,14 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * @param it Lazy swap or offheap iterator. * @param filter Scan filter. * @param keepBinary Keep binary flag. + * @param locNode Local node. * @return Iterator. */ private GridIteratorAdapter> scanIterator( @Nullable final Iterator> it, @Nullable final IgniteBiPredicate filter, - final boolean keepBinary) { + final boolean keepBinary, + final boolean locNode) { if (it == null) return new GridEmptyCloseableIterator<>(); @@ -1041,15 +1153,18 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte while (it.hasNext()) { final LazySwapEntry e = new LazySwapEntry(it.next()); - if (filter != null) { - K key = (K)cctx.unwrapBinaryIfNeeded(e.key(), keepBinary); - V val = (V)cctx.unwrapBinaryIfNeeded(e.value(), keepBinary); + K key = e.key(); + V val = e.value(); - if (!filter.apply(key, val)) - continue; + if (filter != null || locNode) { + key = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary); + val = (V)cctx.unwrapBinaryIfNeeded(val, keepBinary); } - next = new IgniteBiTuple<>(e.key(), e.value()); + if (filter != null && !filter.apply(key, val)) + continue; + + next = new IgniteBiTuple<>(key, val); break; } @@ -1063,6 +1178,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * @param filter Filter. * @param expPlc Expiry policy. * @param keepBinary Keep binary flag. + * @param locNode Local node. * @return Final key-value iterator. */ private GridIterator> scanExpiryIterator( @@ -1070,8 +1186,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte AffinityTopologyVersion topVer, @Nullable final IgniteBiPredicate filter, ExpiryPolicy expPlc, - final boolean keepBinary - ) { + final boolean keepBinary, + boolean locNode) { Iterator keyIter = new Iterator() { /** {@inheritDoc} */ @Override public boolean hasNext() { @@ -1096,7 +1212,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } }; - return new PeekValueExpiryAwareIterator(keyIter, expPlc, topVer, filter, keepBinary, false); + return new PeekValueExpiryAwareIterator(keyIter, expPlc, topVer, filter, keepBinary, locNode, false); } /** @@ -1317,6 +1433,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte @SuppressWarnings("unchecked") protected void runQuery(GridCacheQueryInfo qryInfo) { assert qryInfo != null; + assert qryInfo.query().type() != SCAN || !qryInfo.local() : qryInfo; if (!enterBusy()) { if (cctx.localNodeId().equals(qryInfo.senderId())) @@ -1438,6 +1555,9 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } if (readEvt) { + K key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary()); + V val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary()); + switch (type) { case SQL: cctx.gridEvents().record(new CacheQueryReadEvent<>( @@ -1453,8 +1573,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte qryInfo.arguments(), qry.subjectId(), taskName, - key, - val, + key0, + val0, null, null)); @@ -1474,8 +1594,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte null, qry.subjectId(), taskName, - key, - val, + key0, + val0, null, null)); @@ -1495,8 +1615,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte null, qry.subjectId(), taskName, - key, - val, + key0, + val0, null, null)); @@ -1588,12 +1708,127 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } /** + * Process local scan query. + * + * @param qry Query. + * @param updStatisticsIfNeeded Update statistics flag. + */ + @SuppressWarnings({"unchecked", "serial"}) + protected GridCloseableIterator> scanQueryLocal(final GridCacheQueryAdapter qry, + final boolean updStatisticsIfNeeded) throws IgniteCheckedException { + if (!enterBusy()) + throw new IllegalStateException("Failed to process query request (grid is stopping)."); + + final boolean statsEnabled = cctx.config().isStatisticsEnabled(); + + boolean needUpdStatistics = updStatisticsIfNeeded && statsEnabled; + + long startTime = U.currentTimeMillis(); + + try { + assert qry.type() == SCAN; + + if (log.isDebugEnabled()) + log.debug("Running local SCAN query: " + qry); + + final String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash()); + final IgniteBiPredicate filter = qry.scanFilter(); + final String namex = cctx.namex(); + final ClusterNode locNode = cctx.localNode(); + final UUID subjId = qry.subjectId(); + + if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + cctx.gridEvents().record(new CacheQueryExecutedEvent<>( + locNode, + "Scan query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SCAN.name(), + namex, + null, + null, + filter, + null, + null, + subjId, + taskName)); + } + + final GridCloseableIterator> iter = scanIterator(qry, true); + + if (updStatisticsIfNeeded) { + needUpdStatistics = false; + + cctx.queries().onCompleted(U.currentTimeMillis() - startTime, false); + } + + final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + + return new GridCloseableIteratorAdapter>() { + @Override protected IgniteBiTuple onNext() throws IgniteCheckedException { + long start = statsEnabled ? System.nanoTime() : 0L; + + IgniteBiTuple next = iter.nextX(); + + if (statsEnabled) { + CacheMetricsImpl metrics = cctx.cache().metrics0(); + + metrics.onRead(true); + + metrics.addGetTimeNanos(System.nanoTime() - start); + } + + if (readEvt) { + cctx.gridEvents().record(new CacheQueryReadEvent<>( + cctx.localNode(), + "Scan query entry read.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.SCAN.name(), + namex, + null, + null, + filter, + null, + null, + subjId, + taskName, + next.getKey(), + next.getValue(), + null, + null)); + } + + return next; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + return iter.hasNextX(); + } + + @Override protected void onClose() throws IgniteCheckedException { + iter.close(); + } + }; + } + catch (Exception e) { + if (needUpdStatistics) + cctx.queries().onCompleted(U.currentTimeMillis() - startTime, true); + + throw e; + } + finally { + leaveBusy(); + } + } + + /** * @param qryInfo Info. * @param taskName Task name. * @return Iterator. * @throws IgniteCheckedException In case of error. */ - private QueryResult queryResult(GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException { + private QueryResult queryResult(final GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException { + assert qryInfo != null; + final UUID sndId = qryInfo.senderId(); assert sndId != null; @@ -1601,8 +1836,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte Map>> futs = qryIters.get(sndId); if (futs == null) { - futs = new LinkedHashMap>>( - 16, 0.75f, true) { + futs = new LinkedHashMap>>(16, 0.75f, true) { @Override protected boolean removeEldestEntry(Map.Entry>> e) { boolean rmv = size() > maxIterCnt; @@ -1625,22 +1859,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte futs = old; } - return queryResult(futs, qryInfo, taskName); - } - - /** - * @param futs Futures map. - * @param qryInfo Info. - * @return Iterator. - * @throws IgniteCheckedException In case of error. - */ - @SuppressWarnings({ - "SynchronizationOnLocalVariableOrMethodParameter", - "NonPrivateFieldAccessedInSynchronizedContext"}) - private QueryResult queryResult(Map>> futs, - GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException { assert futs != null; - assert qryInfo != null; GridFutureAdapter> fut; @@ -2598,17 +2817,23 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte /** */ private boolean keepBinary; + /** */ + private boolean locNode; + /** * @param filter Filter. * @param keepBinary Keep binary flag. + * @param locNode Local node. */ private OffheapIteratorClosure( @Nullable IgniteBiPredicate filter, - boolean keepBinary) { + boolean keepBinary, + boolean locNode) { assert filter != null; this.filter = filter; this.keepBinary = keepBinary; + this.locNode = locNode; } /** {@inheritDoc} */ @@ -2623,15 +2848,19 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte if (!filter.apply(key, val)) return null; - if (key instanceof CacheObject) - ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext()); + if (locNode) + return new IgniteBiTuple<>(key, val); + else{ + if (key instanceof CacheObject) + ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext()); - val = (V)cctx.unwrapTemporary(e.value()); + val = (V)cctx.unwrapTemporary(e.value()); - if (val instanceof CacheObject) - ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext()); + if (val instanceof CacheObject) + ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext()); - return new IgniteBiTuple<>(e.key(), val); + return new IgniteBiTuple<>(key, val); + } } } @@ -3112,6 +3341,9 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte /** */ private final IgniteBiPredicate keyValFilter; + /** */ + private boolean locNode; + /** Heap only flag. */ private boolean heapOnly; @@ -3133,6 +3365,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * @param topVer Topology version. * @param keyValFilter Key-value filter. * @param keepBinary Keep binary flag from the query. + * @param locNode Local node. + * @param heapOnly Heap only. */ private PeekValueExpiryAwareIterator( Iterator keyIt, @@ -3140,12 +3374,14 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte AffinityTopologyVersion topVer, IgniteBiPredicate keyValFilter, boolean keepBinary, + boolean locNode, boolean heapOnly ) { this.keyIt = keyIt; this.plc = plc; this.topVer = topVer; this.keyValFilter = keyValFilter; + this.locNode = locNode; this.heapOnly = heapOnly; dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht()); @@ -3209,11 +3445,27 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } if (val != null) { + boolean keepBinary0 = !locNode || keepBinary; + next0 = F.t( - (K)cctx.unwrapBinaryIfNeeded(key, true), - (V)cctx.unwrapBinaryIfNeeded(val, true)); + (K)cctx.unwrapBinaryIfNeeded(key, keepBinary0), + (V)cctx.unwrapBinaryIfNeeded(val, keepBinary0)); + + boolean passPred = true; - if (checkPredicate(next0)) + if (keyValFilter != null) { + Object key0 = next0.getKey(); + Object val0 = next0.getValue(); + + if (keepBinary0 && !keepBinary) { + key0 = (K)cctx.unwrapBinaryIfNeeded(key0, keepBinary); + val0 = (V)cctx.unwrapBinaryIfNeeded(val0, keepBinary); + } + + passPred = keyValFilter.apply((K)key0, (V)val0); + } + + if (passPred) break; else next0 = null; @@ -3260,21 +3512,5 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } } } - - /** - * Check key-value predicate. - * - * @param e Entry to check. - * @return Filter evaluation result. - */ - private boolean checkPredicate(Map.Entry e) { - if (keyValFilter != null) { - Map.Entry e0 = (Map.Entry)cctx.unwrapBinaryIfNeeded(e, keepBinary); - - return keyValFilter.apply(e0.getKey(), e0.getValue()); - } - - return true; - } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index f25e361..2f9f9d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -389,7 +389,7 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite CacheQueryFuture> fut = qry.execute(); - CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it = + CacheWeakQueryIteratorsHolder.WeakReferenceCloseableIterator it = ctx.itHolder().iterator(fut, new CacheIteratorConverter>() { @Override protected T convert(Map.Entry e) { return e.getKey(); @@ -627,4 +627,4 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite setName = U.readString(in); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 1e1182f..b5634e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; @@ -1098,35 +1099,41 @@ public class GridServiceProcessor extends GridProcessorAdapter { */ @SuppressWarnings("unchecked") private Iterator> serviceEntries(IgniteBiPredicate p) { - if (!cache.context().affinityNode()) { - ClusterNode oldestSrvNode = - CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE); + try { + if (!cache.context().affinityNode()) { + ClusterNode oldestSrvNode = + CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE); if (oldestSrvNode == null) return F.emptyIterator(); - GridCacheQueryManager qryMgr = cache.context().queries(); + GridCacheQueryManager qryMgr = cache.context().queries(); - CacheQuery> qry = qryMgr.createScanQuery(p, null, false); + CacheQuery> qry = qryMgr.createScanQuery(p, null, false); - qry.keepAll(false); + qry.keepAll(false); - qry.projection(ctx.cluster().get().forNode(oldestSrvNode)); + qry.projection(ctx.cluster().get().forNode(oldestSrvNode)); - return cache.context().itHolder().iterator(qry.execute(), - new CacheIteratorConverter>() { - @Override protected Object convert(Map.Entry e) { - return new CacheEntryImpl<>(e.getKey(), e.getValue()); - } + GridCloseableIterator> iter = qry.executeScanQuery(); - @Override protected void remove(Object item) { - throw new UnsupportedOperationException(); - } - } - ); + return cache.context().itHolder().iterator(iter, + new CacheIteratorConverter, Map.Entry>() { + @Override protected Cache.Entry convert(Map.Entry e) { + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected void remove(Cache.Entry item) { + throw new UnsupportedOperationException(); + } + }); + } + else + return cache.entrySetx().iterator(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); } - else - return cache.entrySetx().iterator(); } /**