Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 757E7200D0E for ; Tue, 26 Sep 2017 12:48:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 741811609B4; Tue, 26 Sep 2017 10:48:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CC261160BD5 for ; Tue, 26 Sep 2017 12:48:18 +0200 (CEST) Received: (qmail 54142 invoked by uid 500); 26 Sep 2017 10:48:18 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 53123 invoked by uid 99); 26 Sep 2017 10:48:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Sep 2017 10:48:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A9B11F5B3F; Tue, 26 Sep 2017 10:48:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Tue, 26 Sep 2017 10:48:56 -0000 Message-Id: In-Reply-To: <837a417004ee422c836e9b3a32230ff0@git.apache.org> References: <837a417004ee422c836e9b3a32230ff0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [43/50] [abbrv] ignite git commit: IGNITE-6101 Try to improve local scans performance archived-at: Tue, 26 Sep 2017 10:48:23 -0000 IGNITE-6101 Try to improve local scans performance (cherry picked from commit 8d5e73c) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23104171 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23104171 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23104171 Branch: refs/heads/ignite-gg-12822 Commit: 2310417165d7fd663ff668359ee9a46c775feebf Parents: 9f5c9e3 Author: Igor Seliverstov Authored: Thu Sep 21 19:45:22 2017 +0300 Committer: Dmitriy Govorukhin Committed: Fri Sep 22 15:29:25 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/binary/BinaryUtils.java | 26 ++ .../processors/cache/CacheObjectUtils.java | 63 ++- .../processors/cache/GridCacheAdapter.java | 6 +- .../processors/cache/GridCacheEntryEx.java | 10 + .../processors/cache/GridCacheMapEntry.java | 27 +- .../processors/cache/IgniteCacheProxyImpl.java | 26 +- .../colocated/GridDhtDetachedCacheEntry.java | 4 +- .../distributed/near/GridNearCacheEntry.java | 4 +- .../processors/cache/query/CacheQueryEntry.java | 58 +++ .../query/GridCacheDistributedQueryManager.java | 16 +- .../cache/query/GridCacheQueryAdapter.java | 53 ++- .../cache/query/GridCacheQueryManager.java | 466 +++++++++---------- .../IgniteCacheObjectProcessorImpl.java | 164 ------- .../UserCacheObjectByteArrayImpl.java | 59 +++ .../cacheobject/UserCacheObjectImpl.java | 82 ++++ .../cacheobject/UserKeyCacheObjectImpl.java | 101 ++++ .../service/GridServiceProcessor.java | 6 +- .../resources/META-INF/classnames.properties | 22 +- .../processors/cache/GridCacheTestEntryEx.java | 6 + .../GridCacheQueryTransformerSelfTest.java | 41 ++ 20 files changed, 744 insertions(+), 496 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java index 969f3e1..2e0db93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java @@ -59,6 +59,12 @@ import org.apache.ignite.binary.BinaryType; import org.apache.ignite.binary.Binarylizable; import org.apache.ignite.internal.binary.builder.BinaryLazyValue; import org.apache.ignite.internal.binary.streams.BinaryInputStream; +import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectByteArrayImpl; +import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl; +import org.apache.ignite.internal.processors.cacheobject.UserKeyCacheObjectImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; @@ -701,6 +707,26 @@ public class BinaryUtils { } /** + * @param obj Object to check. + * @return True if this is an object of a known type. + */ + public static boolean knownCacheObject(Object obj) { + if (obj == null) + return false; + + Class cls= obj.getClass(); + + return cls == KeyCacheObjectImpl.class || + cls == BinaryObjectImpl.class || + cls == CacheObjectImpl.class || + cls == CacheObjectByteArrayImpl.class || + cls == BinaryEnumObjectImpl.class || + cls == UserKeyCacheObjectImpl.class || + cls == UserCacheObjectImpl.class || + cls == UserCacheObjectByteArrayImpl.class; + } + + /** * @param arr Array to check. * @return {@code true} if this array is of a known type. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java index f9c76df..e942924 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java @@ -17,12 +17,11 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.internal.binary.BinaryUtils; -import org.apache.ignite.internal.util.typedef.F; - import java.util.ArrayList; import java.util.Collection; import java.util.Map; +import org.apache.ignite.internal.binary.BinaryUtils; +import org.apache.ignite.internal.util.typedef.F; /** * Cache object utility methods. @@ -34,10 +33,35 @@ public class CacheObjectUtils { * @param cpy Copy value flag. * @return Unwrapped object. */ + public static Object unwrapBinaryIfNeeded(CacheObjectValueContext ctx, CacheObject o, boolean keepBinary, boolean cpy) { + return unwrapBinary(ctx, o, keepBinary, cpy); + } + + /** + * @param o Object to unwrap. + * @param keepBinary Keep binary flag. + * @param cpy Copy value flag. + * @return Unwrapped object. + */ public static Object unwrapBinaryIfNeeded(CacheObjectValueContext ctx, Object o, boolean keepBinary, boolean cpy) { if (o == null) return null; + // TODO has to be overloaded + if (o instanceof Map.Entry) { + Map.Entry entry = (Map.Entry)o; + + Object key = entry.getKey(); + + Object uKey = unwrapBinary(ctx, key, keepBinary, cpy); + + Object val = entry.getValue(); + + Object uVal = unwrapBinary(ctx, val, keepBinary, cpy); + + return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o; + } + return unwrapBinary(ctx, o, keepBinary, cpy); } @@ -84,7 +108,10 @@ public class CacheObjectUtils { Map map0 = BinaryUtils.newMap(map); for (Map.Entry e : map.entrySet()) - map0.put(unwrapBinary(ctx, e.getKey(), false, cpy), unwrapBinary(ctx, e.getValue(), false, cpy)); + // TODO why don't we use keepBinary parameter here? + map0.put( + unwrapBinary(ctx, e.getKey(), false, cpy), + unwrapBinary(ctx, e.getValue(), false, cpy)); return map0; } @@ -103,7 +130,7 @@ public class CacheObjectUtils { col0 = new ArrayList<>(col.size()); for (Object obj : col) - col0.add(unwrapBinary(ctx, obj, keepBinary, cpy)); + col0.add(unwrapBinaryIfNeeded(ctx, obj, keepBinary, cpy)); return col0; } @@ -135,31 +162,25 @@ public class CacheObjectUtils { */ @SuppressWarnings("unchecked") private static Object unwrapBinary(CacheObjectValueContext ctx, Object o, boolean keepBinary, boolean cpy) { - if (o instanceof Map.Entry) { - Map.Entry entry = (Map.Entry)o; - - Object key = entry.getKey(); - - Object uKey = unwrapBinary(ctx, key, keepBinary, cpy); + if (o == null) + return o; - Object val = entry.getValue(); + while (BinaryUtils.knownCacheObject(o)) { + CacheObject co = (CacheObject)o; - Object uVal = unwrapBinary(ctx, val, keepBinary, cpy); + if (!co.isPlatformType() && keepBinary) + return o; - return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o; + // It may be a collection of binaries + o = co.value(ctx, cpy); } - else if (BinaryUtils.knownCollection(o)) + + if (BinaryUtils.knownCollection(o)) return unwrapKnownCollection(ctx, (Collection)o, keepBinary, cpy); else if (BinaryUtils.knownMap(o)) return unwrapBinariesIfNeeded(ctx, (Map)o, keepBinary, cpy); else if (o instanceof Object[]) return unwrapBinariesInArrayIfNeeded(ctx, (Object[])o, keepBinary, cpy); - else if (o instanceof CacheObject) { - CacheObject co = (CacheObject)o; - - if (!keepBinary || co.isPlatformType()) - return unwrapBinary(ctx, co.value(ctx, cpy), keepBinary, cpy); - } return o; } http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 40f2b49..92a8245 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -83,13 +83,13 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException import org.apache.ignite.internal.cluster.IgniteClusterEx; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; @@ -3911,7 +3911,9 @@ public abstract class GridCacheAdapter implements IgniteInternalCache, Map.Entry>() { @Override protected Cache.Entry convert(Map.Entry e) { - return new CacheEntryImpl<>(e.getKey(), e.getValue()); + // Actually Scan Query returns Iterator by default, + // CacheQueryEntry implements both Map.Entry and Cache.Entry interfaces. + return (Cache.Entry) e; } @Override protected void remove(Cache.Entry item) { http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index e2bc7ff..b2cabac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -934,6 +935,15 @@ public interface GridCacheEntryEx { throws IgniteCheckedException, GridCacheEntryRemovedException; /** + * @param row Already extracted value. + * @return Value. + * @throws IgniteCheckedException If failed to read from swap storage. + * @throws GridCacheEntryRemovedException If entry was removed. + */ + @Nullable public CacheObject unswap(CacheDataRow row) + throws IgniteCheckedException, GridCacheEntryRemovedException; + + /** * Unswap ignoring flags. * * @param needVal If {@code false} then do not need to deserialize value during unswap. http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 765dff5..958f156 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -342,9 +342,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ + @Override public final CacheObject unswap(CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException { + row = unswap(row, true); + + return row != null ? row.value() : null; + } + + /** {@inheritDoc} */ @Nullable @Override public final CacheObject unswap(boolean needVal) throws IgniteCheckedException, GridCacheEntryRemovedException { - CacheDataRow row = unswap(needVal, true); + CacheDataRow row = unswap(null, true); return row != null ? row.value() : null; } @@ -352,13 +359,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * Unswaps an entry. * - * @param needVal If {@code false} then do not to deserialize value during unswap. + * @param row Already extracted cache data. * @param checkExpire If {@code true} checks for expiration, as result entry can be obsoleted or marked deleted. * @return Value. * @throws IgniteCheckedException If failed. * @throws GridCacheEntryRemovedException If entry was removed. */ - @Nullable protected CacheDataRow unswap(boolean needVal, boolean checkExpire) + @Nullable protected CacheDataRow unswap(@Nullable CacheDataRow row, boolean checkExpire) throws IgniteCheckedException, GridCacheEntryRemovedException { boolean obsolete = false; boolean deferred = false; @@ -368,7 +375,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme checkObsolete(); if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) { - CacheDataRow read = cctx.offheap().read(this); + assert row == null || row.key() == key: "Unexpected row key"; + + CacheDataRow read = row == null ? cctx.offheap().read(this) : row; flags |= IS_UNSWAPPED_MASK; @@ -572,7 +581,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (val == null) { if (isStartVersion()) { - unswap(true, false); + unswap(null, false); val = this.val; } @@ -1322,7 +1331,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Load and remove from swap if it is new. if (isNew()) - oldRow = unswap(retval, false); + oldRow = unswap(null, false); old = val; @@ -2411,7 +2420,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return null; if (val == null && offheap) - unswap(true, false); + unswap(null, false); if (checkExpired()) { if (cctx.deferredDelete()) { @@ -2648,7 +2657,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean isNew = isStartVersion(); if (isNew) - unswap(true, false); + unswap(null, false); CacheObject val = this.val; @@ -2952,7 +2961,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme checkObsolete(); if (isStartVersion()) - unswap(true, false); + unswap(null, false); long expireTime = expireTimeExtras(); http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index 337c1bb..3056361 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -375,31 +375,7 @@ public class IgniteCacheProxyImpl extends AsyncSupportAdapter iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN, ctx.name(), ctx, new IgniteOutClosureX>() { @Override public GridCloseableIterator applyx() throws IgniteCheckedException { - final GridCloseableIterator iter0 = qry.executeScanQuery(); - - final boolean needToConvert = transformer == null; - - return new GridCloseableIteratorAdapter() { - @Override protected R onNext() throws IgniteCheckedException { - Object next = iter0.nextX(); - - if (needToConvert) { - Map.Entry entry = (Map.Entry)next; - - return (R)new CacheEntryImpl<>(entry.getKey(), entry.getValue()); - } - - return (R)next; - } - - @Override protected boolean onHasNext() throws IgniteCheckedException { - return iter0.hasNextX(); - } - - @Override protected void onClose() throws IgniteCheckedException { - iter0.close(); - } - }; + return qry.executeScanQuery(); } }, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index 7da3d4f..5566bb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -22,8 +22,8 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -53,7 +53,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Nullable @Override public CacheDataRow unswap(boolean needVal, boolean checkExpire) throws IgniteCheckedException { + @Nullable @Override public CacheDataRow unswap(CacheDataRow row, boolean checkExpire) throws IgniteCheckedException { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 6e606bf..ce728b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -30,9 +30,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvcc; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.F; @@ -443,7 +443,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Nullable @Override public CacheDataRow unswap(boolean needVal, boolean checkExpire) { + @Nullable @Override public CacheDataRow unswap(CacheDataRow row, boolean checkExpire) { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java new file mode 100644 index 0000000..4787464 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import javax.cache.Cache; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.internal.processors.cache.CacheEntryImplEx; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +/** */ +final class CacheQueryEntry extends IgniteBiTuple implements Cache.Entry { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + public CacheQueryEntry() { + // No-op. + } + + /** + * @param key Key. + * @param val Value. + */ + CacheQueryEntry(@Nullable K key, @Nullable V val) { + super(key, val); + } + + /** {@inheritDoc} */ + @Override public T unwrap(Class cls) { + if (cls != null && cls.isAssignableFrom(getClass())) + return cls.cast(this); + + if (cls.isAssignableFrom(CacheEntryImpl.class)) + return (T)new CacheEntryImpl<>(getKey(), getValue()); + + if (cls.isAssignableFrom(CacheEntry.class)) + return (T)new CacheEntryImplEx<>(getKey(), getValue(), null); + + throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 7f859a2..b860f02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -42,7 +42,6 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -632,7 +631,20 @@ public class GridCacheDistributedQueryManager extends GridCacheQueryManage if (locIter != null && locIter.hasNextX()) cur = locIter.nextX(); - return cur != null || (cur = fut.next()) != null; + return cur != null || (cur = convert(fut.next())) != null; + } + + /** + * @param obj Entry to convert. + * @return Cache entry + */ + private Object convert(Object obj) { + if(qry.transform() != null) + return obj; + + Map.Entry e = (Map.Entry)obj; + + return e == null ? null : new CacheQueryEntry(e.getKey(), e.getValue()); } @Override protected void onClose() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 023c03c..c4eae8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.query; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Deque; @@ -517,7 +518,8 @@ public class GridCacheQueryAdapter implements CacheQuery { @Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException { assert type == SCAN : "Wrong processing of qyery: " + type; - Collection nodes = nodes(); + // Affinity nodes snapshot. + Collection nodes = new ArrayList<>(nodes()); cctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -537,13 +539,15 @@ public class GridCacheQueryAdapter implements CacheQuery { final GridCacheQueryManager qryMgr = cctx.queries(); - if (part != null && !cctx.isLocal()) - return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx); - else { - boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); + boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); - return loc ? qryMgr.scanQueryLocal(this, true) : qryMgr.scanQueryDistributed(this, nodes); - } + if (loc) + return qryMgr.scanQueryLocal(this, true); + + if (part != null) + return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx); + else + return qryMgr.scanQueryDistributed(this, nodes); } /** @@ -621,12 +625,12 @@ public class GridCacheQueryAdapter implements CacheQuery { /** * Wrapper for queries with fallback. */ - private static class ScanQueryFallbackClosableIterator extends GridCloseableIteratorAdapter { + private static class ScanQueryFallbackClosableIterator extends GridCloseableIteratorAdapter { /** */ private static final long serialVersionUID = 0L; /** Query future. */ - private volatile T2, GridCacheQueryFutureAdapter> tuple; + private volatile T2, GridCacheQueryFutureAdapter> tuple; /** Backups. */ private volatile Queue nodes; @@ -653,7 +657,7 @@ public class GridCacheQueryAdapter implements CacheQuery { private boolean firstItemReturned; /** */ - private Map.Entry cur; + private Object cur; /** * @param part Partition. @@ -726,7 +730,7 @@ public class GridCacheQueryAdapter implements CacheQuery { } } else { - final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null); + final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, qry.transform, null); GridCacheQueryFutureAdapter fut = (GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean, Collections.singleton(node)); @@ -736,13 +740,13 @@ public class GridCacheQueryAdapter implements CacheQuery { } /** {@inheritDoc} */ - @Override protected Map.Entry onNext() throws IgniteCheckedException { + @Override protected Object onNext() throws IgniteCheckedException { if (!onHasNext()) throw new NoSuchElementException(); assert cur != null; - Map.Entry e = cur; + Object e = cur; cur = null; @@ -755,9 +759,9 @@ public class GridCacheQueryAdapter implements CacheQuery { if (cur != null) return true; - T2, GridCacheQueryFutureAdapter> t = tuple; + T2, GridCacheQueryFutureAdapter> t = tuple; - GridCloseableIterator iter = t.get1(); + GridCloseableIterator iter = t.get1(); if (iter != null) { boolean hasNext = iter.hasNext(); @@ -773,14 +777,14 @@ public class GridCacheQueryAdapter implements CacheQuery { assert fut != null; if (firstItemReturned) - return (cur = (Map.Entry)fut.next()) != null; + return (cur = convert(fut.next())) != null; try { fut.awaitFirstPage(); firstItemReturned = true; - return (cur = (Map.Entry)fut.next()) != null; + return (cur = convert(fut.next())) != null; } catch (IgniteClientDisconnectedCheckedException e) { throw CU.convertToCacheException(e); @@ -793,6 +797,19 @@ public class GridCacheQueryAdapter implements CacheQuery { } /** + * @param obj Entry to convert. + * @return Cache entry + */ + private Object convert(Object obj) { + if(qry.transform() != null) + return obj; + + Map.Entry e = (Map.Entry)obj; + + return e == null ? null : new CacheQueryEntry(e.getKey(), e.getValue()); + } + + /** * @param e Exception for query run. */ private void retryIfPossible(IgniteCheckedException e) { @@ -847,7 +864,7 @@ public class GridCacheQueryAdapter implements CacheQuery { @Override protected void onClose() throws IgniteCheckedException { super.onClose(); - T2, GridCacheQueryFutureAdapter> t = tuple; + T2, GridCacheQueryFutureAdapter> t = tuple; if (t != null && t.get1() != null) t.get1().close(); http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 4307d26..05e8a20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -40,10 +40,10 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import javax.cache.Cache; -import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.QueryIndexType; import org.apache.ignite.cache.query.QueryMetrics; @@ -63,7 +63,9 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheObjectUtils; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheInternal; @@ -71,10 +73,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate; import org.apache.ignite.internal.processors.datastructures.SetItemKey; @@ -817,22 +819,22 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * @throws IgniteCheckedException If failed to get iterator. */ @SuppressWarnings({"unchecked"}) - private GridCloseableIterator> scanIterator(final GridCacheQueryAdapter qry, boolean locNode) + private GridCloseableIterator scanIterator(final GridCacheQueryAdapter qry, boolean locNode) throws IgniteCheckedException { final IgniteBiPredicate keyValFilter = qry.scanFilter(); try { injectResources(keyValFilter); - Integer part = qry.partition(); - - if (cctx.isLocal()) - part = null; + Integer part = cctx.isLocal() ? null : qry.partition(); if (part != null && (part < 0 || part >= cctx.affinity().partitions())) - return new GridEmptyCloseableIterator<>(); - - final ExpiryPolicy plc = cctx.expiry(); + return new GridEmptyCloseableIterator() { + @Override public void close() throws IgniteCheckedException { + closeScanFilter(keyValFilter); + super.close(); + } + }; AffinityTopologyVersion topVer = GridQueryProcessor.getRequestAffinityTopologyVersion(); @@ -854,13 +856,6 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved"); - if (locPart0.state() != OWNING) { - locPart0.release(); - - throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(), - "Partition can not be reserved"); - } - locPart = locPart0; it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part); @@ -868,19 +863,11 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte else { locPart = null; + // TODO shouldn't we reserve all involved partitions? it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer); } - return new PeekValueExpiryAwareIterator(it, plc, topVer, keyValFilter, qry.keepBinary(), locNode) { - @Override protected void onClose() { - super.onClose(); - - if (locPart != null) - locPart.release(); - - closeScanFilter(keyValFilter); - } - }; + return new ScanQueryIterator(it, qry, topVer, locPart, keyValFilter, locNode, cctx, log); } catch (IgniteCheckedException | RuntimeException e) { closeScanFilter(keyValFilter); @@ -1185,9 +1172,16 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - while (!Thread.currentThread().isInterrupted() && iter.hasNext()) { + CacheObjectContext objCtx = cctx.cacheObjectContext(); + + while (!Thread.currentThread().isInterrupted()) { long start = statsEnabled ? System.nanoTime() : 0L; + // Need to call it after gathering start time because + // actual row extracting may happen inside this method. + if(!iter.hasNext()) + break; + IgniteBiTuple row = iter.next(); // Query is cancelled. @@ -1245,8 +1239,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte V val0 = null; if (readEvt) { - key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary()); - val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary()); + key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false); + val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false); switch (type) { case SQL: @@ -1316,9 +1310,9 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte if (rdc != null || trans != null) { if (key0 == null) - key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary()); + key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false); if (val0 == null) - val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary()); + val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false); Cache.Entry entry = new CacheEntryImpl(key0, val0); @@ -1406,22 +1400,24 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * Process local scan query. * * @param qry Query. - * @param updStatisticsIfNeeded Update statistics flag. + * @param updateStatistics Update statistics flag. */ @SuppressWarnings({"unchecked", "serial"}) protected GridCloseableIterator scanQueryLocal(final GridCacheQueryAdapter qry, - final boolean updStatisticsIfNeeded) throws IgniteCheckedException { + boolean updateStatistics) throws IgniteCheckedException { if (!enterBusy()) throw new IllegalStateException("Failed to process query request (grid is stopping)."); final boolean statsEnabled = cctx.config().isStatisticsEnabled(); - boolean needUpdStatistics = updStatisticsIfNeeded && statsEnabled; + updateStatistics &= statsEnabled; long startTime = U.currentTimeMillis(); final String namex = cctx.name(); + final IgniteBiPredicate scanFilter = qry.scanFilter(); + try { assert qry.type() == SCAN; @@ -1429,7 +1425,6 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte log.debug("Running local SCAN query: " + qry); final String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash()); - final IgniteBiPredicate filter = qry.scanFilter(); final ClusterNode locNode = cctx.localNode(); final UUID subjId = qry.subjectId(); @@ -1442,80 +1437,23 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte namex, null, null, - filter, + scanFilter, null, null, subjId, taskName)); } - final GridCloseableIterator> iter = scanIterator(qry, true); + GridCloseableIterator it = scanIterator(qry, true); - if (updStatisticsIfNeeded) - needUpdStatistics = false; - - final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - - return new GridCloseableIteratorAdapter() { - @Override protected Object onNext() throws IgniteCheckedException { - long start = statsEnabled ? System.nanoTime() : 0L; + updateStatistics = false; - IgniteBiTuple next = iter.nextX(); - - if (statsEnabled) { - CacheMetricsImpl metrics = cctx.cache().metrics0(); - - metrics.onRead(true); - - metrics.addGetTimeNanos(System.nanoTime() - start); - } - - if (readEvt) { - cctx.gridEvents().record(new CacheQueryReadEvent<>( - cctx.localNode(), - "Scan query entry read.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.SCAN.name(), - namex, - null, - null, - filter, - null, - null, - subjId, - taskName, - next.getKey(), - next.getValue(), - null, - null)); - } - - IgniteClosure transform = qry.transform(); - - if (transform == null) - return next; - - Cache.Entry entry; - - if (qry.keepBinary()) - entry = cctx.cache().keepBinary().getEntry(next.getKey()); - else - entry = cctx.cache().getEntry(next.getKey()); - - return transform.apply(entry); - } - - @Override protected boolean onHasNext() throws IgniteCheckedException { - return iter.hasNextX(); - } - - @Override protected void onClose() throws IgniteCheckedException { - iter.close(); - } - }; + return it; } catch (Exception e) { - if (needUpdStatistics) + closeScanFilter(scanFilter); + + if (updateStatistics) cctx.queries().collectMetrics(GridCacheQueryType.SCAN, namex, startTime, U.currentTimeMillis() - startTime, true); @@ -2031,8 +1969,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte private static final long serialVersionUID = 0L; /** - * Number of fields to report when no fields defined. - * Includes _key and _val columns. + * Number of fields to report when no fields defined. Includes _key and _val columns. */ private static final int NO_FIELDS_COLUMNS_COUNT = 2; @@ -2846,14 +2783,68 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } /** - * + * The map prevents put to the map in case the specified request has been removed previously. */ - private class PeekValueExpiryAwareIterator extends GridCloseableIteratorAdapter> { + private class RequestFutureMap extends LinkedHashMap>> { + /** */ + private static final long serialVersionUID = 0L; + + /** Count of canceled keys */ + private static final int CANCELED_COUNT = 128; + + /** + * The ID of the canceled request is stored to the set in case remove(reqId) is called before put(reqId, + * future). + */ + private Set canceled; + + /** {@inheritDoc} */ + @Override public GridFutureAdapter> remove(Object key) { + if (containsKey(key)) + return super.remove(key); + else { + if (canceled == null) { + canceled = Collections.newSetFromMap( + new LinkedHashMap() { + @Override protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > CANCELED_COUNT; + } + }); + } + + canceled.add((Long)key); + + return null; + } + } + + /** + * @return true if the key is canceled + */ + boolean isCanceled(Long key) { + return canceled != null && canceled.contains(key); + } + } + + /** */ + private static final class ScanQueryIterator extends GridCloseableIteratorAdapter { /** */ private static final long serialVersionUID = 0L; /** */ - private final ExpiryPolicy plc; + private final GridDhtCacheAdapter dht; + + /** */ + private final GridDhtLocalPartition locPart; + + /** */ + private final IgniteBiPredicate scanFilter; + + /** */ + private final boolean statsEnabled; + + /** */ + private final GridIterator it; /** */ private final GridCacheAdapter cache; @@ -2862,73 +2853,94 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte private final AffinityTopologyVersion topVer; /** */ - private final GridDhtCacheAdapter dht; + private final boolean keepBinary; /** */ - private final IgniteBiPredicate keyValFilter; + private final boolean readEvt; /** */ - private boolean locNode; + private final String cacheName; /** */ - private final boolean keepBinary; + private final UUID subjId; /** */ - private IgniteBiTuple next; + private final String taskName; /** */ - private IgniteCacheExpiryPolicy expiryPlc; + private final IgniteClosure transform; + + /** */ + private final CacheObjectContext objCtx; + + /** */ + private final GridCacheContext cctx; + + /** */ + private final IgniteLogger log; /** */ - private GridIterator it; + private Object next; - /** Need advance. */ + /** */ private boolean needAdvance; + /** */ + private IgniteCacheExpiryPolicy expiryPlc; + /** * @param it Iterator. - * @param plc Expiry policy. + * @param qry Query. * @param topVer Topology version. - * @param keyValFilter Key-value filter. - * @param keepBinary Keep binary flag from the query. - * @param locNode Local node. + * @param locPart Local partition. + * @param scanFilter Scan filter. + * @param locNode Local node flag. + * @param cctx Cache context. + * @param log Logger. */ - private PeekValueExpiryAwareIterator( + ScanQueryIterator( GridIterator it, - ExpiryPolicy plc, + GridCacheQueryAdapter qry, AffinityTopologyVersion topVer, - IgniteBiPredicate keyValFilter, - boolean keepBinary, - boolean locNode - ) { + GridDhtLocalPartition locPart, + IgniteBiPredicate scanFilter, + boolean locNode, + GridCacheContext cctx, + IgniteLogger log) { this.it = it; - this.plc = plc; this.topVer = topVer; - this.keyValFilter = keyValFilter; - this.locNode = locNode; + this.locPart = locPart; + this.scanFilter = scanFilter; + this.cctx = cctx; + this.log = log; - dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht()); - cache = dht != null ? dht : cctx.cache(); + statsEnabled = locNode && cctx.config().isStatisticsEnabled(); - this.keepBinary = keepBinary; - expiryPlc = cctx.cache().expiryPolicy(plc); + readEvt = locNode && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - needAdvance = true; - } - - /** {@inheritDoc} */ - @Override public boolean onHasNext() { - if (needAdvance) { - advance(); - - needAdvance = false; + if(readEvt){ + taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash()); + subjId = qry.subjectId(); + } + else { + taskName = null; + subjId = null; } - return next != null; + // keep binary for remote scans if possible + keepBinary = (!locNode && scanFilter == null) || qry.keepBinary(); + transform = qry.transform(); + dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht()); + cache = dht != null ? dht : cctx.cache(); + objCtx = cctx.cacheObjectContext(); + cacheName = cctx.name(); + + needAdvance = true; + expiryPlc = this.cctx.cache().expiryPolicy(null); } /** {@inheritDoc} */ - @Override public IgniteBiTuple onNext() { + @Override protected Object onNext() { if (needAdvance) advance(); else @@ -2941,26 +2953,64 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } /** {@inheritDoc} */ + @Override protected boolean onHasNext() { + if (needAdvance) { + advance(); + + needAdvance = false; + } + + return next != null; + } + + /** {@inheritDoc} */ @Override protected void onClose() { - sendTtlUpdate(); + if (expiryPlc != null && dht != null) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = null; + } + + if (locPart != null) + locPart.release(); + + closeScanFilter(scanFilter); } /** * Moves the iterator to the next cache entry. */ private void advance() { - IgniteBiTuple next0 = null; + long start = statsEnabled ? System.nanoTime() : 0L; + + Object next = null; while (it.hasNext()) { CacheDataRow row = it.next(); KeyCacheObject key = row.key(); - CacheObject val; if (expiryPlc != null) { try { - val = value(key); + CacheDataRow tmp = row; + + while (true) { + try { + GridCacheEntryEx entry = cache.entryEx(key); + + entry.unswap(tmp); + + val = entry.peek(true, true, topVer, expiryPlc); + + cctx.evicts().touch(entry, topVer); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + tmp = null; + } + } } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -2969,122 +3019,58 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte val = null; } - if (dht != null && expiryPlc.readyToFlush(100)) { + if (dht != null && expiryPlc.readyToFlush(100)) dht.sendTtlUpdateRequest(expiryPlc); - - expiryPlc = cctx.cache().expiryPolicy(plc); - } } else val = row.value(); if (val != null) { - boolean keepBinary0 = !locNode || keepBinary; + K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, keepBinary, false); + V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, keepBinary, false); - next0 = F.t( - (K)cctx.unwrapBinaryIfNeeded(key, keepBinary0), - (V)cctx.unwrapBinaryIfNeeded(val, keepBinary0)); + if (statsEnabled) { + CacheMetricsImpl metrics = cctx.cache().metrics0(); - boolean passPred = true; + metrics.onRead(true); - if (keyValFilter != null) { - Object key0 = next0.getKey(); - Object val0 = next0.getValue(); + metrics.addGetTimeNanos(System.nanoTime() - start); + } - if (keepBinary0 && !keepBinary) { - key0 = (K)cctx.unwrapBinaryIfNeeded(key0, keepBinary); - val0 = (V)cctx.unwrapBinaryIfNeeded(val0, keepBinary); + if (scanFilter == null || scanFilter.apply(key0, val0)) { + if (readEvt && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) { + cctx.gridEvents().record(new CacheQueryReadEvent<>( + cctx.localNode(), + "Scan query entry read.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.SCAN.name(), + cacheName, + null, + null, + scanFilter, + null, + null, + subjId, + taskName, + key0, + val0, + null, + null)); } - passPred = keyValFilter.apply((K)key0, (V)val0); - } + next = transform == null ? new CacheQueryEntry<>(key0, val0) + : transform.apply(new CacheQueryEntry<>(key0, val0)); - if (passPred) break; - else - next0 = null; + } } } - next = next0; - - if (next == null) - sendTtlUpdate(); - } - - /** - * Sends TTL update. - */ - private void sendTtlUpdate() { - if (dht != null && expiryPlc != null) { + if ((this.next = next) == null && expiryPlc != null && dht != null) { dht.sendTtlUpdateRequest(expiryPlc); expiryPlc = null; } } - - /** - * @param key Key. - * @return Value. - * @throws IgniteCheckedException If failed to peek value. - */ - private CacheObject value(KeyCacheObject key) throws IgniteCheckedException { - while (true) { - try { - GridCacheEntryEx entry = cache.entryEx(key); - - entry.unswap(); - - return entry.peek(true, true, topVer, expiryPlc); - } - catch (GridCacheEntryRemovedException ignore) { - // No-op. - } - } - } - } - - /** - * The map prevents put to the map in case the specified request has been removed previously. - */ - private class RequestFutureMap extends LinkedHashMap>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Count of canceled keys */ - private static final int CANCELED_COUNT = 128; - - /** - * The ID of the canceled request is stored to the set in case - * remove(reqId) is called before put(reqId, future). - */ - private Set canceled; - - /** {@inheritDoc} */ - @Override public GridFutureAdapter> remove(Object key) { - if (containsKey(key)) - return super.remove(key); - else { - if (canceled == null) { - canceled = Collections.newSetFromMap( - new LinkedHashMap() { - @Override protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > CANCELED_COUNT; - } - }); - } - - canceled.add((Long)key); - - return null; - } - } - - /** - * @return true if the key is canceled - */ - boolean isCanceled(Long key) { - return canceled != null && canceled.contains(key); - } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index 70711e5..17be90f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cacheobject; import java.math.BigDecimal; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.UUID; @@ -40,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.IncompleteCacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -341,166 +339,4 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme return false; } - /** - * Wraps key provided by user, must be serialized before stored in cache. - */ - private static class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { - /** */ - private static final long serialVersionUID = 0L; - - /** - * - */ - public UserKeyCacheObjectImpl() { - //No-op. - } - - /** - * @param key Key. - * @param part Partition. - */ - UserKeyCacheObjectImpl(Object key, int part) { - super(key, null, part); - } - - /** - * @param key Key. - * @param valBytes Marshalled key. - * @param part Partition. - */ - UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) { - super(key, valBytes, part); - } - - /** {@inheritDoc} */ - @Override public KeyCacheObject copy(int part) { - if (this.partition() == part) - return this; - - return new UserKeyCacheObjectImpl(val, valBytes, part); - } - - /** {@inheritDoc} */ - @Override public CacheObject prepareForCache(CacheObjectContext ctx) { - try { - IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects(); - - if (!proc.immutable(val)) { - if (valBytes == null) - valBytes = proc.marshal(ctx, val); - - boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled(); - - ClassLoader ldr = p2pEnabled ? - IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader(); - - Object val = proc.unmarshal(ctx, valBytes, ldr); - - KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition()); - - key.partition(partition()); - - return key; - } - - KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition()); - - key.partition(partition()); - - return key; - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal object: " + val, e); - } - } - } - - /** - * Wraps value provided by user, must be serialized before stored in cache. - */ - private static class UserCacheObjectImpl extends CacheObjectImpl { - /** */ - private static final long serialVersionUID = 0L; - - /** - * - */ - public UserCacheObjectImpl() { - //No-op. - } - - /** - * @param val Value. - * @param valBytes Value bytes. - */ - public UserCacheObjectImpl(Object val, byte[] valBytes) { - super(val, valBytes); - } - - /** {@inheritDoc} */ - @Nullable @Override public T value(CacheObjectValueContext ctx, boolean cpy) { - return super.value(ctx, false); // Do not need copy since user value is not in cache. - } - - /** {@inheritDoc} */ - @Override public CacheObject prepareForCache(CacheObjectContext ctx) { - try { - IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects(); - - if (valBytes == null) - valBytes = proc.marshal(ctx, val); - - if (ctx.storeValue()) { - boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled(); - - ClassLoader ldr = p2pEnabled ? - IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader(); - - Object val = this.val != null && proc.immutable(this.val) ? this.val : - proc.unmarshal(ctx, valBytes, ldr); - - return new CacheObjectImpl(val, valBytes); - } - - return new CacheObjectImpl(null, valBytes); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal object: " + val, e); - } - } - } - - /** - * Wraps value provided by user, must be copied before stored in cache. - */ - private static class UserCacheObjectByteArrayImpl extends CacheObjectByteArrayImpl { - /** */ - private static final long serialVersionUID = 0L; - - /** - * - */ - public UserCacheObjectByteArrayImpl() { - // No-op. - } - - /** - * @param val Value. - */ - public UserCacheObjectByteArrayImpl(byte[] val) { - super(val); - } - - /** {@inheritDoc} */ - @Nullable @Override public T value(CacheObjectValueContext ctx, boolean cpy) { - return super.value(ctx, false); // Do not need copy since user value is not in cache. - } - - /** {@inheritDoc} */ - @Override public CacheObject prepareForCache(CacheObjectContext ctx) { - byte[] valCpy = Arrays.copyOf(val, val.length); - - return new CacheObjectByteArrayImpl(valCpy); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java new file mode 100644 index 0000000..aa4d5f5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cacheobject; + +import java.util.Arrays; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.jetbrains.annotations.Nullable; + +/** + * Wraps value provided by user, must be copied before stored in cache. + */ +public class UserCacheObjectByteArrayImpl extends CacheObjectByteArrayImpl { + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public UserCacheObjectByteArrayImpl() { + // No-op. + } + + /** + * @param val Value. + */ + public UserCacheObjectByteArrayImpl(byte[] val) { + super(val); + } + + /** {@inheritDoc} */ + @Nullable @Override public T value(CacheObjectValueContext ctx, boolean cpy) { + return super.value(ctx, false); // Do not need copy since user value is not in cache. + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(CacheObjectContext ctx) { + byte[] valCpy = Arrays.copyOf(val, val.length); + + return new CacheObjectByteArrayImpl(valCpy); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java new file mode 100644 index 0000000..241c12b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cacheobject; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.util.IgniteUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Wraps value provided by user, must be serialized before stored in cache. + */ +public class UserCacheObjectImpl extends CacheObjectImpl { + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public UserCacheObjectImpl() { + //No-op. + } + + /** + * @param val Value. + * @param valBytes Value bytes. + */ + public UserCacheObjectImpl(Object val, byte[] valBytes) { + super(val, valBytes); + } + + /** {@inheritDoc} */ + @Nullable @Override public T value(CacheObjectValueContext ctx, boolean cpy) { + return super.value(ctx, false); // Do not need copy since user value is not in cache. + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(CacheObjectContext ctx) { + try { + IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects(); + + if (valBytes == null) + valBytes = proc.marshal(ctx, val); + + if (ctx.storeValue()) { + boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled(); + + ClassLoader ldr = p2pEnabled ? + IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader(); + + Object val = this.val != null && proc.immutable(this.val) ? this.val : + proc.unmarshal(ctx, valBytes, ldr); + + return new CacheObjectImpl(val, valBytes); + } + + return new CacheObjectImpl(null, valBytes); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal object: " + val, e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java new file mode 100644 index 0000000..de57667 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cacheobject; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Wraps key provided by user, must be serialized before stored in cache. + */ +public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public UserKeyCacheObjectImpl() { + //No-op. + } + + /** + * @param key Key. + * @param part Partition. + */ + UserKeyCacheObjectImpl(Object key, int part) { + super(key, null, part); + } + + /** + * @param key Key. + * @param valBytes Marshalled key. + * @param part Partition. + */ + UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) { + super(key, valBytes, part); + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject copy(int part) { + if (this.partition() == part) + return this; + + return new UserKeyCacheObjectImpl(val, valBytes, part); + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(CacheObjectContext ctx) { + try { + IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects(); + + if (!proc.immutable(val)) { + if (valBytes == null) + valBytes = proc.marshal(ctx, val); + + boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled(); + + ClassLoader ldr = p2pEnabled ? + IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader(); + + Object val = proc.unmarshal(ctx, valBytes, ldr); + + KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition()); + + key.partition(partition()); + + return key; + } + + KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition()); + + key.partition(partition()); + + return key; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal object: " + val, e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/23104171/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 46fcfea..1d8720c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -26,7 +26,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -61,7 +60,6 @@ import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; -import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheIteratorConverter; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; @@ -1309,7 +1307,9 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite return cache.context().itHolder().iterator(iter, new CacheIteratorConverter, Map.Entry>() { @Override protected Cache.Entry convert(Map.Entry e) { - return new CacheEntryImpl<>(e.getKey(), e.getValue()); + // Actually Scan Query returns Iterator by default, + // CacheQueryEntry implements both Map.Entry and Cache.Entry interfaces. + return (Cache.Entry)e; } @Override protected void remove(Cache.Entry item) {