ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: Merge branch 'master' into ignite-1186
Date Tue, 01 Mar 2016 13:54:08 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1186 950eea125 -> c26aa48a3


Merge branch 'master' into ignite-1186


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c26aa48a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c26aa48a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c26aa48a

Branch: refs/heads/ignite-1186
Commit: c26aa48a3da70a59cf684381fabb6c19965bbd96
Parents: 950eea1
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Tue Mar 1 16:54:15 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Tue Mar 1 16:54:15 2016 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   6 -
 .../internal/GridMessageListenHandler.java      |   6 -
 .../continuous/CacheContinuousQueryHandler.java |  57 ++-
 .../CacheContinuousQueryHandlerV2.java          |  52 +--
 .../continuous/CacheContinuousQueryManager.java | 458 +++++++------------
 .../continuous/GridContinuousHandler.java       |   5 -
 .../CacheContinuousQueryFactoryFilterTest.java  |  78 +++-
 ...acheContinuousQueryRandomOperationsTest.java |   2 +-
 8 files changed, 272 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 924a8ea..e2b1184 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -26,7 +26,6 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
-import javax.cache.event.CacheEntryEventFilter;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheEvent;
@@ -142,11 +141,6 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public CacheEntryEventFilter getEventFilter() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         assert nodeId != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index e157c98..402365c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -24,7 +24,6 @@ import java.io.ObjectOutput;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
-import javax.cache.event.CacheEntryEventFilter;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
@@ -131,11 +130,6 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public CacheEntryEventFilter getEventFilter() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         ctx.io().addUserMessageListener(topic, pred);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 393f7fb..10fbd89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -168,30 +168,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param topic Topic for ordered messages.
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
-     * @param internal Internal flag.
-     * @param notifyExisting Notify existing flag.
      * @param oldValRequired Old value required flag.
      * @param sync Synchronous flag.
      * @param ignoreExpired Ignore expired events flag.
-     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
-     * @param taskHash Task name hash code.
-     * @param locCache {@code True} if local cache.
-     * @param keepBinary Keep binary flag.
      */
     public CacheContinuousQueryHandler(
         String cacheName,
         Object topic,
         CacheEntryUpdatedListener<K, V> locLsnr,
         CacheEntryEventSerializableFilter<K, V> rmtFilter,
-        boolean internal,
-        boolean notifyExisting,
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
-        int taskHash,
-        boolean skipPrimaryCheck,
-        boolean locCache,
-        boolean keepBinary,
         boolean ignoreClsNotFound) {
         assert topic != null;
         assert locLsnr != null;
@@ -200,20 +188,49 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         this.topic = topic;
         this.locLsnr = locLsnr;
         this.rmtFilter = rmtFilter;
-        this.internal = internal;
-        this.notifyExisting = notifyExisting;
         this.oldValRequired = oldValRequired;
         this.sync = sync;
         this.ignoreExpired = ignoreExpired;
-        this.taskHash = taskHash;
-        this.skipPrimaryCheck = skipPrimaryCheck;
-        this.locCache = locCache;
-        this.keepBinary = keepBinary;
         this.ignoreClsNotFound = ignoreClsNotFound;
 
         cacheId = CU.cacheId(cacheName);
     }
 
