Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DB06D186C6 for ; Fri, 4 Mar 2016 09:21:21 +0000 (UTC) Received: (qmail 98272 invoked by uid 500); 4 Mar 2016 09:21:21 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 98173 invoked by uid 500); 4 Mar 2016 09:21:21 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 98055 invoked by uid 99); 4 Mar 2016 09:21:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2016 09:21:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3213EE0044; Fri, 4 Mar 2016 09:21:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Fri, 04 Mar 2016 09:21:22 -0000 Message-Id: <58c063cb47b14be5b1cdace5a736e595@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/6] ignite git commit: Fixed IGNITE-1186 "Filter is sent instead of factory when continuous query is created". Fixed IGNITE-1186 "Filter is sent instead of factory when continuous query is created". 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/baa13122 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/baa13122 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/baa13122 Branch: refs/heads/ignite-atomic-good-lock-bench Commit: baa131220bf503da0908e4ecfee92966317e209c Parents: c13339f Author: nikolay_tikhonov Authored: Thu Mar 3 16:21:53 2016 +0300 Committer: nikolay_tikhonov Committed: Thu Mar 3 16:21:53 2016 +0300 ---------------------------------------------------------------------- .../ignite/cache/query/ContinuousQuery.java | 35 + .../processors/cache/IgniteCacheProxy.java | 4 + .../continuous/CacheContinuousQueryHandler.java | 86 ++- .../CacheContinuousQueryHandlerV2.java | 176 +++++ .../continuous/CacheContinuousQueryManager.java | 238 +++++-- .../continuous/GridContinuousProcessor.java | 7 +- .../IgniteCacheEntryListenerAbstractTest.java | 75 +- .../cache/IgniteCacheEntryListenerTxTest.java | 5 - .../GridCacheReplicatedPreloadSelfTest.java | 39 +- .../CacheContinuousQueryFactoryFilterTest.java | 714 +++++++++++++++++++ ...ContinuousQueryFailoverAbstractSelfTest.java | 2 +- .../CacheContinuousQueryOperationP2PTest.java | 326 +++++++++ ...acheContinuousQueryRandomOperationsTest.java | 63 +- .../p2p/CacheDeploymentEntryEventFilter.java | 33 + .../CacheDeploymentEntryEventFilterFactory.java | 31 + .../IgniteCacheQuerySelfTestSuite.java | 4 + 16 files changed, 1706 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index df1bad3..3ea8f93 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -18,6 +18,8 @@ package org.apache.ignite.cache.query; import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; @@ -119,6 +121,9 @@ public final class ContinuousQuery extends Query> { /** Remote filter. */ private CacheEntryEventSerializableFilter rmtFilter; + /** Remote filter factory. */ + private Factory> rmtFilterFactory; + /** Time interval. */ private long timeInterval = DFLT_TIME_INTERVAL; @@ -196,7 +201,10 @@ public final class ContinuousQuery extends Query> { * * @param rmtFilter Key-value filter. * @return {@code this} for chaining. + * + * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead. */ + @Deprecated public ContinuousQuery setRemoteFilter(CacheEntryEventSerializableFilter rmtFilter) { this.rmtFilter = rmtFilter; @@ -213,6 +221,33 @@ public final class ContinuousQuery extends Query> { } /** + * Sets optional key-value filter factory. This factory produces filter is called before entry is + * sent to the master node. + *

+ * WARNING: all operations that involve any kind of JVM-local or distributed locking + * (e.g., synchronization or transactional cache operations), should be executed asynchronously + * without blocking the thread that called the filter. Otherwise, you can get deadlocks. + * + * @param rmtFilterFactory Key-value filter factory. + * @return {@code this} for chaining. + */ + public ContinuousQuery setRemoteFilterFactory( + Factory> rmtFilterFactory) { + this.rmtFilterFactory = rmtFilterFactory; + + return this; + } + + /** + * Gets remote filter factory. + * + * @return Remote filter factory. + */ + public Factory> getRemoteFilterFactory() { + return rmtFilterFactory; + } + + /** + * Sets time interval. *

* When a cache update happens, entry is first put into a buffer. Entries from buffer will http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 5ed8753..6e8bcbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -565,10 +565,14 @@ public class IgniteCacheProxy extends AsyncSupportAdapter implements GridContinuousHandler * @param topic Topic for ordered messages. * @param locLsnr Local listener. * @param rmtFilter Remote filter. - * @param internal Internal flag. - * @param notifyExisting Notify existing flag. * @param oldValRequired Old value required flag. * @param sync Synchronous flag. * @param ignoreExpired Ignore expired events flag. - * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. - * @param taskHash Task name hash code. - * @param locCache {@code True} if local cache. - * @param keepBinary Keep binary flag. 
*/ public CacheContinuousQueryHandler( String cacheName, Object topic, CacheEntryUpdatedListener locLsnr, CacheEntryEventSerializableFilter rmtFilter, - boolean internal, - boolean notifyExisting, boolean oldValRequired, boolean sync, boolean ignoreExpired, - int taskHash, - boolean skipPrimaryCheck, - boolean locCache, - boolean keepBinary, boolean ignoreClsNotFound) { assert topic != null; assert locLsnr != null; @@ -200,20 +188,49 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler this.topic = topic; this.locLsnr = locLsnr; this.rmtFilter = rmtFilter; - this.internal = internal; - this.notifyExisting = notifyExisting; this.oldValRequired = oldValRequired; this.sync = sync; this.ignoreExpired = ignoreExpired; - this.taskHash = taskHash; - this.skipPrimaryCheck = skipPrimaryCheck; - this.locCache = locCache; - this.keepBinary = keepBinary; this.ignoreClsNotFound = ignoreClsNotFound; cacheId = CU.cacheId(cacheName); } + /** + * @param internal Internal query. + */ + public void internal(boolean internal) { + this.internal = internal; + } + + /** + * @param notifyExisting Notify existing. + */ + public void notifyExisting(boolean notifyExisting) { + this.notifyExisting = notifyExisting; + } + + /** + * @param locCache Local cache. + */ + public void localCache(boolean locCache) { + this.locCache = locCache; + } + + /** + * @param taskHash Task hash. + */ + public void taskNameHash(int taskHash) { + this.taskHash = taskHash; + } + + /** + * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. 
+ */ + public void skipPrimaryCheck(boolean skipPrimaryCheck) { + this.skipPrimaryCheck = skipPrimaryCheck; + } + /** {@inheritDoc} */ @Override public boolean isEvents() { return false; @@ -262,8 +279,10 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler if (locLsnr != null) ctx.resource().injectGeneric(locLsnr); - if (rmtFilter != null) - ctx.resource().injectGeneric(rmtFilter); + final CacheEntryEventFilter filter = getEventFilter(); + + if (filter != null) + ctx.resource().injectGeneric(filter); entryBufs = new ConcurrentHashMap<>(); @@ -303,7 +322,8 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler null, null, null, - rmtFilter, + filter instanceof CacheEntryEventSerializableFilter ? + (CacheEntryEventSerializableFilter)filter : null, null, nodeId, taskName() @@ -332,9 +352,9 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler boolean notify = !evt.entry().isFiltered(); - if (notify && rmtFilter != null) { + if (notify && filter != null) { try { - notify = rmtFilter.evaluate(evt); + notify = filter.evaluate(evt); } catch (Exception e) { U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e); @@ -422,7 +442,8 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler null, null, null, - rmtFilter, + filter instanceof CacheEntryEventSerializableFilter ? 
+ (CacheEntryEventSerializableFilter)filter : null, null, nodeId, taskName(), @@ -435,8 +456,8 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler } @Override public void onUnregister() { - if (rmtFilter instanceof PlatformContinuousQueryFilter) - ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister(); + if (filter instanceof PlatformContinuousQueryFilter) + ((PlatformContinuousQueryFilter)filter).onQueryUnregister(); } @Override public void cleanupBackupQueue(Map updateCntrs) { @@ -517,6 +538,13 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler } /** + * @return Cache entry event filter. + */ + public CacheEntryEventFilter getEventFilter() { + return rmtFilter; + } + + /** * @param cctx Context. * @param nodeId ID of the node that started routine. * @param entry Entry. @@ -1189,7 +1217,7 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler /** * Deployable object. */ - private static class DeployableObject implements Externalizable { + protected static class DeployableObject implements Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -1214,7 +1242,7 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler * @param ctx Kernal context. * @throws IgniteCheckedException In case of error. 
*/ - private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { + protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { assert obj != null; assert ctx != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java new file mode 100644 index 0000000..7aef4dd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter; +import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Continuous query handler V2 version. Contains {@link Factory} for remote listener. + */ +public class CacheContinuousQueryHandlerV2 extends CacheContinuousQueryHandler { + /** */ + private static final long serialVersionUID = 0L; + + /** Remote filter factory. */ + private Factory rmtFilterFactory; + + /** Deployable object for filter factory. */ + private DeployableObject rmtFilterFactoryDep; + + /** Event types for JCache API. */ + private byte types; + + /** */ + protected transient CacheEntryEventFilter filter; + + /** + * Required by {@link Externalizable}. + */ + public CacheContinuousQueryHandlerV2() { + // No-op. + } + + /** + * Constructor. + * + * @param cacheName Cache name. + * @param topic Topic for ordered messages. + * @param locLsnr Local listener. + * @param rmtFilterFactory Remote filter factory. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired events flag. + * @param types Event types. 
+ */ + public CacheContinuousQueryHandlerV2( + String cacheName, + Object topic, + CacheEntryUpdatedListener locLsnr, + Factory> rmtFilterFactory, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + boolean ignoreClsNotFound, + @Nullable Byte types) { + super(cacheName, + topic, + locLsnr, + null, + oldValRequired, + sync, + ignoreExpired, + ignoreClsNotFound); + + assert rmtFilterFactory != null; + + this.rmtFilterFactory = rmtFilterFactory; + + if (types != null) { + assert types != 0; + + this.types = types; + } + } + + /** {@inheritDoc} */ + @Override public CacheEntryEventFilter getEventFilter() { + if (filter == null) { + assert rmtFilterFactory != null; + + Factory factory = rmtFilterFactory; + + filter = factory.create(); + + if (types != 0) + filter = new JCacheQueryRemoteFilter(filter, types); + } + + return filter; + } + + /** {@inheritDoc} */ + @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { + super.p2pMarshal(ctx); + + if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass())) + rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx); + } + + /** {@inheritDoc} */ + @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + super.p2pUnmarshal(nodeId, ctx); + + if (rmtFilterFactoryDep != null) + rmtFilterFactory = rmtFilterFactoryDep.unmarshal(nodeId, ctx); + } + + /** {@inheritDoc} */ + @Override public GridContinuousHandler clone() { + return super.clone(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryHandlerV2.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + boolean b = rmtFilterFactoryDep != null; + + out.writeBoolean(b); + + if (b) + out.writeObject(rmtFilterFactoryDep); + else + out.writeObject(rmtFilterFactory); + + out.writeByte(types); + } + + /** 
{@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + boolean b = in.readBoolean(); + + if (b) + rmtFilterFactoryDep = (DeployableObject)in.readObject(); + else + rmtFilterFactory = (Factory)in.readObject(); + + types = in.readByte(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 409c1da..353043f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -23,15 +23,16 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; import java.util.ArrayList; -import java.util.Map; import java.util.Collection; import java.util.Iterator; +import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.cache.configuration.CacheEntryListenerConfiguration; +import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryCreatedListener; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; @@ -54,10 +55,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import 
org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; +import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.resources.LoggerResource; @@ -70,6 +72,7 @@ import static javax.cache.event.EventType.REMOVED; import static javax.cache.event.EventType.UPDATED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; +import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.QUERY_MSG_VER_2_SINCE; /** * Continuous queries manager. @@ -413,28 +416,80 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. 
*/ - public UUID executeQuery(CacheEntryUpdatedListener locLsnr, - CacheEntryEventSerializableFilter rmtFilter, + public UUID executeQuery(final CacheEntryUpdatedListener locLsnr, + @Nullable final CacheEntryEventSerializableFilter rmtFilter, + @Nullable final Factory rmtFilterFactory, int bufSize, long timeInterval, boolean autoUnsubscribe, boolean loc, - boolean keepBinary) throws IgniteCheckedException + final boolean keepBinary) throws IgniteCheckedException { + IgniteClosure clsr; + + if (rmtFilterFactory != null) + clsr = new IgniteClosure() { + @Override public CacheContinuousQueryHandler apply(Boolean v2) { + CacheContinuousQueryHandler hnd; + + if (v2) + hnd = new CacheContinuousQueryHandlerV2( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilterFactory, + true, + false, + true, + false, + null); + else { + CacheEntryEventFilter fltr = rmtFilterFactory.create(); + + if (!(fltr instanceof CacheEntryEventSerializableFilter)) + throw new IgniteException("Topology has nodes of the old versions. In this case " + + "EntryEventFilter should implement " + + "org.apache.ignite.cache.CacheEntryEventSerializableFilter interface. 
Filter: " + fltr); + + hnd = new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + (CacheEntryEventSerializableFilter)fltr, + true, + false, + true, + false); + } + + return hnd; + } + }; + else + clsr = new IgniteClosure() { + @Override public CacheContinuousQueryHandler apply(Boolean ignore) { + return new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilter, + true, + false, + true, + false); + } + }; + return executeQuery0( locLsnr, - rmtFilter, + clsr, bufSize, timeInterval, autoUnsubscribe, false, false, - true, - false, - true, loc, - keepBinary, - false); + keepBinary); } /** @@ -445,27 +500,35 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ - public UUID executeInternalQuery(CacheEntryUpdatedListener locLsnr, - CacheEntryEventSerializableFilter rmtFilter, - boolean loc, - boolean notifyExisting, - boolean ignoreClassNotFound) + public UUID executeInternalQuery(final CacheEntryUpdatedListener locLsnr, + final CacheEntryEventSerializableFilter rmtFilter, + final boolean loc, + final boolean notifyExisting, + final boolean ignoreClassNotFound) throws IgniteCheckedException { return executeQuery0( locLsnr, - rmtFilter, + new IgniteClosure() { + @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) { + return new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilter, + true, + false, + true, + ignoreClassNotFound); + } + }, ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, true, notifyExisting, - true, - false, - true, loc, - false, - ignoreClassNotFound); + false); } /** @@ -539,32 +602,24 @@ public class 
CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * @param locLsnr Local listener. - * @param rmtFilter Remote filter. * @param bufSize Buffer size. * @param timeInterval Time interval. * @param autoUnsubscribe Auto unsubscribe flag. * @param internal Internal flag. * @param notifyExisting Notify existing flag. - * @param oldValRequired Old value required flag. - * @param sync Synchronous flag. - * @param ignoreExpired Ignore expired event flag. * @param loc Local flag. * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, - final CacheEntryEventSerializableFilter rmtFilter, + IgniteClosure clsr, int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean notifyExisting, - boolean oldValRequired, - boolean sync, - boolean ignoreExpired, boolean loc, - final boolean keepBinary, - boolean ignoreClassNotFound) throws IgniteCheckedException + final boolean keepBinary) throws IgniteCheckedException { cctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -573,21 +628,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); - GridContinuousHandler hnd = new CacheContinuousQueryHandler( - cctx.name(), - TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), - locLsnr, - rmtFilter, - internal, - notifyExisting, - oldValRequired, - sync, - ignoreExpired, - taskNameHash, - skipPrimaryCheck, - cctx.isLocal(), - keepBinary, - ignoreClassNotFound); + boolean v2 = useV2Protocol(cctx.discovery().allNodes()); + + final CacheContinuousQueryHandler hnd = clsr.apply(v2); + + hnd.taskNameHash(taskNameHash); + hnd.skipPrimaryCheck(skipPrimaryCheck); + hnd.notifyExisting(notifyExisting); + hnd.internal(internal); + hnd.keepBinary(keepBinary); + hnd.localCache(cctx.isLocal()); IgnitePredicate pred = (loc 
|| cctx.config().getCacheMode() == CacheMode.LOCAL) ? F.nodeForNodeId(cctx.localNodeId()) : F.alwaysTrue(); @@ -654,7 +704,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { cctx.kernalContext().cache().jcache(cctx.name()), cctx, entry); - if (rmtFilter != null && !rmtFilter.evaluate(next)) + if (hnd.getEventFilter() != null && !hnd.getEventFilter().evaluate(next)) next = null; } } @@ -667,6 +717,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * @param nodes Nodes. + * @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE}, + * otherwise {@code false}. + */ + private boolean useV2Protocol(Collection nodes) { + for (ClusterNode node : nodes) { + if (QUERY_MSG_VER_2_SINCE.compareTo(node.version()) > 0) + return false; + } + + return true; + } + + /** * @param lsnrId Listener ID. * @param lsnr Listener. * @param internal Internal flag. @@ -767,36 +831,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (types == 0) throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces."); - CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener( + final byte types0 = types; + + final CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener( locLsnrImpl, log); - CacheEntryEventFilter fltr = null; - - if (cfg.getCacheEntryEventFilterFactory() != null) { - fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create(); - - if (!(fltr instanceof Serializable)) - throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: " - + fltr); - } - - CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types); - routineId = executeQuery0( locLsnr, - rmtFilter, + new IgniteClosure() { + @Override public CacheContinuousQueryHandler apply(Boolean v2) { + CacheContinuousQueryHandler hnd; + Factory rmtFilterFactory = 
cfg.getCacheEntryEventFilterFactory(); + + v2 = rmtFilterFactory != null && v2; + + if (v2) + hnd = new CacheContinuousQueryHandlerV2( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilterFactory, + cfg.isOldValueRequired(), + cfg.isSynchronous(), + false, + false, + types0); + else { + JCacheQueryRemoteFilter jCacheFilter; + + CacheEntryEventFilter filter = null; + + if (rmtFilterFactory != null) { + filter = rmtFilterFactory.create(); + + if (!(filter instanceof Serializable)) + throw new IgniteException("Topology has nodes of the old versions. " + + "In this case EntryEventFilter must implement java.io.Serializable " + + "interface. Filter: " + filter); + } + + jCacheFilter = new JCacheQueryRemoteFilter(filter, types0); + + hnd = new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + jCacheFilter, + cfg.isOldValueRequired(), + cfg.isSynchronous(), + false, + false); + } + + return hnd; + } + }, ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, false, false, - cfg.isOldValueRequired(), - cfg.isSynchronous(), false, - false, - keepBinary, - false); + keepBinary + ); } /** @@ -814,6 +912,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * */ private static class JCacheQueryLocalListener implements CacheEntryUpdatedListener { /** */ @@ -896,8 +995,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * For handler version 2.0 this filter should not be serialized. 
*/ - private static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable { + protected static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -922,7 +1022,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param impl Filter. * @param types Types. */ - JCacheQueryRemoteFilter(CacheEntryEventFilter impl, byte types) { + JCacheQueryRemoteFilter(@Nullable CacheEntryEventFilter impl, byte types) { assert types != 0; this.impl = impl; http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 1ec69c2..1776748 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; @@ -110,6 +111,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Threads started by this processor. 
*/ private final Map bufCheckThreads = new ConcurrentHashMap8<>(); + /** */ + public static final IgniteProductVersion QUERY_MSG_VER_2_SINCE = IgniteProductVersion.fromString("1.5.9"); + /** */ private final ConcurrentMap syncMsgFuts = new ConcurrentHashMap8<>(); @@ -615,7 +619,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { startFuts.put(routineId, fut); try { - if (locIncluded && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true)) + if (locIncluded + && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true)) hnd.onListenerRegistered(routineId, ctx); ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData, http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index e6bfd87..35fbbd5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -42,6 +42,7 @@ import javax.cache.configuration.FactoryBuilder; import javax.cache.configuration.MutableCacheEntryListenerConfiguration; import javax.cache.event.CacheEntryCreatedListener; import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryExpiredListener; import javax.cache.event.CacheEntryListener; import javax.cache.event.CacheEntryListenerException; @@ -99,6 +100,9 @@ public abstract class 
IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** */ private boolean useObjects; + /** */ + private static AtomicBoolean serialized = new AtomicBoolean(false); + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { @@ -138,6 +142,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb assertEquals(0, syncMsgFuts.size()); } + + serialized.set(false); } /** @@ -178,11 +184,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb return new CreateUpdateRemoveExpireListener(); } }, - new Factory>() { - @Override public CacheEntryEventSerializableFilter create() { - return new ExceptionFilter(); - } - }, + new ExceptionFilterFactory(), false, false ); @@ -443,18 +445,23 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>( FactoryBuilder.factoryOf(lsnr), - null, + new SerializableFactory(), true, false )); try { startGrid(gridCount()); + + jcache(0).put(1, 1); } finally { stopGrid(gridCount()); } + jcache(0).put(2, 2); + + assertFalse(IgniteCacheEntryListenerAbstractTest.serialized.get()); assertFalse(serialized.get()); } @@ -1130,9 +1137,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class TestFilterFactory implements Factory> { + private static class TestFilterFactory implements Factory> { /** {@inheritDoc} */ - @Override public CacheEntryEventSerializableFilter create() { + @Override public CacheEntryEventFilter create() { return new TestFilter(); } } @@ -1184,7 +1191,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class TestFilter implements CacheEntryEventSerializableFilter { + private static class TestFilter implements CacheEntryEventFilter, Externalizable { /** 
{@inheritDoc} */ @Override public boolean evaluate(CacheEntryEvent evt) { assert evt != null; @@ -1201,6 +1208,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb return key % 2 == 0; } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new UnsupportedOperationException("Filter must not be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + throw new UnsupportedOperationException("Filter must not be unmarshaled."); + } } /** @@ -1355,6 +1372,36 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb } /** + * + */ + public static class SerializableFactory implements Factory { + /** {@inheritDoc} */ + @Override public NonSerializableFilter create() { + return new NonSerializableFilter(); + } + } + + /** + * + */ + public static class NonSerializableFilter implements CacheEntryEventFilter, Externalizable { + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + serialized.set(true); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + serialized.set(true); + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException { + return true; + } + } + + /** */ public static class NonSerializableListener implements CacheEntryCreatedListener, Externalizable { /** */ @@ -1467,4 +1514,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb return S.toString(ListenerTestValue.class, this); } } + + /** + * + */ + static class ExceptionFilterFactory implements Factory> { + /** {@inheritDoc} */ + @Override public CacheEntryEventSerializableFilter create() { + return new ExceptionFilter(); + } + } } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java index 41725e7..cad57f0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java @@ -47,9 +47,4 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst @Override protected NearCacheConfiguration nearConfiguration() { return null; } - - /** {@inheritDoc} */ - @Override public void testEvents(){ - fail("https://issues.apache.org/jira/browse/IGNITE-1600"); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java index ea2f27b..c6cd5af 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java @@ -30,6 +30,7 @@ import 
java.util.concurrent.CountDownLatch; import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Factory; import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -403,18 +404,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { } } }, - new Factory>() { - /** {@inheritDoc} */ - @Override public CacheEntryEventSerializableFilter create() { - try { - - return cls2.newInstance(); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - }, + new ClassFilterFactory(cls2), true, true ); @@ -946,4 +936,29 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { return true; } } + + /** + * + */ + private static class ClassFilterFactory implements Factory> { + /** */ + private Class cls; + + /** + * @param cls Class. 
+ */ + public ClassFilterFactory(Class cls) { + this.cls = cls; + } + + /** {@inheritDoc} */ + @Override public CacheEntryEventSerializableFilter create() { + try { + return cls.newInstance(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java new file mode 100644 index 0000000..6143fa9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import javax.cache.configuration.CacheEntryListenerConfiguration; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryExpiredListener; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryRemovedListener; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.CacheQueryEntryEvent; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static 
org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest { + /** */ + private static final int NODES = 5; + + /** */ + private static final int KEYS = 50; + + /** */ + private static final int VALS = 10; + + /** */ + public static final int ITERATION_CNT = 40; + + /** + * @throws Exception If failed. 
+ */ + public void testInternalQuery() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(REPLICATED, + 1, + ATOMIC, + ONHEAP_TIERED, + false); + + final IgniteCache cache = grid(0).getOrCreateCache(ccfg); + + UUID uuid = null; + + try { + for (int i = 0; i < 10; i++) + cache.put(i, i); + + final CountDownLatch latch = new CountDownLatch(5); + + CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { + for (Object evt : iterable) { + latch.countDown(); + + log.info("Received event: " + evt); + } + } + }; + + uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries() + .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true); + + for (int i = 10; i < 20; i++) + cache.put(i, i); + + assertTrue(latch.await(3, SECONDS)); + } + finally { + if (uuid != null) + grid(0).context().cache().cache(cache.getName()).context().continuousQueries() + .cancelInternalQuery(uuid); + + cache.destroy(); + } + } + + /** {@inheritDoc} */ + @Override protected void testContinuousQuery(CacheConfiguration ccfg, ContinuousDeploy deploy) + throws Exception { + ignite(0).createCache(ccfg); + + try { + long seed = System.currentTimeMillis(); + + Random rnd = new Random(seed); + + log.info("Random seed: " + seed); + + List>> evtsQueues = new ArrayList<>(); + + Collection> curs = new ArrayList<>(); + + Collection> lsnrCfgs = new ArrayList<>(); + + if (deploy == CLIENT) + evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean())); + else if (deploy == SERVER) + evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs, + rnd.nextBoolean())); + else { + boolean isSync = rnd.nextBoolean(); + + for (int i = 0; i < NODES - 1; i++) + evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync)); + } + + ConcurrentMap expData = new ConcurrentHashMap<>(); + + Map partCntr 
= new ConcurrentHashMap<>(); + + try { + for (int i = 0; i < ITERATION_CNT; i++) { + if (i % 10 == 0) + log.info("Iteration: " + i); + + for (int idx = 0; idx < NODES; idx++) + randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName())); + } + } + finally { + for (QueryCursor cur : curs) + cur.close(); + + for (T2 e : lsnrCfgs) + grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2()); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param cacheName Cache name. + * @param nodeIdx Node index. + * @param curs Cursors. + * @param lsnrCfgs Listener configurations. + * @return Event queue + */ + private BlockingQueue> registerListener(String cacheName, + int nodeIdx, + Collection> curs, + Collection> lsnrCfgs, + boolean sync) { + final BlockingQueue> evtsQueue = new ArrayBlockingQueue<>(50_000); + + if (ThreadLocalRandom.current().nextBoolean()) { + MutableCacheEntryListenerConfiguration lsnrCfg = + new MutableCacheEntryListenerConfiguration<>( + FactoryBuilder.factoryOf(new LocalNonSerialiseListener() { + @Override protected void onEvents(Iterable> evts) { + for (CacheEntryEvent evt : evts) + evtsQueue.add(evt); + } + }), + new FilterFactory(), + true, + sync + ); + + grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg); + + lsnrCfgs.add(new T2(nodeIdx, lsnrCfg)); + } + else { + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable> evts) throws CacheEntryListenerException { + for (CacheEntryEvent evt : evts) + evtsQueue.add(evt); + } + }); + + qry.setRemoteFilterFactory(new FilterFactory()); + + QueryCursor cur = grid(nodeIdx).cache(cacheName).query(qry); + + curs.add(cur); + } + + return evtsQueue; + } + + /** + * @param rnd Random generator. + * @param evtsQueues Events queue. + * @param expData Expected cache data. 
+ * @param partCntr Partition counter. + * @param cache Cache. + * @throws Exception If failed. + */ + private void randomUpdate( + Random rnd, + List>> evtsQueues, + ConcurrentMap expData, + Map partCntr, + IgniteCache cache) + throws Exception { + Object key = new QueryTestKey(rnd.nextInt(KEYS)); + Object newVal = value(rnd); + Object oldVal = expData.get(key); + + int op = rnd.nextInt(11); + + Ignite ignite = cache.unwrap(Ignite.class); + + Transaction tx = null; + + if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean()) + tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); + + try { + // log.info("Random operation [key=" + key + ", op=" + op + ']'); + + switch (op) { + case 0: { + cache.put(key, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 1: { + cache.getAndPut(key, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 2: { + cache.remove(key); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); + + expData.remove(key); + + break; + } + + case 3: { + cache.getAndRemove(key); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); + + expData.remove(key); + + break; + } + + case 4: { + cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, 
affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 5: { + cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); + + expData.remove(key); + + break; + } + + case 6: { + cache.putIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 7: { + cache.getAndPutIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 8: { + cache.replace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 9: { + cache.getAndReplace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 10: { + if (oldVal != null) { + Object replaceVal = value(rnd); + + boolean success = replaceVal.equals(oldVal); + + if (success) { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, 
affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + } + else { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(evtsQueues); + } + } + else { + cache.replace(key, value(rnd), newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(evtsQueues); + } + + break; + } + + default: + fail("Op:" + op); + } + } finally { + if (tx != null) + tx.close(); + } + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionIsolation}. + */ + private TransactionIsolation txRandomIsolation(Random rnd) { + int val = rnd.nextInt(3); + + if (val == 0) + return READ_COMMITTED; + else if (val == 1) + return REPEATABLE_READ; + else + return SERIALIZABLE; + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionConcurrency}. + */ + private TransactionConcurrency txRandomConcurrency(Random rnd) { + return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC; + } + + /** + * @param cache Cache. + * @param key Key + * @param cntrs Partition counters. + */ + private void updatePartitionCounter(IgniteCache cache, Object key, Map cntrs) { + Affinity aff = cache.unwrap(Ignite.class).affinity(cache.getName()); + + int part = aff.partition(key); + + Long partCntr = cntrs.get(part); + + if (partCntr == null) + partCntr = 0L; + + cntrs.put(part, ++partCntr); + } + + /** + * @param rnd Random generator. + * @return Cache value. + */ + private static Object value(Random rnd) { + return new QueryTestValue(rnd.nextInt(VALS)); + } + + /** + * @param evtsQueues Event queue. + * @param partCntrs Partition counters. + * @param aff Affinity function. + * @param key Key. + * @param val Value. + * @param oldVal Old value. + * @throws Exception If failed. 
+ */ + private void waitAndCheckEvent(List>> evtsQueues, + Map partCntrs, + Affinity aff, + Object key, + Object val, + Object oldVal) + throws Exception { + if ((val == null && oldVal == null + || (val != null && !isAccepted((QueryTestValue)val)))) { + checkNoEvent(evtsQueues); + + return; + } + + for (BlockingQueue> evtsQueue : evtsQueues) { + CacheEntryEvent evt = evtsQueue.poll(5, SECONDS); + + assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt); + assertEquals(key, evt.getKey()); + assertEquals(val, evt.getValue()); + assertEquals(oldVal, evt.getOldValue()); + + long cntr = partCntrs.get(aff.partition(key)); + CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class); + + assertNotNull(cntr); + assertNotNull(qryEntryEvt); + + assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter()); + } + } + + /** + * @param evtsQueues Event queue. + * @throws Exception If failed. + */ + private void checkNoEvent(List>> evtsQueues) throws Exception { + for (BlockingQueue> evtsQueue : evtsQueues) { + CacheEntryEvent evt = evtsQueue.poll(50, MILLISECONDS); + + assertNull(evt); + } + } + + /** + * + */ + protected static class NonSerializableFilter + implements CacheEntryEventSerializableFilter, Externalizable { + /** */ + public NonSerializableFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent event) + throws CacheEntryListenerException { + return isAccepted(event.getValue()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + fail("Entry filter should not be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fail("Entry filter should not be marshaled."); + } + + /** + * @return {@code True} if value is even. 
+ */ + public static boolean isAccepted(QueryTestValue val) { + return val == null || val.val1 % 2 == 0; + } + } + + /** + * + */ + protected static class SerializableFilter implements CacheEntryEventSerializableFilter{ + /** */ + public SerializableFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent event) + throws CacheEntryListenerException { + return isAccepted(event.getValue()); + } + + /** + * @return {@code True} if value is even. + */ + public static boolean isAccepted(Integer val) { + return val == null || val % 2 == 0; + } + } + + /** + * + */ + protected static class FilterFactory implements Factory { + @Override public NonSerializableFilter create() { + return new NonSerializableFilter(); + } + } + + /** + * + */ + public abstract class LocalNonSerialiseListener implements + CacheEntryUpdatedListener, + CacheEntryCreatedListener, + CacheEntryExpiredListener, + CacheEntryRemovedListener, + Externalizable { + /** */ + public LocalNonSerialiseListener() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onCreated(Iterable> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onExpired(Iterable> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onRemoved(Iterable> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** + * @param evts Events. + */ + protected abstract void onEvents(Iterable> evts); + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new UnsupportedOperationException("Failed. 
Listener should not be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled."); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index a42f056..f104f21 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -1432,7 +1432,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd"); if (hnd.isQuery() && hnd.cacheName() == null) { - backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue"); + backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "backupQueue"); break; } http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java new file mode 100644 index 0000000..97f9e0e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Runs a continuous query and a JCache entry listener whose event filter factory class is loaded through the + * external class loader, on nodes configured with peer class loading enabled. Covers atomic and transactional, + * partitioned and replicated caches, with the cache accessed through the last started node or an earlier one. + */ +public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every node configuration. */ + private static TcpDiscoveryIpFinder ipFinder = new
TcpDiscoveryVmIpFinder(true); + + /** Number of nodes started in {@link #beforeTest()}: {@code NODES - 1} first, then one more after the client flag is set. */ + private static final int NODES = 5; + + /** Value passed to {@code setClientMode} for each configuration built by {@link #getConfiguration(String)}. */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + cfg.setPeerClassLoadingEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGridsMultiThreaded(NODES - 1); + + /* Configurations built from here on have client mode set, so the node with index NODES - 1 is the client. */ + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Partitioned atomic cache with one backup, accessed through node {@code NODES - 1} (started in client mode). + * + * @throws Exception If failed. + */ + public void testAtomicClient() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + ONHEAP_TIERED + ); + + testContinuousQuery(ccfg, true); + } + + /** + * Partitioned atomic cache with one backup, accessed through one of the first {@code NODES - 1} nodes. + * + * @throws Exception If failed. + */ + public void testAtomic() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + ONHEAP_TIERED + ); + + testContinuousQuery(ccfg, false); + } + + /** + * Replicated atomic cache, accessed through one of the first {@code NODES - 1} nodes. + * + * @throws Exception If failed. + */ + public void testAtomicReplicated() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(REPLICATED, + 0, + ATOMIC, + ONHEAP_TIERED + ); + + testContinuousQuery(ccfg, false); + } + + /** + * Replicated atomic cache, accessed through node {@code NODES - 1} (started in client mode). + * + * @throws Exception If failed. + */ + public void testAtomicReplicatedClient() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(REPLICATED, + 0, + ATOMIC, + ONHEAP_TIERED + ); + + testContinuousQuery(ccfg, true); + } + + /** + * Partitioned transactional cache with one backup, accessed through one of the first {@code NODES - 1} nodes. + * + * @throws Exception If failed.
+ */ + public void testTx() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED + ); + + testContinuousQuery(ccfg, false); + } + /** + * Partitioned transactional cache with one backup, accessed through node {@code NODES - 1} (started in client mode). + * + * @throws Exception If failed. + */ + public void testTxClient() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED + ); + + testContinuousQuery(ccfg, true); + } + + /** + * Replicated transactional cache, accessed through one of the first {@code NODES - 1} nodes. + * + * @throws Exception If failed. + */ + public void testTxReplicated() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(REPLICATED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED + ); + + testContinuousQuery(ccfg, false); + } + + /** + * Replicated transactional cache, accessed through node {@code NODES - 1} (started in client mode). + * + * @throws Exception If failed. + */ + public void testTxReplicatedClient() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(REPLICATED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED + ); + + testContinuousQuery(ccfg, true); + } + + /** + * Creates the cache, then registers one local listener twice: as the local listener of a continuous query and + * through a JCache listener configuration. Both registrations get an event filter factory instantiated from a + * class loaded by the external class loader. After 10 puts the test waits up to 3 seconds for the listener to + * have received 10 events in total. + * + * @param ccfg Cache configuration. + * @param isClient If {@code true} the cache is accessed through node {@code NODES - 1}, otherwise through a + * randomly chosen node with a lower index. + * @throws Exception If failed. + */ + protected void testContinuousQuery(CacheConfiguration ccfg, boolean isClient) + throws Exception { + ignite(0).createCache(ccfg); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + QueryCursor cur = null; + + final Class> evtFilterFactory = + (Class>)getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory"); + + /* Counted down once for every event the local listener receives, from either registration below. */ + final CountDownLatch latch = new CountDownLatch(10); + + ContinuousQuery qry = new ContinuousQuery<>(); + + TestLocalListener localLsnr = new TestLocalListener() { + @Override public void onEvent(Iterable> evts) + throws CacheEntryListenerException { + for (CacheEntryEvent evt : evts) { + latch.countDown(); + + log.info("Received event: " + evt); + } + } + }; + + MutableCacheEntryListenerConfiguration lsnrCfg = + new MutableCacheEntryListenerConfiguration<>( + new FactoryBuilder.SingletonFactory<>(localLsnr), + (Factory>) + (Object)evtFilterFactory.newInstance(), + true, + true + ); + + /* The same listener instance that backs the JCache configuration above also serves the continuous query. */ + qry.setLocalListener(localLsnr); + + qry.setRemoteFilterFactory( + (Factory>)(Object)evtFilterFactory.newInstance()); + + IgniteCache cache = null; + + try { + if (isClient) + cache = grid(NODES - 1).cache(ccfg.getName()); + else + cache = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()); + + cur = cache.query(qry); + + cache.registerCacheEntryListener(lsnrCfg); + + for (int i = 0; i < 10; i++) + cache.put(i, i); + + assertTrue(latch.await(3, TimeUnit.SECONDS)); + } + finally { + if (cur != null) + cur.close(); + + if (cache != null) + cache.deregisterCacheEntryListener(lsnrCfg); + } + } + + /** + * Builds a cache configuration with {@code FULL_SYNC} write synchronization and {@code PRIMARY} atomic write order. + * + * @param cacheMode Cache mode. + * @param backups Number of backups; applied only when {@code cacheMode} is {@code PARTITIONED}. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @return Cache configuration.
+ */ + private CacheConfiguration cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * Listener that forwards both created and updated notifications to a single {@code onEvent} callback. + */ + private static abstract class TestLocalListener implements CacheEntryUpdatedListener, + CacheEntryCreatedListener { + /** {@inheritDoc} */ + @Override public void onCreated(Iterable> evts) + throws CacheEntryListenerException { + onEvent(evts); + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> evts) + throws CacheEntryListenerException { + onEvent(evts); + } + + /** + * Invoked with the events passed to {@code onCreated} or {@code onUpdated}. + * + * @param evts Events. + */ + protected abstract void onEvent(Iterable> evts); + } +}