ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [2/6] ignite git commit: Fixed IGNITE-1186 "Filter is sent instead of factory when continuous query is created".
Date Fri, 04 Mar 2016 09:21:22 GMT
Fixed  IGNITE-1186 "Filter is sent instead of factory when continuous query is created".


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/baa13122
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/baa13122
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/baa13122

Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: baa131220bf503da0908e4ecfee92966317e209c
Parents: c13339f
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Thu Mar 3 16:21:53 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Thu Mar 3 16:21:53 2016 +0300

----------------------------------------------------------------------
 .../ignite/cache/query/ContinuousQuery.java     |  35 +
 .../processors/cache/IgniteCacheProxy.java      |   4 +
 .../continuous/CacheContinuousQueryHandler.java |  86 ++-
 .../CacheContinuousQueryHandlerV2.java          | 176 +++++
 .../continuous/CacheContinuousQueryManager.java | 238 +++++--
 .../continuous/GridContinuousProcessor.java     |   7 +-
 .../IgniteCacheEntryListenerAbstractTest.java   |  75 +-
 .../cache/IgniteCacheEntryListenerTxTest.java   |   5 -
 .../GridCacheReplicatedPreloadSelfTest.java     |  39 +-
 .../CacheContinuousQueryFactoryFilterTest.java  | 714 +++++++++++++++++++
 ...ContinuousQueryFailoverAbstractSelfTest.java |   2 +-
 .../CacheContinuousQueryOperationP2PTest.java   | 326 +++++++++
 ...acheContinuousQueryRandomOperationsTest.java |  63 +-
 .../p2p/CacheDeploymentEntryEventFilter.java    |  33 +
 .../CacheDeploymentEntryEventFilterFactory.java |  31 +
 .../IgniteCacheQuerySelfTestSuite.java          |   4 +
 16 files changed, 1706 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index df1bad3..3ea8f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.cache.query;
 
 import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -119,6 +121,9 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /** Remote filter. */
     private CacheEntryEventSerializableFilter<K, V> rmtFilter;
 
+    /** Remote filter factory. */
+    private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;
+
     /** Time interval. */
     private long timeInterval = DFLT_TIME_INTERVAL;
 
@@ -196,7 +201,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      *
      * @param rmtFilter Key-value filter.
      * @return {@code this} for chaining.
+     *
+     * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead.
      */
+    @Deprecated
     public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) {
         this.rmtFilter = rmtFilter;
 
@@ -213,6 +221,33 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     }
 
     /**
+     * Sets optional key-value filter factory. The filter produced by this factory is called before
+     * an entry is sent to the master node.
+     * <p>
+     * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
+     * (e.g., synchronization or transactional cache operations), should be executed asynchronously
+     * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+     *
+     * @param rmtFilterFactory Key-value filter factory.
+     * @return {@code this} for chaining.
+     */
+    public ContinuousQuery<K, V> setRemoteFilterFactory(
+        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
+        this.rmtFilterFactory = rmtFilterFactory;
+
+        return this;
+    }
+
+    /**
+     * Gets remote filter factory.
+     *
+     * @return Remote filter factory.
+     */
+    public Factory<? extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() {
+        return rmtFilterFactory;
+    }
+
+    /**
      * Sets time interval.
      * <p>
      * When a cache update happens, entry is first put into a buffer. Entries from buffer will

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 5ed8753..6e8bcbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -565,10 +565,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         if (qry.getLocalListener() == null)
             throw new IgniteException("Mandatory local listener is not set for the query: " + qry);
 
+        if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null)
+            throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory.");
+
         try {
             final UUID routineId = ctx.continuousQueries().executeQuery(
                 qry.getLocalListener(),
                 qry.getRemoteFilter(),
+                qry.getRemoteFilterFactory(),
                 qry.getPageSize(),
                 qry.getTimeInterval(),
                 qry.isAutoUnsubscribe(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 1938edb..10fbd89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
@@ -168,30 +168,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param topic Topic for ordered messages.
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
-     * @param internal Internal flag.
-     * @param notifyExisting Notify existing flag.
      * @param oldValRequired Old value required flag.
      * @param sync Synchronous flag.
      * @param ignoreExpired Ignore expired events flag.
-     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
-     * @param taskHash Task name hash code.
-     * @param locCache {@code True} if local cache.
-     * @param keepBinary Keep binary flag.
      */
     public CacheContinuousQueryHandler(
         String cacheName,
         Object topic,
         CacheEntryUpdatedListener<K, V> locLsnr,
         CacheEntryEventSerializableFilter<K, V> rmtFilter,
-        boolean internal,
-        boolean notifyExisting,
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
-        int taskHash,
-        boolean skipPrimaryCheck,
-        boolean locCache,
-        boolean keepBinary,
         boolean ignoreClsNotFound) {
         assert topic != null;
         assert locLsnr != null;
@@ -200,20 +188,49 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         this.topic = topic;
         this.locLsnr = locLsnr;
         this.rmtFilter = rmtFilter;
-        this.internal = internal;
-        this.notifyExisting = notifyExisting;
         this.oldValRequired = oldValRequired;
         this.sync = sync;
         this.ignoreExpired = ignoreExpired;
-        this.taskHash = taskHash;
-        this.skipPrimaryCheck = skipPrimaryCheck;
-        this.locCache = locCache;
-        this.keepBinary = keepBinary;
         this.ignoreClsNotFound = ignoreClsNotFound;
 
         cacheId = CU.cacheId(cacheName);
     }
 