+    /**
+     * @param internal Internal query.
+     */
+    public void internal(boolean internal) {
+        this.internal = internal;
+    }
+
+    /**
+     * @param notifyExisting Notify existing.
+     */
+    public void notifyExisting(boolean notifyExisting) {
+        this.notifyExisting = notifyExisting;
+    }
+
+    /**
+     * @param locCache Local cache.
+     */
+    public void localCache(boolean locCache) {
+        this.locCache = locCache;
+    }
+
+    /**
+     * @param taskHash Task hash.
+     */
+    public void taskNameHash(int taskHash) {
+        this.taskHash = taskHash;
+    }
+
+    /**
+     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
+     */
+    public void skipPrimaryCheck(boolean skipPrimaryCheck) {
+        this.skipPrimaryCheck = skipPrimaryCheck;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean isEvents() {
         return false;
@@ -520,7 +537,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         return mgr.registerListener(routineId, lsnr, internal);
     }
 
-    /** {@inheritDoc} */
+    /**
+     * @return Cache entry event filter.
+     */
     public CacheEntryEventFilter getEventFilter() {
         return rmtFilter;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index a660dce..7aef4dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -66,15 +66,9 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
      * @param topic Topic for ordered messages.
      * @param locLsnr Local listener.
      * @param rmtFilterFactory Remote filter factory.
-     * @param internal Internal flag.
-     * @param notifyExisting Notify existing flag.
      * @param oldValRequired Old value required flag.
      * @param sync Synchronous flag.
      * @param ignoreExpired Ignore expired events flag.
-     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
-     * @param taskHash Task name hash code.
-     * @param locCache {@code True} if local cache.
-     * @param keepBinary Keep binary flag.
      * @param types Event types.
      */
     public CacheContinuousQueryHandlerV2(
@@ -82,30 +76,18 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
         Object topic,
         CacheEntryUpdatedListener<K, V> locLsnr,
         Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
-        boolean internal,
-        boolean notifyExisting,
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
-        int taskHash,
-        boolean skipPrimaryCheck,
-        boolean locCache,
-        boolean keepBinary,
         boolean ignoreClsNotFound,
         @Nullable Byte types) {
         super(cacheName,
             topic,
             locLsnr,
             null,
-            internal,
-            notifyExisting,
             oldValRequired,
             sync,
             ignoreExpired,
-            taskHash,
-            skipPrimaryCheck,
-            locCache,
-            keepBinary,
             ignoreClsNotFound);
 
         assert rmtFilterFactory != null;
@@ -126,10 +108,10 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
 
             Factory<? extends CacheEntryEventFilter> factory = rmtFilterFactory;
 
-            if (types != 0)
-                factory = new JCacheRemoteQueryFactory(rmtFilterFactory, types);
-
             filter = factory.create();
+
+            if (types != 0)
+                filter = new JCacheQueryRemoteFilter(filter, types);
         }
 
         return filter;
@@ -191,32 +173,4 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
 
         types = in.readByte();
     }
-
-    /**
-     *
-     */
-    private static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Factory. */
-        protected Factory<? extends CacheEntryEventFilter> impl;
-
-        /** */
-        private byte types;
-
-        /**
-         * @param impl Factory.
-         * @param types Types.
-         */
-        public JCacheRemoteQueryFactory(@Nullable Factory<? extends CacheEntryEventFilter> impl, byte types) {
-            this.impl = impl;
-            this.types = types;
-        }
-
-        /** {@inheritDoc} */
-        @Override public JCacheQueryRemoteFilter create() {
-            return new JCacheQueryRemoteFilter(impl != null ? impl.create() : null, types);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 06f14e1..50253fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.LoggerResource;
@@ -415,45 +416,80 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    public UUID executeQuery(CacheEntryUpdatedListener locLsnr,
-        @Nullable CacheEntryEventSerializableFilter rmtFilter,
-        @Nullable Factory<? extends CacheEntryEventSerializableFilter> rmtFilterFactory,
+    public UUID executeQuery(final CacheEntryUpdatedListener locLsnr,
+        @Nullable final CacheEntryEventSerializableFilter rmtFilter,
+        @Nullable final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
         boolean loc,
-        boolean keepBinary) throws IgniteCheckedException
+        final boolean keepBinary) throws IgniteCheckedException
     {
+        IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr;
+
         if (rmtFilterFactory != null)
-            return executeQueryWithFilterFactory(
-                locLsnr,
-                rmtFilterFactory,
-                bufSize,
-                timeInterval,
-                autoUnsubscribe,
-                false,
-                false,
-                true,
-                false,
-                true,
-                loc,
-                keepBinary,
-                false);
+            clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+                    CacheContinuousQueryHandler hnd;
+
+                    if (v2)
+                        hnd = new CacheContinuousQueryHandlerV2(
+                            cctx.name(),
+                            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                            locLsnr,
+                            rmtFilterFactory,
+                            true,
+                            false,
+                            true,
+                            false,
+                            null);
+                    else {
+                        CacheEntryEventFilter fltr = rmtFilterFactory.create();
+
+                        if (!(fltr instanceof CacheEntryEventSerializableFilter))
+                            throw new IgniteException("Topology has nodes of the old versions. In this case " +
+                                "EntryEventFilter should implement " +
+                                "org.apache.ignite.cache.CacheEntryEventSerializableFilter interface. Filter: " + fltr);
+
+                        hnd = new CacheContinuousQueryHandler(
+                            cctx.name(),
+                            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                            locLsnr,
+                            (CacheEntryEventSerializableFilter)fltr,
+                            true,
+                            false,
+                            true,
+                            false);
+                    }
+
+                    return hnd;
+                }
+            };
         else
-            return executeQueryWithFilter(
-                locLsnr,
-                rmtFilter,
-                bufSize,
-                timeInterval,
-                autoUnsubscribe,
-                false,
-                false,
-                true,
-                false,
-                true,
-                loc,
-                keepBinary,
-                false);
+            clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply(Boolean ignore) {
+                    return new CacheContinuousQueryHandler(
+                        cctx.name(),
+                        TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                        locLsnr,
+                        rmtFilter,
+                        true,
+                        false,
+                        true,
+                        false);
+                }
+            };
+
+        return executeQuery0(
+            locLsnr,
+            clsr,
+            bufSize,
+            timeInterval,
+            autoUnsubscribe,
+            false,
+            false,
+            loc,
+            keepBinary);
     }
 
     /**
@@ -464,27 +500,35 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    public UUID executeInternalQuery(CacheEntryUpdatedListener<?, ?> locLsnr,
-        CacheEntryEventSerializableFilter rmtFilter,
-        boolean loc,
-        boolean notifyExisting,
-        boolean ignoreClassNotFound)
+    public UUID executeInternalQuery(final CacheEntryUpdatedListener<?, ?> locLsnr,
+        final CacheEntryEventSerializableFilter rmtFilter,
+        final boolean loc,
+        final boolean notifyExisting,
+        final boolean ignoreClassNotFound)
         throws IgniteCheckedException
     {
-        return executeQueryWithFilter(
+        return executeQuery0(
             locLsnr,
-            rmtFilter,
+            new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) {
+                    return new CacheContinuousQueryHandler(
+                        cctx.name(),
+                        TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                        locLsnr,
+                        rmtFilter,
+                        true,
+                        false,
+                        true,
+                        ignoreClassNotFound);
+                }
+            },
             ContinuousQuery.DFLT_PAGE_SIZE,
             ContinuousQuery.DFLT_TIME_INTERVAL,
             ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
             true,
             notifyExisting,
-            true,
-            false,
-            true,
             loc,
-            false,
-            ignoreClassNotFound);
+            false);
     }
 
     /**
@@ -558,195 +602,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
     /**
      * @param locLsnr Local listener.
-     * @param rmtFilter Remote filter.
-     * @param bufSize Buffer size.
-     * @param timeInterval Time interval.
-     * @param autoUnsubscribe Auto unsubscribe flag.
-     * @param internal Internal flag.
-     * @param notifyExisting Notify existing flag.
-     * @param oldValRequired Old value required flag.
-     * @param sync Synchronous flag.
-     * @param ignoreExpired Ignore expired event flag.
-     * @param loc Local flag.
-     * @return Continuous routine ID.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private UUID executeQueryWithFilter(CacheEntryUpdatedListener locLsnr,
-        final CacheEntryEventSerializableFilter rmtFilter,
-        int bufSize,
-        long timeInterval,
-        boolean autoUnsubscribe,
-        boolean internal,
-        boolean notifyExisting,
-        boolean oldValRequired,
-        boolean sync,
-        boolean ignoreExpired,
-        boolean loc,
-        final boolean keepBinary,
-        boolean ignoreClassNotFound) throws IgniteCheckedException
-    {
-        cctx.checkSecurity(SecurityPermission.CACHE_READ);
-
-        int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
-            cctx.kernalContext().job().currentTaskNameHash() : 0;
-
-        boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
-
-        GridContinuousHandler hnd = new CacheContinuousQueryHandler(
-            cctx.name(),
-            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
-            locLsnr,
-            rmtFilter,
-            internal,
-            notifyExisting,
-            oldValRequired,
-            sync,
-            ignoreExpired,
-            taskNameHash,
-            skipPrimaryCheck,
-            cctx.isLocal(),
-            keepBinary,
-            ignoreClassNotFound);
-
-        return executeQuery0(locLsnr,
-            bufSize,
-            timeInterval,
-            autoUnsubscribe,
-            notifyExisting,
-            loc,
-            keepBinary,
-            hnd);
-    }
-
-    /**
-     * @param locLsnr Local listener.
-     * @param types JCache event types.
      * @param bufSize Buffer size.
      * @param timeInterval Time interval.
      * @param autoUnsubscribe Auto unsubscribe flag.
      * @param internal Internal flag.
      * @param notifyExisting Notify existing flag.
-     * @param oldValRequired Old value required flag.
-     * @param sync Synchronous flag.
-     * @param ignoreExpired Ignore expired event flag.
      * @param loc Local flag.
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    private UUID executeJCacheQueryFactory(CacheEntryUpdatedListener locLsnr,
-        final Factory<CacheEntryEventFilter> rmtFilterFactory,
-        byte types,
-        int bufSize,
-        long timeInterval,
-        boolean autoUnsubscribe,
-        boolean internal,
-        boolean notifyExisting,
-        boolean oldValRequired,
-        boolean sync,
-        boolean ignoreExpired,
-        boolean loc,
-        final boolean keepBinary,
-        boolean ignoreClassNotFound) throws IgniteCheckedException
-    {
-        assert types != 0 : types;
-
-        cctx.checkSecurity(SecurityPermission.CACHE_READ);
-
-        int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
-            cctx.kernalContext().job().currentTaskNameHash() : 0;
-
-        boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
-
-        boolean v2 = rmtFilterFactory != null && useV2Protocol(cctx.discovery().allNodes());
-
-        GridContinuousHandler hnd;
-
-        if (v2)
-            hnd = new CacheContinuousQueryHandlerV2(
-                cctx.name(),
-                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
-                locLsnr,
-                rmtFilterFactory,
-                internal,
-                notifyExisting,
-                oldValRequired,
-                sync,
-                ignoreExpired,
-                taskNameHash,
-                skipPrimaryCheck,
-                cctx.isLocal(),
-                keepBinary,
-                ignoreClassNotFound,
-                types);
-        else {
-            JCacheQueryRemoteFilter jCacheFilter;
-
-            CacheEntryEventFilter filter = null;
-
-            if (rmtFilterFactory != null) {
-                filter = rmtFilterFactory.create();
-
-                if (!(filter instanceof Serializable))
-                    throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " +
-                        "EntryEventFilter must implement java.io.Serializable interface. Filter: " + filter);
-            }
-
-            jCacheFilter = new JCacheQueryRemoteFilter(filter, types);
-
-            hnd = new CacheContinuousQueryHandler(
-                cctx.name(),
-                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
-                locLsnr,
-                jCacheFilter,
-                internal,
-                notifyExisting,
-                oldValRequired,
-                sync,
-                ignoreExpired,
-                taskNameHash,
-                skipPrimaryCheck,
-                cctx.isLocal(),
-                keepBinary,
-                ignoreClassNotFound);
-        }
-
-        return executeQuery0(locLsnr,
-            bufSize,
-            timeInterval,
-            autoUnsubscribe,
-            notifyExisting,
-            loc,
-            keepBinary,
-            hnd);
-    }
-
-    /**
-     * @param locLsnr Local listener.
-     * @param bufSize Buffer size.
-     * @param timeInterval Time interval.
-     * @param autoUnsubscribe Auto unsubscribe flag.
-     * @param internal Internal flag.
-     * @param notifyExisting Notify existing flag.
-     * @param oldValRequired Old value required flag.
-     * @param sync Synchronous flag.
-     * @param ignoreExpired Ignore expired event flag.
-     * @param loc Local flag.
-     * @return Continuous routine ID.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private UUID executeQueryWithFilterFactory(CacheEntryUpdatedListener locLsnr,
-        final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
+    private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
+        IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
         boolean internal,
         boolean notifyExisting,
-        boolean oldValRequired,
-        boolean sync,
-        boolean ignoreExpired,
         boolean loc,
-        final boolean keepBinary,
-        boolean ignoreClassNotFound) throws IgniteCheckedException
+        final boolean keepBinary) throws IgniteCheckedException
     {
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -755,87 +628,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
 
-        boolean v2 = rmtFilterFactory != null && useV2Protocol(cctx.discovery().allNodes());
+        boolean v2 = useV2Protocol(cctx.discovery().allNodes());
 
-        GridContinuousHandler hnd;
+        final CacheContinuousQueryHandler hnd = clsr.apply(v2);
 
-        if (v2)
-            hnd = new CacheContinuousQueryHandlerV2(
-                cctx.name(),
-                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
-                locLsnr,
-                rmtFilterFactory,
-                internal,
-                notifyExisting,
-                oldValRequired,
-                sync,
-                ignoreExpired,
-                taskNameHash,
-                skipPrimaryCheck,
-                cctx.isLocal(),
-                keepBinary,
-                ignoreClassNotFound,
-                null);
-        else {
-            CacheEntryEventFilter fltr = null;
+        hnd.taskNameHash(taskNameHash);
+        hnd.skipPrimaryCheck(skipPrimaryCheck);
+        hnd.notifyExisting(notifyExisting);
+        hnd.internal(internal);
+        hnd.keepBinary(keepBinary);
+        hnd.localCache(loc);
 
-            if (rmtFilterFactory != null) {
-                fltr = rmtFilterFactory.create();
-
-                if (!(fltr instanceof CacheEntryEventSerializableFilter))
-                    throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " +
-                        "EntryEventFilter should implement org.apache.ignite.cache.CacheEntryEventSerializableFilter " +
-                        "interface. Filter: " + fltr);
-            }
-
-            hnd = new CacheContinuousQueryHandler(
-                cctx.name(),
-                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
-                locLsnr,
-                (CacheEntryEventSerializableFilter)fltr,
-                internal,
-                notifyExisting,
-                oldValRequired,
-                sync,
-                ignoreExpired,
-                taskNameHash,
-                skipPrimaryCheck,
-                cctx.isLocal(),
-                keepBinary,
-                ignoreClassNotFound);
-        }
-
-        return executeQuery0(locLsnr,
-            bufSize,
-            timeInterval,
-            autoUnsubscribe,
-            notifyExisting,
-            loc,
-            keepBinary,
-            hnd);
-    }
-
-    /**
-     * @param locLsnr Local listener.
-     * @param bufSize Buffer size.
-     * @param timeInterval Time interval.
-     * @param autoUnsubscribe Auto unsubscribe flag.
-     * @param notifyExisting Notify existing flag.
-     * @param loc Local flag.
-     * @param keepBinary Keep binary.
-     * @param hnd Handler.
-     * @return Continuous routine ID.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
-        int bufSize,
-        long timeInterval,
-        boolean autoUnsubscribe,
-        boolean notifyExisting,
-        boolean loc,
-        final boolean keepBinary,
-        final GridContinuousHandler hnd)
-        throws IgniteCheckedException {
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
             F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
 
@@ -1028,25 +831,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             if (types == 0)
                 throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces.");
 
-            CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener(
+            final byte types0 = types;
+
+            final CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener(
                 locLsnrImpl,
                 log);
 
-            routineId = executeJCacheQueryFactory(
+            routineId = executeQuery0(
                 locLsnr,
-                cfg.getCacheEntryEventFilterFactory(),
-                types,
+                new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                    @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+                        CacheContinuousQueryHandler hnd;
+                        Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
+
+                        v2 = rmtFilterFactory != null && v2;
+
+                        if (v2)
+                            hnd = new CacheContinuousQueryHandlerV2(
+                                cctx.name(),
+                                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                                locLsnr,
+                                rmtFilterFactory,
+                                cfg.isOldValueRequired(),
+                                cfg.isSynchronous(),
+                                false,
+                                false,
+                                types0);
+                        else {
+                            JCacheQueryRemoteFilter jCacheFilter;
+
+                            CacheEntryEventFilter filter = null;
+
+                            if (rmtFilterFactory != null) {
+                                filter = rmtFilterFactory.create();
+
+                                if (!(filter instanceof Serializable))
+                                    throw new IgniteException("Topology has nodes of the old versions. " +
+                                        "In this case EntryEventFilter must implement java.io.Serializable " +
+                                        "interface. Filter: " + filter);
+                            }
+
+                            jCacheFilter = new JCacheQueryRemoteFilter(filter, types0);
+
+                            hnd = new CacheContinuousQueryHandler(
+                                cctx.name(),
+                                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                                locLsnr,
+                                jCacheFilter,
+                                cfg.isOldValueRequired(),
+                                cfg.isSynchronous(),
+                                false,
+                                false);
+                        }
+
+                        return hnd;
+                    }
+                },
                 ContinuousQuery.DFLT_PAGE_SIZE,
                 ContinuousQuery.DFLT_TIME_INTERVAL,
                 ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
                 false,
                 false,
-                cfg.isOldValueRequired(),
-                cfg.isSynchronous(),
-                false,
                 false,
-                keepBinary,
-                false);
+                keepBinary
+            );
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 232e1ff..48227fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -159,9 +159,4 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
      * @param topVer Topology version.
      */
     public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs);
-
-    /**
-     * @return Cache entry filter.
-     */
-    public CacheEntryEventFilter getEventFilter();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
index baa5a62..6143fa9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
@@ -26,10 +26,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
@@ -56,7 +58,10 @@ import org.apache.ignite.transactions.TransactionIsolation;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted;
 import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
 import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
@@ -80,6 +85,53 @@ public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryR
     /** */
     public static final int ITERATION_CNT = 40;
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInternalQuery() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+        UUID uuid = null;
+
+        try {
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            final CountDownLatch latch = new CountDownLatch(5);
+
+            CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    for (Object evt : iterable) {
+                        latch.countDown();
+
+                        log.info("Received event: " + evt);
+                    }
+                }
+            };
+
+            uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+                .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true);
+
+            for (int i = 10; i < 20; i++)
+                cache.put(i, i);
+
+            assertTrue(latch.await(3, SECONDS));
+        }
+        finally {
+            if (uuid != null)
+                grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+                    .cancelInternalQuery(uuid);
+
+            cache.destroy();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
         throws Exception {
@@ -542,7 +594,8 @@ public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryR
      *
      */
     protected static class NonSerializableFilter
-        implements CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable {
+        implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey,
+            CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable {
         /** */
         public NonSerializableFilter() {
             // No-op.
@@ -575,6 +628,29 @@ public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryR
     /**
      *
      */
+    protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer>{
+        /** */
+        public SerializableFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event)
+            throws CacheEntryListenerException {
+            return isAccepted(event.getValue());
+        }
+
+        /**
+         * @return {@code True} if value is even.
+         */
+        public static boolean isAccepted(Integer val) {
+            return val == null || val % 2 == 0;
+        }
+    }
+
+    /**
+     *
+     */
     protected static class FilterFactory implements Factory<NonSerializableFilter> {
         @Override public NonSerializableFilter create() {
             return new NonSerializableFilter();

http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index c18cf35..cdf4ffd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -1175,7 +1175,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      * @param store If {@code true} configures dummy cache store.
      * @return Cache configuration.
      */
-    private CacheConfiguration<Object, Object> cacheConfiguration(
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
         CacheMode cacheMode,
         int backups,
         CacheAtomicityMode atomicityMode,


Mime
View raw message