+    /**
+     * @param internal Internal query.
+     */
+    public void internal(boolean internal) {
+        this.internal = internal;
+    }
+
+    /**
+     * @param notifyExisting Notify existing.
+     */
+    public void notifyExisting(boolean notifyExisting) {
+        this.notifyExisting = notifyExisting;
+    }
+
+    /**
+     * @param locCache Local cache.
+     */
+    public void localCache(boolean locCache) {
+        this.locCache = locCache;
+    }
+
+    /**
+     * @param taskHash Task hash.
+     */
+    public void taskNameHash(int taskHash) {
+        this.taskHash = taskHash;
+    }
+
+    /**
+     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
+     */
+    public void skipPrimaryCheck(boolean skipPrimaryCheck) {
+        this.skipPrimaryCheck = skipPrimaryCheck;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean isEvents() {
         return false;
@@ -262,8 +279,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (locLsnr != null)
             ctx.resource().injectGeneric(locLsnr);
 
-        if (rmtFilter != null)
-            ctx.resource().injectGeneric(rmtFilter);
+        final CacheEntryEventFilter filter = getEventFilter();
+
+        if (filter != null)
+            ctx.resource().injectGeneric(filter);
 
         entryBufs = new ConcurrentHashMap<>();
 
@@ -303,7 +322,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         null,
                         null,
                         null,
-                        rmtFilter,
+                        filter instanceof CacheEntryEventSerializableFilter ?
+                            (CacheEntryEventSerializableFilter)filter : null,
                         null,
                         nodeId,
                         taskName()
@@ -332,9 +352,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                 boolean notify = !evt.entry().isFiltered();
 
-                if (notify && rmtFilter != null) {
+                if (notify && filter != null) {
                     try {
-                        notify = rmtFilter.evaluate(evt);
+                        notify = filter.evaluate(evt);
                     }
                     catch (Exception e) {
                         U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e);
@@ -422,7 +442,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         null,
                         null,
                         null,
-                        rmtFilter,
+                        filter instanceof CacheEntryEventSerializableFilter ?
+                            (CacheEntryEventSerializableFilter)filter : null,
                         null,
                         nodeId,
                         taskName(),
@@ -435,8 +456,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void onUnregister() {
-                if (rmtFilter instanceof PlatformContinuousQueryFilter)
-                    ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
+                if (filter instanceof PlatformContinuousQueryFilter)
+                    ((PlatformContinuousQueryFilter)filter).onQueryUnregister();
             }
 
             @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
@@ -517,6 +538,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
+     * @return Cache entry event filter.
+     */
+    public CacheEntryEventFilter getEventFilter() {
+        return rmtFilter;
+    }
+
+    /**
      * @param cctx Context.
      * @param nodeId ID of the node that started routine.
      * @param entry Entry.
@@ -1189,7 +1217,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /**
      * Deployable object.
      */
-    private static class DeployableObject implements Externalizable {
+    protected static class DeployableObject implements Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1214,7 +1242,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @param ctx Kernal context.
          * @throws IgniteCheckedException In case of error.
          */
-        private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
+        protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
             assert obj != null;
             assert ctx != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
new file mode 100644
index 0000000..7aef4dd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
+import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Continuous query handler, version 2. Contains {@link Factory} for remote filter.
+ */
+public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHandler<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Remote filter factory. */
+    private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
+
+    /** Deployable object for filter factory. */
+    private DeployableObject rmtFilterFactoryDep;
+
+    /** Event types for JCache API. */
+    private byte types;
+
+    /** */
+    protected transient CacheEntryEventFilter filter;
+
+    /**
+     * Required by {@link Externalizable}.
+     */
+    public CacheContinuousQueryHandlerV2() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheName Cache name.
+     * @param topic Topic for ordered messages.
+     * @param locLsnr Local listener.
+     * @param rmtFilterFactory Remote filter factory.
+     * @param oldValRequired Old value required flag.
+     * @param sync Synchronous flag.
+     * @param ignoreExpired Ignore expired events flag.
+     * @param types Event types.
+     */
+    public CacheContinuousQueryHandlerV2(
+        String cacheName,
+        Object topic,
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        boolean ignoreClsNotFound,
+        @Nullable Byte types) {
+        super(cacheName,
+            topic,
+            locLsnr,
+            null,
+            oldValRequired,
+            sync,
+            ignoreExpired,
+            ignoreClsNotFound);
+
+        assert rmtFilterFactory != null;
+
+        this.rmtFilterFactory = rmtFilterFactory;
+
+        if (types != null) {
+            assert types != 0;
+
+            this.types = types;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheEntryEventFilter getEventFilter() {
+        if (filter == null) {
+            assert rmtFilterFactory != null;
+
+            Factory<? extends CacheEntryEventFilter> factory = rmtFilterFactory;
+
+            filter = factory.create();
+
+            if (types != 0)
+                filter = new JCacheQueryRemoteFilter(filter, types);
+        }
+
+        return filter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
+        super.p2pMarshal(ctx);
+
+        if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass()))
+            rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+        super.p2pUnmarshal(nodeId, ctx);
+
+        if (rmtFilterFactoryDep != null)
+            rmtFilterFactory = rmtFilterFactoryDep.unmarshal(nodeId, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridContinuousHandler clone() {
+        return super.clone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryHandlerV2.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        boolean b = rmtFilterFactoryDep != null;
+
+        out.writeBoolean(b);
+
+        if (b)
+            out.writeObject(rmtFilterFactoryDep);
+        else
+            out.writeObject(rmtFilterFactory);
+
+        out.writeByte(types);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        boolean b = in.readBoolean();
+
+        if (b)
+            rmtFilterFactoryDep = (DeployableObject)in.readObject();
+        else
+            rmtFilterFactory = (Factory)in.readObject();
+
+        types = in.readByte();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 409c1da..353043f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -23,15 +23,16 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryCreatedListener;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
@@ -54,10 +55,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.LoggerResource;
@@ -70,6 +72,7 @@ import static javax.cache.event.EventType.REMOVED;
 import static javax.cache.event.EventType.UPDATED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.QUERY_MSG_VER_2_SINCE;
 
 /**
  * Continuous queries manager.
@@ -413,28 +416,80 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    public UUID executeQuery(CacheEntryUpdatedListener locLsnr,
-        CacheEntryEventSerializableFilter rmtFilter,
+    public UUID executeQuery(final CacheEntryUpdatedListener locLsnr,
+        @Nullable final CacheEntryEventSerializableFilter rmtFilter,
+        @Nullable final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
         boolean loc,
-        boolean keepBinary) throws IgniteCheckedException
+        final boolean keepBinary) throws IgniteCheckedException
     {
+        IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr;
+
+        if (rmtFilterFactory != null)
+            clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+                    CacheContinuousQueryHandler hnd;
+
+                    if (v2)
+                        hnd = new CacheContinuousQueryHandlerV2(
+                            cctx.name(),
+                            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                            locLsnr,
+                            rmtFilterFactory,
+                            true,
+                            false,
+                            true,
+                            false,
+                            null);
+                    else {
+                        CacheEntryEventFilter fltr = rmtFilterFactory.create();
+
+                        if (!(fltr instanceof CacheEntryEventSerializableFilter))
+                            throw new IgniteException("Topology has nodes of the old versions. In this case " +
+                                "EntryEventFilter should implement " +
+                                "org.apache.ignite.cache.CacheEntryEventSerializableFilter interface. Filter: " + fltr);
+
+                        hnd = new CacheContinuousQueryHandler(
+                            cctx.name(),
+                            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                            locLsnr,
+                            (CacheEntryEventSerializableFilter)fltr,
+                            true,
+                            false,
+                            true,
+                            false);
+                    }
+
+                    return hnd;
+                }
+            };
+        else
+            clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply(Boolean ignore) {
+                    return new CacheContinuousQueryHandler(
+                        cctx.name(),
+                        TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                        locLsnr,
+                        rmtFilter,
+                        true,
+                        false,
+                        true,
+                        false);
+                }
+            };
+
         return executeQuery0(
             locLsnr,
-            rmtFilter,
+            clsr,
             bufSize,
             timeInterval,
             autoUnsubscribe,
             false,
             false,
-            true,
-            false,
-            true,
             loc,
-            keepBinary,
-            false);
+            keepBinary);
     }
 
     /**
@@ -445,27 +500,35 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    public UUID executeInternalQuery(CacheEntryUpdatedListener<?, ?> locLsnr,
-        CacheEntryEventSerializableFilter rmtFilter,
-        boolean loc,
-        boolean notifyExisting,
-        boolean ignoreClassNotFound)
+    public UUID executeInternalQuery(final CacheEntryUpdatedListener<?, ?> locLsnr,
+        final CacheEntryEventSerializableFilter rmtFilter,
+        final boolean loc,
+        final boolean notifyExisting,
+        final boolean ignoreClassNotFound)
         throws IgniteCheckedException
     {
         return executeQuery0(
             locLsnr,
-            rmtFilter,
+            new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) {
+                    return new CacheContinuousQueryHandler(
+                        cctx.name(),
+                        TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                        locLsnr,
+                        rmtFilter,
+                        true,
+                        false,
+                        true,
+                        ignoreClassNotFound);
+                }
+            },
             ContinuousQuery.DFLT_PAGE_SIZE,
             ContinuousQuery.DFLT_TIME_INTERVAL,
             ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
             true,
             notifyExisting,
-            true,
-            false,
-            true,
             loc,
-            false,
-            ignoreClassNotFound);
+            false);
     }
 
     /**
@@ -539,32 +602,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
     /**
      * @param locLsnr Local listener.
-     * @param rmtFilter Remote filter.
      * @param bufSize Buffer size.
      * @param timeInterval Time interval.
      * @param autoUnsubscribe Auto unsubscribe flag.
      * @param internal Internal flag.
      * @param notifyExisting Notify existing flag.
-     * @param oldValRequired Old value required flag.
-     * @param sync Synchronous flag.
-     * @param ignoreExpired Ignore expired event flag.
      * @param loc Local flag.
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
     private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
-        final CacheEntryEventSerializableFilter rmtFilter,
+        IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
         boolean internal,
         boolean notifyExisting,
-        boolean oldValRequired,
-        boolean sync,
-        boolean ignoreExpired,
         boolean loc,
-        final boolean keepBinary,
-        boolean ignoreClassNotFound) throws IgniteCheckedException
+        final boolean keepBinary) throws IgniteCheckedException
     {
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -573,21 +628,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
 
-        GridContinuousHandler hnd = new CacheContinuousQueryHandler(
-            cctx.name(),
-            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
-            locLsnr,
-            rmtFilter,
-            internal,
-            notifyExisting,
-            oldValRequired,
-            sync,
-            ignoreExpired,
-            taskNameHash,
-            skipPrimaryCheck,
-            cctx.isLocal(),
-            keepBinary,
-            ignoreClassNotFound);
+        boolean v2 = useV2Protocol(cctx.discovery().allNodes());
+
+        final CacheContinuousQueryHandler hnd = clsr.apply(v2);
+
+        hnd.taskNameHash(taskNameHash);
+        hnd.skipPrimaryCheck(skipPrimaryCheck);
+        hnd.notifyExisting(notifyExisting);
+        hnd.internal(internal);
+        hnd.keepBinary(keepBinary);
+        hnd.localCache(cctx.isLocal());
 
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
             F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -654,7 +704,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                                     cctx.kernalContext().cache().jcache(cctx.name()),
                                     cctx, entry);
 
-                                if (rmtFilter != null && !rmtFilter.evaluate(next))
+                                if (hnd.getEventFilter() != null && !hnd.getEventFilter().evaluate(next))
                                     next = null;
                             }
                         }
@@ -667,6 +717,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param nodes Nodes.
+     * @return {@code True} if the version of every node is not older than
+     *     {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE}, otherwise {@code false}.
+     */
+    private boolean useV2Protocol(Collection<ClusterNode> nodes) {
+        for (ClusterNode node : nodes) {
+            if (QUERY_MSG_VER_2_SINCE.compareTo(node.version()) > 0)
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param lsnrId Listener ID.
      * @param lsnr Listener.
      * @param internal Internal flag.
@@ -767,36 +831,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             if (types == 0)
                 throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces.");
 
-            CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener(
+            final byte types0 = types;
+
+            final CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener(
                 locLsnrImpl,
                 log);
 
-            CacheEntryEventFilter fltr = null;
-
-            if (cfg.getCacheEntryEventFilterFactory() != null) {
-                fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create();
-
-                if (!(fltr instanceof Serializable))
-                    throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "
-                        + fltr);
-            }
-
-            CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types);
-
             routineId = executeQuery0(
                 locLsnr,
-                rmtFilter,
+                new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                    @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+                        CacheContinuousQueryHandler hnd;
+                        Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
+
+                        v2 = rmtFilterFactory != null && v2;
+
+                        if (v2)
+                            hnd = new CacheContinuousQueryHandlerV2(
+                                cctx.name(),
+                                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                                locLsnr,
+                                rmtFilterFactory,
+                                cfg.isOldValueRequired(),
+                                cfg.isSynchronous(),
+                                false,
+                                false,
+                                types0);
+                        else {
+                            JCacheQueryRemoteFilter jCacheFilter;
+
+                            CacheEntryEventFilter filter = null;
+
+                            if (rmtFilterFactory != null) {
+                                filter = rmtFilterFactory.create();
+
+                                if (!(filter instanceof Serializable))
+                                    throw new IgniteException("Topology has nodes of the old versions. " +
+                                        "In this case EntryEventFilter must implement java.io.Serializable " +
+                                        "interface. Filter: " + filter);
+                            }
+
+                            jCacheFilter = new JCacheQueryRemoteFilter(filter, types0);
+
+                            hnd = new CacheContinuousQueryHandler(
+                                cctx.name(),
+                                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                                locLsnr,
+                                jCacheFilter,
+                                cfg.isOldValueRequired(),
+                                cfg.isSynchronous(),
+                                false,
+                                false);
+                        }
+
+                        return hnd;
+                    }
+                },
                 ContinuousQuery.DFLT_PAGE_SIZE,
                 ContinuousQuery.DFLT_TIME_INTERVAL,
                 ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
                 false,
                 false,
-                cfg.isOldValueRequired(),
-                cfg.isSynchronous(),
                 false,
-                false,
-                keepBinary,
-                false);
+                keepBinary
+            );
         }
 
         /**
@@ -814,6 +912,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     *
      */
     private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
         /** */
@@ -896,8 +995,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * For handler version 2.0 this filter should not be serialized.
      */
-    private static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable {
+    protected static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -922,7 +1022,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
          * @param impl Filter.
          * @param types Types.
          */
-        JCacheQueryRemoteFilter(CacheEntryEventFilter impl, byte types) {
+        JCacheQueryRemoteFilter(@Nullable CacheEntryEventFilter impl, byte types) {
             assert types != 0;
 
             this.impl = impl;

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1ec69c2..1776748 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -110,6 +111,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** Threads started by this processor. */
     private final Map<UUID, IgniteThread> bufCheckThreads = new ConcurrentHashMap8<>();
 
+    /** Lowest node version for which version 2 of the continuous query message protocol is used. */
+    public static final IgniteProductVersion QUERY_MSG_VER_2_SINCE = IgniteProductVersion.fromString("1.5.9");
+
     /** */
     private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>();
 
@@ -615,7 +619,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         startFuts.put(routineId, fut);
 
         try {
-            if (locIncluded && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
+            if (locIncluded
+                && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
                 hnd.onListenerRegistered(routineId, ctx);
 
             ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData,

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index e6bfd87..35fbbd5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -42,6 +42,7 @@ import javax.cache.configuration.FactoryBuilder;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
 import javax.cache.event.CacheEntryCreatedListener;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryExpiredListener;
 import javax.cache.event.CacheEntryListener;
 import javax.cache.event.CacheEntryListenerException;
@@ -99,6 +100,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /** */
     private boolean useObjects;
 
+    /** */
+    private static AtomicBoolean serialized = new AtomicBoolean(false);
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
@@ -138,6 +142,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
             assertEquals(0, syncMsgFuts.size());
         }
+
+        serialized.set(false);
     }
 
     /**
@@ -178,11 +184,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                     return new CreateUpdateRemoveExpireListener();
                 }
             },
-            new Factory<CacheEntryEventSerializableFilter<Object, Object>>() {
-                @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
-                    return new ExceptionFilter();
-                }
-            },
+            new ExceptionFilterFactory(),
             false,
             false
         );
@@ -443,18 +445,23 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
             FactoryBuilder.factoryOf(lsnr),
-            null,
+            new SerializableFactory(),
             true,
             false
         ));
 
         try {
             startGrid(gridCount());
+
+            jcache(0).put(1, 1);
         }
         finally {
             stopGrid(gridCount());
         }
 
+        jcache(0).put(2, 2);
+
+        assertFalse(IgniteCacheEntryListenerAbstractTest.serialized.get());
         assertFalse(serialized.get());
     }
 
@@ -1130,9 +1137,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+    private static class TestFilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+        @Override public CacheEntryEventFilter<Object, Object> create() {
             return new TestFilter();
         }
     }
@@ -1184,7 +1191,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+    private static class TestFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
             assert evt != null;
@@ -1201,6 +1208,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
             return key % 2 == 0;
         }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            throw new UnsupportedOperationException("Filter must not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new UnsupportedOperationException("Filter must not be unmarshaled.");
+        }
     }
 
     /**
@@ -1355,6 +1372,36 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     }
 
     /**
+     *
+     */
+    public static class SerializableFactory implements Factory<NonSerializableFilter> {
+        /** {@inheritDoc} */
+        @Override public NonSerializableFilter create() {
+            return new NonSerializableFilter();
+        }
+    }
+
+    /**
+     *
+     */
+    public static class NonSerializableFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            serialized.set(true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            serialized.set(true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+            return true;
+        }
+    }
+
+    /**
      */
     public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
         /** */
@@ -1467,4 +1514,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             return S.toString(ListenerTestValue.class, this);
         }
     }
+
+    /**
+     *
+     */
+    static class ExceptionFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+            return new ExceptionFilter();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
index 41725e7..cad57f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
@@ -47,9 +47,4 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst
     @Override protected NearCacheConfiguration nearConfiguration() {
         return null;
     }
-
-    /** {@inheritDoc} */
-    @Override public void testEvents(){
-        fail("https://issues.apache.org/jira/browse/IGNITE-1600");
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index ea2f27b..c6cd5af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryListener;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -403,18 +404,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
                         }
                     }
                 },
-                new Factory<CacheEntryEventSerializableFilter<Integer, Object>>() {
-                    /** {@inheritDoc} */
-                    @Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
-                        try {
-
-                            return cls2.newInstance();
-                        }
-                        catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                },
+                new ClassFilterFactory(cls2),
                 true,
                 true
             );
@@ -946,4 +936,29 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
             return true;
         }
     }
+
+    /**
+     *
+     */
+    private static class ClassFilterFactory implements Factory<CacheEntryEventFilter<Integer, Object>> {
+        /** */
+        private Class<CacheEntryEventSerializableFilter> cls;
+
+        /**
+         * @param cls Class.
+         */
+        public ClassFilterFactory(Class<CacheEntryEventSerializableFilter> cls) {
+            this.cls = cls;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
+            try {
+                return cls.newInstance();
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
new file mode 100644
index 0000000..6143fa9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest {
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private static final int KEYS = 50;
+
+    /** */
+    private static final int VALS = 10;
+
+    /** */
+    public static final int ITERATION_CNT = 40;
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInternalQuery() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+        UUID uuid = null;
+
+        try {
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            final CountDownLatch latch = new CountDownLatch(5);
+
+            CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    for (Object evt : iterable) {
+                        latch.countDown();
+
+                        log.info("Received event: " + evt);
+                    }
+                }
+            };
+
+            uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+                .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true);
+
+            for (int i = 10; i < 20; i++)
+                cache.put(i, i);
+
+            assertTrue(latch.await(3, SECONDS));
+        }
+        finally {
+            if (uuid != null)
+                grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+                    .cancelInternalQuery(uuid);
+
+            cache.destroy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            log.info("Random seed: " + seed);
+
+            List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
+
+            Collection<QueryCursor<?>> curs = new ArrayList<>();
+
+            Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>();
+
+            if (deploy == CLIENT)
+                evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean()));
+            else if (deploy == SERVER)
+                evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs,
+                    rnd.nextBoolean()));
+            else {
+                boolean isSync = rnd.nextBoolean();
+
+                for (int i = 0; i < NODES - 1; i++)
+                    evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync));
+            }
+
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+            Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+            try {
+                for (int i = 0; i < ITERATION_CNT; i++) {
+                    if (i % 10 == 0)
+                        log.info("Iteration: " + i);
+
+                    for (int idx = 0; idx < NODES; idx++)
+                        randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
+                }
+            }
+            finally {
+                for (QueryCursor<?> cur : curs)
+                    cur.close();
+
+                for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs)
+                    grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2());
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param nodeIdx Node index.
+     * @param curs Cursors.
+     * @param lsnrCfgs Listener configurations.
+     * @return Event queue.
+     */
+    private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName,
+        int nodeIdx,
+        Collection<QueryCursor<?>> curs,
+        Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs,
+        boolean sync) {
+        final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg =
+                new MutableCacheEntryListenerConfiguration<>(
+                    FactoryBuilder.factoryOf(new LocalNonSerialiseListener() {
+                        @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    }),
+                    new FilterFactory(),
+                    true,
+                    sync
+                );
+
+            grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg);
+
+            lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg));
+        }
+        else {
+            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                    ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+                    for (CacheEntryEvent<?, ?> evt : evts)
+                        evtsQueue.add(evt);
+                }
+            });
+
+            qry.setRemoteFilterFactory(new FilterFactory());
+
+            QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry);
+
+            curs.add(cur);
+        }
+
+        return evtsQueue;
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @param evtsQueues Events queue.
+     * @param expData Expected cache data.
+     * @param partCntr Partition counter.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        ConcurrentMap<Object, Object> expData,
+        Map<Integer, Long> partCntr,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new QueryTestKey(rnd.nextInt(KEYS));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key); // Value the test believes is currently stored for the key.
+
+        int op = rnd.nextInt(11); // One of the eleven operations handled by the switch below.
+
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Transaction tx = null; // Started only for transactional caches, and then only on a coin flip.
+
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+        try {
+            // Each branch: apply the operation, commit the optional tx, then check events and sync expData.
+
+            switch (op) {
+                case 0: { // put: checked against (newVal, oldVal).
+                    cache.put(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 1: { // getAndPut: same expectations as put.
+                    cache.getAndPut(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 2: { // remove: checked against (null, oldVal).
+                    cache.remove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 3: { // getAndRemove: same expectations as remove.
+                    cache.getAndRemove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 4: { // invoke with EntrySetValueProcessor(newVal, ...): checked like a put.
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 5: { // invoke with EntrySetValueProcessor(null, ...): checked like a remove.
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 6: { // putIfAbsent: an update is expected only when no value is tracked for the key.
+                    cache.putIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 7: { // getAndPutIfAbsent: same expectations as putIfAbsent.
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 8: { // replace(key, val): an update is expected only when a value is tracked for the key.
+                    cache.replace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 9: { // getAndReplace: same expectations as replace(key, val).
+                    cache.getAndReplace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 10: { // replace(key, expected, val): update expected only if 'expected' equals the tracked value.
+                    if (oldVal != null) {
+                        Object replaceVal = value(rnd);
+
+                        boolean success = replaceVal.equals(oldVal);
+
+                        if (success) {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            updatePartitionCounter(cache, key, partCntr);
+
+                            waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                            expData.put(key, newVal);
+                        }
+                        else {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            checkNoEvent(evtsQueues);
+                        }
+                    }
+                    else {
+                        cache.replace(key, value(rnd), newVal);
+
+                        if (tx != null)
+                            tx.commit();
+
+                        checkNoEvent(evtsQueues);
+                    }
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
+            }
+        } finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * Picks one of the three transaction isolation levels using a single draw from the generator.
+     *
+     * @param rnd Random generator.
+     * @return Randomly selected {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        switch (rnd.nextInt(3)) {
+            case 0:
+                return READ_COMMITTED;
+
+            case 1:
+                return REPEATABLE_READ;
+
+            default:
+                return SERIALIZABLE;
+        }
+    }
+
+    /**
+     * Picks a transaction concurrency mode using a single boolean draw from the generator.
+     *
+     * @param rnd Random generator.
+     * @return Randomly selected {@link TransactionConcurrency}.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        if (rnd.nextBoolean())
+            return TransactionConcurrency.OPTIMISTIC;
+
+        return TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
+     * Increments the expected update counter of the partition the key maps to. A partition that has
+     * no counter yet is treated as having counter zero, so its first update stores one.
+     *
+     * @param cache Cache whose affinity is used to map the key to a partition.
+     * @param key Key.
+     * @param cntrs Partition counters, modified in place.
+     */
+    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+        int part = cache.unwrap(Ignite.class).affinity(cache.getName()).partition(key);
+
+        Long prev = cntrs.get(part);
+
+        cntrs.put(part, prev == null ? 1L : prev + 1);
+    }
+
+    /**
+     * Creates a test value whose payload is drawn from {@code [0, VALS)}.
+     *
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        int payload = rnd.nextInt(VALS);
+
+        return new QueryTestValue(payload);
+    }
+
+    /**
+     * Verifies the outcome of one cache operation on every listener queue.
+     * <p>
+     * If the operation changed nothing (both {@code val} and {@code oldVal} are {@code null}) or the new
+     * value is rejected by the filter, every queue must stay empty. Otherwise each queue must deliver,
+     * within 5 seconds, an event carrying the given key, value, old value and the partition update
+     * counter currently tracked for the key's partition.
+     *
+     * @param evtsQueues Event queues, one per registered listener.
+     * @param partCntrs Expected partition counters.
+     * @param aff Affinity function used to map the key to a partition.
+     * @param key Key.
+     * @param val Expected new value, {@code null} for a removal.
+     * @param oldVal Expected old value, {@code null} if the key was absent.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        Object key,
+        Object val,
+        Object oldVal)
+        throws Exception {
+        boolean noChange = val == null && oldVal == null;
+        boolean filteredOut = val != null && !isAccepted((QueryTestValue)val);
+
+        if (noChange || filteredOut) {
+            checkNoEvent(evtsQueues);
+
+            return;
+        }
+
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+            assertEquals(key, evt.getKey());
+            assertEquals(val, evt.getValue());
+            assertEquals(oldVal, evt.getOldValue());
+
+            // Keep the counter boxed: unboxing a missing entry would throw a bare NPE before the
+            // assertion below gets a chance to report the problem.
+            Long cntr = partCntrs.get(aff.partition(key));
+            CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
+
+            assertNotNull("Partition counter is not tracked [key=" + key + ']', cntr);
+            assertNotNull(qryEntryEvt);
+
+            assertEquals(cntr.longValue(), qryEntryEvt.getPartitionUpdateCounter());
+        }
+    }
+
+    /**
+     * Asserts that no queue receives an event; each queue is polled for 50 milliseconds.
+     *
+     * @param evtsQueues Event queues.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> queue : evtsQueues)
+            assertNull(queue.poll(50, MILLISECONDS));
+    }
+
+    /**
+     * Event filter that accepts removals and even values. Both externalization hooks fail the test,
+     * because instances of this filter are not supposed to be marshaled.
+     */
+    protected static class NonSerializableFilter
+        implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey,
+            CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable {
+        /** Public no-arg constructor. */
+        public NonSerializableFilter() {
+            // No-op.
+        }
+
+        /**
+         * @param val Value to check, may be {@code null}.
+         * @return {@code True} if value is {@code null} or even.
+         */
+        public static boolean isAccepted(QueryTestValue val) {
+            if (val == null)
+                return true;
+
+            return val.val1 % 2 == 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event)
+            throws CacheEntryListenerException {
+            QueryTestValue val = event.getValue();
+
+            return isAccepted(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            fail("Entry filter should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            fail("Entry filter should not be marshaled.");
+        }
+    }
+
+    /**
+     * Event filter over integer entries that accepts removals and even values.
+     */
+    protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+        /** Public no-arg constructor. */
+        public SerializableFilter() {
+            // No-op.
+        }
+
+        /**
+         * @param val Value to check, may be {@code null}.
+         * @return {@code True} if value is {@code null} or even.
+         */
+        public static boolean isAccepted(Integer val) {
+            if (val == null)
+                return true;
+
+            return val % 2 == 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event)
+            throws CacheEntryListenerException {
+            Integer val = event.getValue();
+
+            return isAccepted(val);
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class FilterFactory implements Factory<NonSerializableFilter> {
+        @Override public NonSerializableFilter create() {
+            return new NonSerializableFilter();
+        }
+    }
+
+    /**
+     * Base local listener that routes created, updated, removed and expired notifications to a single
+     * {@link #onEvents(Iterable)} callback. Both externalization hooks throw, because the listener is
+     * not supposed to be marshaled.
+     */
+    public abstract class LocalNonSerialiseListener implements
+        CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryCreatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryExpiredListener<QueryTestKey, QueryTestValue>,
+        CacheEntryRemovedListener<QueryTestKey, QueryTestValue>,
+        Externalizable {
+        /** Public no-arg constructor. */
+        public LocalNonSerialiseListener() {
+            // No-op.
+        }
+
+        /**
+         * Single entry point for every kind of notification.
+         *
+         * @param evts Events.
+         */
+        protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts);
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index a42f056..f104f21 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1432,7 +1432,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
 
             if (hnd.isQuery() && hnd.cacheName() == null) {
-                backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
+                backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "backupQueue");
 
                 break;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
new file mode 100644
index 0000000..97f9e0e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that a continuous query and a JCache entry listener both work when the remote filter factory
+ * class is loaded by an external class loader, with peer class loading enabled on all nodes.
+ */
+public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest {
+    /** Shared IP finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Total number of nodes; the last one is started in client mode. */
+    private static final int NODES = 5;
+
+    /** Whether the next started node is a client. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+        cfg.setPeerClassLoadingEnabled(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Reset explicitly so that server nodes never inherit the client flag from an earlier start.
+        client = false;
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * Creates the cache, registers the same local listener both through a continuous query and through
+     * a JCache listener configuration (each with its own instance of the externally loaded filter
+     * factory), puts ten entries and waits until the listener has been notified ten times.
+     *
+     * @param ccfg Cache configuration.
+     * @param isClient If {@code true} the query is started from the client node, otherwise from a
+     *      randomly chosen server node.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked") // The factory class is loaded reflectively, so its generic type cannot be checked.
+    protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean isClient)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        QueryCursor<?> cur = null;
+
+        final Class<Factory<CacheEntryEventFilter>> evtFilterFactory =
+            (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+
+        final CountDownLatch latch = new CountDownLatch(10);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        TestLocalListener localLsnr = new TestLocalListener() {
+            @Override public void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+                throws CacheEntryListenerException {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+                    latch.countDown();
+
+                    log.info("Received event: " + evt);
+                }
+            }
+        };
+
+        MutableCacheEntryListenerConfiguration<Integer, Integer> lsnrCfg =
+            new MutableCacheEntryListenerConfiguration<>(
+                new FactoryBuilder.SingletonFactory<>(localLsnr),
+                (Factory<? extends CacheEntryEventFilter<? super Integer, ? super Integer>>)
+                    (Object)evtFilterFactory.newInstance(),
+                true,
+                true
+            );
+
+        qry.setLocalListener(localLsnr);
+
+        qry.setRemoteFilterFactory(
+            (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactory.newInstance());
+
+        IgniteCache<Integer, Integer> cache = null;
+
+        try {
+            if (isClient)
+                cache = grid(NODES - 1).cache(ccfg.getName());
+            else
+                cache = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName());
+
+            cur = cache.query(qry);
+
+            cache.registerCacheEntryListener(lsnrCfg);
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            // Wait first, read the remaining count afterwards, so the message reflects the final state.
+            boolean done = latch.await(3, TimeUnit.SECONDS);
+
+            assertTrue("Failed to wait for events [remaining=" + latch.getCount() + ']', done);
+        }
+        finally {
+            // Nested finally: a failure while closing the cursor must not skip listener deregistration.
+            try {
+                if (cur != null)
+                    cur.close();
+            }
+            finally {
+                if (cache != null)
+                    cache.deregisterCacheEntryListener(lsnrCfg);
+            }
+        }
+    }
+
+    /**
+     * Builds a cache configuration with {@code FULL_SYNC} write synchronization and {@code PRIMARY}
+     * atomic write order. The number of backups is applied only to partitioned caches.
+     *
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     * Local listener that routes both created and updated notifications to {@link #onEvent(Iterable)}.
+     */
+    private abstract static class TestLocalListener implements CacheEntryUpdatedListener<Integer, Integer>,
+        CacheEntryCreatedListener<Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+            throws CacheEntryListenerException {
+            onEvent(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+            throws CacheEntryListenerException {
+            onEvent(evts);
+        }
+
+        /**
+         * @param evts Events.
+         */
+        protected abstract void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts);
+    }
+}


Mime
View raw message