ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [06/14] incubator-ignite git commit: IGNITE-143 - Continuous queries refactoring
Date Thu, 12 Feb 2015 21:50:56 GMT
IGNITE-143 - Continuous queries refactoring


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f8f0699d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f8f0699d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f8f0699d

Branch: refs/heads/ignite-143
Commit: f8f0699d1dc484a24f41f651377fed729652304e
Parents: 0751bcf
Author: Valentin Kulichenko <vkulichenko@gridgain.com>
Authored: Wed Feb 11 10:36:02 2015 -0800
Committer: Valentin Kulichenko <vkulichenko@gridgain.com>
Committed: Wed Feb 11 10:36:02 2015 -0800

----------------------------------------------------------------------
 .../datagrid/CacheContinuousQueryExample.java   |  53 +-
 .../cache/query/CacheContinuousQuery.java       | 284 ------
 .../cache/query/CacheContinuousQueryEntry.java  |  49 -
 .../ignite/cache/query/ContinuousQuery.java     | 103 +-
 .../org/apache/ignite/cache/query/Query.java    |   9 +
 .../ignite/events/CacheQueryExecutedEvent.java  |   9 +-
 .../ignite/events/CacheQueryReadEvent.java      |   9 +-
 .../processors/cache/CacheEntryEvent.java       |  75 --
 .../processors/cache/GridCacheContext.java      |   6 +-
 .../processors/cache/GridCacheProcessor.java    |  11 +-
 .../processors/cache/IgniteCacheProxy.java      | 112 +++
 .../CacheDataStructuresManager.java             |  44 +-
 .../processors/cache/query/CacheQueries.java    |  10 -
 .../cache/query/GridCacheQueriesImpl.java       |   5 -
 .../cache/query/GridCacheQueriesProxy.java      |  12 -
 .../continuous/CacheContinuousQueryEntry.java   | 256 +++++
 .../continuous/CacheContinuousQueryEvent.java   |  81 ++
 .../CacheContinuousQueryFilterEx.java           |  31 +
 .../continuous/CacheContinuousQueryHandler.java | 484 +++++++++
 .../CacheContinuousQueryListener.java           |  41 +
 .../continuous/CacheContinuousQueryManager.java | 619 ++++++++++++
 .../GridCacheContinuousQueryAdapter.java        | 318 ------
 .../GridCacheContinuousQueryEntry.java          | 766 --------------
 .../GridCacheContinuousQueryFilterEx.java       |  33 -
 .../GridCacheContinuousQueryHandler.java        | 571 -----------
 .../GridCacheContinuousQueryListener.java       |  41 -
 .../GridCacheContinuousQueryManager.java        | 784 ---------------
 .../service/GridServiceProcessor.java           |  70 +-
 .../optimized/optimized-classnames.properties   |   4 +-
 ...ridCacheContinuousQueryAbstractSelfTest.java | 997 ++++---------------
 ...dCacheContinuousQueryReplicatedSelfTest.java |  95 +-
 .../GridContinuousOperationsLoadTest.java       |  54 +-
 .../loadtests/hashmap/GridCacheTestContext.java |   2 +-
 .../hadoop/jobtracker/GridHadoopJobTracker.java |  49 +-
 34 files changed, 2120 insertions(+), 3967 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
index ce05988..26fd2d2 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
@@ -18,11 +18,10 @@
 package org.apache.ignite.examples.datagrid;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
-import org.apache.ignite.lang.*;
 
-import java.util.*;
+import javax.cache.*;
+import javax.cache.event.*;
 
 /**
  * This examples demonstrates continuous query API.
@@ -48,46 +47,40 @@ public class CacheContinuousQueryExample {
             System.out.println();
             System.out.println(">>> Cache continuous query example started.");
 
-            GridCache<Integer, String> cache = ignite.cache(CACHE_NAME);
+            IgniteCache<Integer, String> cache = ignite.jcache(CACHE_NAME);
 
             // Clean up caches on all nodes before run.
-            cache.clear(0);
+            cache.clear();
 
             int keyCnt = 20;
 
             for (int i = 0; i < keyCnt; i++)
-                cache.putx(i, Integer.toString(i));
+                cache.put(i, Integer.toString(i));
 
             // Create new continuous query.
-            try (CacheContinuousQuery<Integer, String> qry = cache.queries().createContinuousQuery()) {
-                // Callback that is called locally when update notifications are received.
-                qry.localCallback(
-                    new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, String>>>() {
-                        @Override public boolean apply(
-                            UUID nodeId,
-                            Collection<CacheContinuousQueryEntry<Integer, String>> entries
-                        ) {
-                            for (CacheContinuousQueryEntry<Integer, String> e : entries)
-                                System.out.println("Queried entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
+            ContinuousQuery<Integer, String> qry = Query.continuous();
 
-                            return true; // Return true to continue listening.
-                        }
-                    });
+            // Callback that is called locally when update notifications are received.
+            qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
+                    for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
+                        System.out.println("Queried entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
+                }
+            });
 
-                // This filter will be evaluated remotely on all nodes
-                // Entry that pass this filter will be sent to the caller.
-                qry.remoteFilter(new IgnitePredicate<CacheContinuousQueryEntry<Integer, String>>() {
-                    @Override public boolean apply(CacheContinuousQueryEntry<Integer, String> e) {
-                        return e.getKey() > 15;
-                    }
-                });
-
-                // Execute query.
-                qry.execute();
+            // This filter will be evaluated remotely on all nodes.
+            // Entry that pass this filter will be sent to the caller.
+            qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() {
+                @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
+                    return e.getKey() > 15;
+                }
+            });
 
+            // Execute query.
+            try (QueryCursor<Cache.Entry<Integer, String>> ignored = cache.query(qry)) {
                 // Add a few more keys and watch more query notifications.
                 for (int i = keyCnt; i < keyCnt + 5; i++)
-                    cache.putx(i, Integer.toString(i));
+                    cache.put(i, Integer.toString(i));
 
                 // Wait for a while while callback is notified about remaining puts.
                 Thread.sleep(2000);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java
deleted file mode 100644
index ff4d38a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.query;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * API for configuring and executing continuous cache queries.
- * <p>
- * Continuous queries are executed as follows:
- * <ol>
- * <li>
- *  Query is sent to requested grid nodes. Note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL}
- *  and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches query will be always executed
- *  locally.
- * </li>
- * <li>
- *  Each node iterates through existing cache data and registers listeners that will
- *  notify about further updates.
- * <li>
- *  Each key-value pair is passed through optional filter and if this filter returns
- *  true, key-value pair is sent to the master node (the one that executed query).
- *  If filter is not provided, all pairs are sent.
- * </li>
- * <li>
- *  When master node receives key-value pairs, it notifies the local callback.
- * </li>
- * </ol>
- * <h2 class="header">NOTE</h2>
- * Under some concurrent circumstances callback may get several notifications
- * for one cache update. This should be taken into account when implementing callback.
- * <h1 class="header">Query usage</h1>
- * As an example, suppose we have cache with {@code 'Person'} objects and we need
- * to query all persons with salary above 1000.
- * <p>
- * Here is the {@code Person} class:
- * <pre name="code" class="java">
- * public class Person {
- *     // Name.
- *     private String name;
- *
- *     // Salary.
- *     private double salary;
- *
- *     ...
- * }
- * </pre>
- * <p>
- * You can create and execute continuous query like so:
- * <pre name="code" class="java">
- * // Create new continuous query.
- * qry = cache.createContinuousQuery();
- *
- * // Callback that is called locally when update notifications are received.
- * // It simply prints out information about all created persons.
- * qry.callback(new GridPredicate2&lt;UUID, Collection&lt;Map.Entry&lt;UUID, Person&gt;&gt;&gt;() {
- *     &#64;Override public boolean apply(UUID uuid, Collection&lt;Map.Entry&lt;UUID, Person&gt;&gt; entries) {
- *         for (Map.Entry&lt;UUID, Person&gt; e : entries) {
- *             Person p = e.getValue();
- *
- *             X.println("&gt;&gt;&gt;");
- *             X.println("&gt;&gt;&gt; " + p.getFirstName() + " " + p.getLastName() +
- *                 "'s salary is " + p.getSalary());
- *             X.println("&gt;&gt;&gt;");
- *         }
- *
- *         return true;
- *     }
- * });
- *
- * // This query will return persons with salary above 1000.
- * qry.filter(new GridPredicate2&lt;UUID, Person&gt;() {
- *     &#64;Override public boolean apply(UUID uuid, Person person) {
- *         return person.getSalary() &gt; 1000;
- *     }
- * });
- *
- * // Execute query.
- * qry.execute();
- * </pre>
- * This will execute query on all nodes that have cache you are working with and notify callback
- * with both data that already exists in cache and further updates.
- * <p>
- * To stop receiving updates call {@link #close()} method:
- * <pre name="code" class="java">
- * qry.cancel();
- * </pre>
- * Note that one query instance can be executed only once. After it's cancelled, it's non-operational.
- * If you need to repeat execution, use {@link org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create
- * new query.
- */
-public interface CacheContinuousQuery<K, V> extends AutoCloseable {
-    /**
-     * Default buffer size. Size of {@code 1} means that all entries
-     * will be sent to master node immediately (buffering is disabled).
-     */
-    public static final int DFLT_BUF_SIZE = 1;
-
-    /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */
-    public static final long DFLT_TIME_INTERVAL = 0;
-
-    /**
-     * Default value for automatic unsubscription flag. Remote filters
-     * will be unregistered by default if master node leaves topology.
-     */
-    public static final boolean DFLT_AUTO_UNSUBSCRIBE = true;
-
-    /**
-     * Sets local callback. This callback is called only
-     * in local node when new updates are received.
-     * <p>
-     * The callback predicate accepts ID of the node from where updates
-     * are received and collection of received entries. Note that
-     * for removed entries value will be {@code null}.
-     * <p>
-     * If the predicate returns {@code false}, query execution will
-     * be cancelled.
-     * <p>
-     * <b>WARNING:</b> all operations that involve any kind of JVM-local
-     * or distributed locking (e.g., synchronization or transactional
-     * cache operations), should be executed asynchronously without
-     * blocking the thread that called the callback. Otherwise, you
-     * can get deadlocks.
-     *
-     * @param locCb Local callback.
-     */
-    public void localCallback(IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> locCb);
-
-    /**
-     * Gets local callback. See {@link #localCallback(IgniteBiPredicate)} for more information.
-     *
-     * @return Local callback.
-     */
-    @Nullable public IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> localCallback();
-
-    /**
-     * Sets optional key-value filter. This filter is called before
-     * entry is sent to the master node.
-     * <p>
-     * <b>WARNING:</b> all operations that involve any kind of JVM-local
-     * or distributed locking (e.g., synchronization or transactional
-     * cache operations), should be executed asynchronously without
-     * blocking the thread that called the filter. Otherwise, you
-     * can get deadlocks.
-     *
-     * @param filter Key-value filter.
-     */
-    public void remoteFilter(@Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter);
-
-    /**
-     * Gets key-value filter. See {@link #remoteFilter(IgnitePredicate)} for more information.
-     *
-     * @return Key-value filter.
-     */
-    @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> remoteFilter();
-
-    /**
-     * Sets buffer size.
-     * <p>
-     * When a cache update happens, entry is first put into a buffer.
-     * Entries from buffer will be sent to the master node only if
-     * the buffer is full or time provided via {@link #timeInterval(long)}
-     * method is exceeded.
-     * <p>
-     * Default buffer size is {@code 1} which means that entries will
-     * be sent immediately (buffering is disabled).
-     *
-     * @param bufSize Buffer size.
-     */
-    public void bufferSize(int bufSize);
-
-    /**
-     * Gets buffer size. See {@link #bufferSize(int)} for more information.
-     *
-     * @return Buffer size.
-     */
-    public int bufferSize();
-
-    /**
-     * Sets time interval.
-     * <p>
-     * When a cache update happens, entry is first put into a buffer.
-     * Entries from buffer will be sent to the master node only if
-     * the buffer is full (its size can be provided via {@link #bufferSize(int)}
-     * method) or time provided via this method is exceeded.
-     * <p>
-     * Default time interval is {@code 0} which means that time check is
-     * disabled and entries will be sent only when buffer is full.
-     *
-     * @param timeInterval Time interval.
-     */
-    public void timeInterval(long timeInterval);
-
-    /**
-     * Gets time interval. See {@link #timeInterval(long)} for more information.
-     *
-     * @return Gets time interval.
-     */
-    public long timeInterval();
-
-    /**
-     * Sets automatic unsubscribe flag.
-     * <p>
-     * This flag indicates that query filters on remote nodes should be automatically
-     * unregistered if master node (node that initiated the query) leaves topology.
-     * If this flag is {@code false}, filters will be unregistered only when
-     * the query is cancelled from master node, and won't ever be unregistered if
-     * master node leaves grid.
-     * <p>
-     * Default value for this flag is {@code true}.
-     *
-     * @param autoUnsubscribe Automatic unsubscription flag.
-     */
-    public void autoUnsubscribe(boolean autoUnsubscribe);
-
-    /**
-     * Gets automatic unsubscribe flag. See {@link #autoUnsubscribe(boolean)}
-     * for more information.
-     *
-     * @return Automatic unsubscribe flag.
-     */
-    public boolean isAutoUnsubscribe();
-
-    /**
-     * Starts continuous query execution on the whole grid.
-     * <p>
-     * Note that if grid contains nodes without appropriate cache,
-     * these nodes will be filtered out.
-     * <p>
-     * Also note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL}
-     * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches
-     * query will be always executed locally.
-     *
-     * @throws IgniteCheckedException In case of error.
-     */
-    public void execute() throws IgniteCheckedException;
-
-    /**
-     * Starts continuous query execution on provided set of nodes.
-     * <p>
-     * Note that if provided projection contains nodes without
-     * appropriate cache, these nodes will be filtered out.
-     * <p>
-     * Also note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL}
-     * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches
-     * query will be always executed locally.
-     *
-     * @param prj Grid projection.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException;
-
-    /**
-     * Stops continuous query execution.
-     * <p>
-     * Note that one query instance can be executed only once.
-     * After it's cancelled, it's non-operational.
-     * If you need to repeat execution, use {@link org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()}
-     * method to create new query.
-     *
-     * @throws IgniteCheckedException In case of error.
-     */
-    @Override public void close() throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java
deleted file mode 100644
index 90d3602..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.query;
-
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Entry used for continuous query notifications.
- */
-public interface CacheContinuousQueryEntry<K, V> extends Map.Entry<K, V>, Serializable {
-    /**
-     * Gets entry key.
-     *
-     * @return Entry key.
-     */
-    @Override public K getKey();
-
-    /**
-     * Gets entry new value. New value may be null, if entry is being removed.
-     *
-     * @return Entry new value.
-     */
-    @Override @Nullable public V getValue();
-
-    /**
-     * Gets entry old value. Old value may be null if entry is being inserted (not updated).
-     *
-     * @return Gets entry old value.
-     */
-    @Nullable public V getOldValue();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index b02c65f..8d79101 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.cache.query;
 
-import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 
 import javax.cache.event.*;
 
@@ -106,7 +106,7 @@ import javax.cache.event.*;
  * If you need to repeat execution, use {@link org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create
  * new query.
  */
-public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> implements AutoCloseable {
+public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -125,6 +125,26 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> imp
      */
     public static final boolean DFLT_AUTO_UNSUBSCRIBE = true;
 
+    /** Local listener. */
+    private CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** Remote filter. */
+    private CacheEntryEventFilter<K, V> rmtFilter;
+
+    /** Buffer size. */
+    private int bufSize = DFLT_BUF_SIZE;
+
+    /** Time interval. */
+    private long timeInterval = DFLT_TIME_INTERVAL;
+
+    /** Automatic unsubscription flag. */
+    private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
+
+    /**
+     * TODO
+     *
+     * @param filter TODO
+     */
     public void setInitialPredicate(Query filter) {
         // TODO: implement.
     }
@@ -143,7 +163,16 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> imp
      * @param locLsnr Local callback.
      */
     public void setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) {
-        // TODO: implement.
+        this.locLsnr = locLsnr;
+    }
+
+    /**
+     * Gets local listener.
+     *
+     * @return Local listener.
+     */
+    public CacheEntryUpdatedListener<K, V> getLocalListener() {
+        return locLsnr;
     }
 
     /**
@@ -153,56 +182,86 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> imp
      * (e.g., synchronization or transactional cache operations), should be executed asynchronously
      * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
      *
-     * @param filter Key-value filter.
+     * @param rmtFilter Key-value filter.
      */
-    public void setRemoteFilter(CacheEntryEventFilter<K, V> filter) {
-        // TODO: implement.
+    public void setRemoteFilter(CacheEntryEventFilter<K, V> rmtFilter) {
+        this.rmtFilter = rmtFilter;
+    }
+
+    /**
+     * Gets remote filter.
+     *
+     * @return Remote filter.
+     */
+    public CacheEntryEventFilter<K, V> getRemoteFilter() {
+        return rmtFilter;
     }
 
     /**
      * Sets buffer size. <p> When a cache update happens, entry is first put into a buffer. Entries from buffer will be
-     * sent to the master node only if the buffer is full or time provided via {@link #timeInterval(long)} method is
+     * sent to the master node only if the buffer is full or time provided via {@link #setTimeInterval(long)} method is
      * exceeded. <p> Default buffer size is {@code 1} which means that entries will be sent immediately (buffering is
      * disabled).
      *
      * @param bufSize Buffer size.
      */
-    public void bufferSize(int bufSize) {
-        // TODO: implement.
+    public void setBufferSize(int bufSize) {
+        A.ensure(bufSize > 0, "bufSize > 0");
+
+        this.bufSize = bufSize;
+    }
+
+    /**
+     * Gets buffer size.
+     *
+     * @return Buffer size.
+     */
+    public int getBufferSize() {
+        return bufSize;
     }
 
     /**
      * Sets time interval. <p> When a cache update happens, entry is first put into a buffer. Entries from buffer will
-     * be sent to the master node only if the buffer is full (its size can be provided via {@link #bufferSize(int)}
+     * be sent to the master node only if the buffer is full (its size can be provided via {@link #setBufferSize(int)}
      * method) or time provided via this method is exceeded. <p> Default time interval is {@code 0} which means that
      * time check is disabled and entries will be sent only when buffer is full.
      *
      * @param timeInterval Time interval.
      */
-    public void timeInterval(long timeInterval) {
-        // TODO: implement.
+    public void setTimeInterval(long timeInterval) {
+        A.ensure(timeInterval >= 0, "timeInterval >= 0");
+
+        this.timeInterval = timeInterval;
+    }
+
+    /**
+     * Gets time interval.
+     *
+     * @return Time interval.
+     */
+    public long getTimeInterval() {
+        return timeInterval;
     }
 
     /**
      * Sets automatic unsubscribe flag. <p> This flag indicates that query filters on remote nodes should be
-     * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is {@code
-     * false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be
+     * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is
+     * {@code false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be
      * unregistered if master node leaves grid. <p> Default value for this flag is {@code true}.
      *
      * @param autoUnsubscribe Automatic unsubscription flag.
      */
-    public void autoUnsubscribe(boolean autoUnsubscribe) {
-        // TODO: implement.
+    public void setAutoUnsubscribe(boolean autoUnsubscribe) {
+        this.autoUnsubscribe = autoUnsubscribe;
     }
 
     /**
-     * Stops continuous query execution. <p> Note that one query instance can be executed only once. After it's
-     * cancelled, it's non-operational. If you need to repeat execution, use {@link
-     * org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create new query.
+     * Gets automatic unsubscription flag value.
      *
-     * @throws IgniteCheckedException In case of error.
+     * @return Automatic unsubscription flag.
      */
-    @Override public void close() throws IgniteCheckedException {
-        // TODO: implement.
+    public boolean isAutoUnsubscribe() {
+        return autoUnsubscribe;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
index 744d8d2..c24d704 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
@@ -106,6 +106,15 @@ public abstract class Query<T extends Query> implements Serializable {
     }
 
     /**
+     * Factory method for continuous queries.
+     *
+     * @return Continuous query.
+     */
+    public static <K, V> ContinuousQuery<K, V> continuous() {
+        return new ContinuousQuery<>();
+    }
+
+    /**
      * Gets optional page size, if {@code 0}, then {@link CacheQueryConfiguration#getPageSize()} is used.
      *
      * @return Optional page size.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
index 2733d64..a7563a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
@@ -17,15 +17,14 @@
 
 package org.apache.ignite.events;
 
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.*;
 import java.util.*;
 
 /**
@@ -85,7 +84,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter {
 
     /** Continuous query filter. */
     @GridToStringInclude
-    private final IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter;
+    private final CacheEntryEventFilter<K, V> contQryFilter;
 
     /** Query arguments. */
     @GridToStringInclude
@@ -118,7 +117,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter {
         @Nullable String clsName,
         @Nullable String clause,
         @Nullable IgniteBiPredicate<K, V> scanQryFilter,
-        @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter,
+        @Nullable CacheEntryEventFilter<K, V> contQryFilter,
         @Nullable Object[] args,
         @Nullable UUID subjId,
         @Nullable String taskName) {
@@ -195,7 +194,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter {
      *
      * @return Continuous query filter.
      */
-    @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> continuousQueryFilter() {
+    @Nullable public CacheEntryEventFilter<K, V> continuousQueryFilter() {
         return contQryFilter;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java
index 322feff..1959976 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java
@@ -17,15 +17,14 @@
 
 package org.apache.ignite.events;
 
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.*;
 import java.util.*;
 
 /**
@@ -85,7 +84,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter {
 
     /** Continuous query filter. */
     @GridToStringInclude
-    private final IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter;
+    private final CacheEntryEventFilter<K, V> contQryFilter;
 
     /** Query arguments. */
     @GridToStringInclude
@@ -136,7 +135,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter {
         @Nullable String clsName,
         @Nullable String clause,
         @Nullable IgniteBiPredicate<K, V> scanQryFilter,
-        @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter,
+        @Nullable CacheEntryEventFilter<K, V> contQryFilter,
         @Nullable Object[] args,
         @Nullable UUID subjId,
         @Nullable String taskName,
@@ -221,7 +220,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter {
      *
      * @return Continuous query filter.
      */
-    @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> continuousQueryFilter() {
+    @Nullable public CacheEntryEventFilter<K, V> continuousQueryFilter() {
         return contQryFilter;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java
deleted file mode 100644
index 1ff4be8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-
-import javax.cache.event.*;
-
-/**
- * Implementation of {@link javax.cache.event.CacheEntryEvent}.
- */
-public class CacheEntryEvent<K, V> extends javax.cache.event.CacheEntryEvent<K, V> {
-    /** */
-    private final CacheContinuousQueryEntry<K, V> e;
-
-    /**
-     * @param src Cache.
-     * @param type Event type.
-     * @param e Ignite event.
-     */
-    public CacheEntryEvent(IgniteCache src, EventType type, CacheContinuousQueryEntry<K, V> e) {
-        super(src, type);
-
-        this.e = e;
-    }
-
-    /** {@inheritDoc} */
-    @Override public V getOldValue() {
-        return e.getOldValue();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isOldValueAvailable() {
-        return e.getOldValue() != null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public K getKey() {
-        return e.getKey();
-    }
-
-    /** {@inheritDoc} */
-    @Override public V getValue() {
-        return e.getValue();
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T unwrap(Class<T> cls) {
-        throw new IllegalArgumentException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "CacheEntryEvent [evtType=" + getEventType() +
-            ", key=" + getKey() +
-            ", val=" + getValue() +
-            ", oldVal=" + getOldValue() + ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 8fe80e3..b40d089 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -110,7 +110,7 @@ public class GridCacheContext<K, V> implements Externalizable {
     private GridCacheQueryManager<K, V> qryMgr;
 
     /** Continuous query manager. */
-    private GridCacheContinuousQueryManager<K, V> contQryMgr;
+    private CacheContinuousQueryManager<K, V> contQryMgr;
 
     /** Swap manager. */
     private GridCacheSwapManager<K, V> swapMgr;
@@ -235,7 +235,7 @@ public class GridCacheContext<K, V> implements Externalizable {
         GridCacheStoreManager<K, V> storeMgr,
         GridCacheEvictionManager<K, V> evictMgr,
         GridCacheQueryManager<K, V> qryMgr,
-        GridCacheContinuousQueryManager<K, V> contQryMgr,
+        CacheContinuousQueryManager<K, V> contQryMgr,
         GridCacheAffinityManager<K, V> affMgr,
         CacheDataStructuresManager<K, V> dataStructuresMgr,
         GridCacheTtlManager<K, V> ttlMgr,
@@ -854,7 +854,7 @@ public class GridCacheContext<K, V> implements Externalizable {
     /**
      * @return Continuous query manager, {@code null} if disabled.
      */
-    public GridCacheContinuousQueryManager<K, V> continuousQueries() {
+    public CacheContinuousQueryManager<K, V> continuousQueries() {
         return contQryMgr;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 53427d5..d85bf05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -627,7 +627,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || !GridCacheUtils.isNearEnabled(cfg));
             GridCacheEvictionManager evictMgr = new GridCacheEvictionManager();
             GridCacheQueryManager qryMgr = queryManager(cfg);
-            GridCacheContinuousQueryManager contQryMgr = new GridCacheContinuousQueryManager();
+            CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager();
             CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager();
             GridCacheTtlManager ttlMgr = new GridCacheTtlManager();
             GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class);
@@ -763,7 +763,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                  * 2. GridCacheIoManager
                  * 3. GridCacheDeploymentManager
                  * 4. GridCacheQueryManager (note, that we start it for DHT cache though).
-                 * 5. GridCacheContinuousQueryManager (note, that we start it for DHT cache though).
+                 * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though).
                  * 6. GridCacheDgcManager
                  * 7. GridCacheTtlManager.
                  * ===============================================
@@ -1595,6 +1595,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @return Utility cache.
+     */
+    public <K, V> IgniteCache<K, V> utilityJCache() {
+        return jcache(CU.UTILITY_CACHE_NAME);
+    }
+
+    /**
      * Gets utility cache for atomic data structures.
      *
      * @return Utility cache for atomic data structures.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 20d40e6..ebb8a60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -24,6 +24,8 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -31,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.mxbean.*;
+import org.apache.ignite.plugin.security.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
@@ -42,6 +45,8 @@ import java.io.*;
 import java.util.*;
 import java.util.concurrent.locks.*;
 
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+
 /**
  * Cache proxy.
  */
@@ -70,6 +75,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     /** Projection. */
     private GridCacheProjectionImpl<K, V> prj;
 
+    /** Logger. */
+    private IgniteLogger log;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -97,6 +105,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         this.prj = prj;
 
         gate = ctx.gate();
+
+        log = ctx.logger(getClass());
     }
 
     /**
@@ -323,6 +333,102 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /**
+     * Executes continuous query.
+     *
+     * @param qry Query.
+     * @param grp Cluster group.
+     * @return Initial iteration cursor.
+     */
+    private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, @Nullable ClusterGroup grp) {
+        if (qry.getLocalListener() == null)
+            throw new IllegalStateException("Mandatory local listener is not set for the query: " + qry);
+
+        ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
+
+        IgniteEx grid = ctx.kernalContext().grid();
+
+        if (grp == null)
+            grp = grid;
+
+        grp = grp.forCacheNodes(ctx.name());
+
+        Collection<ClusterNode> nodes = grp.nodes();
+
+        if (nodes.isEmpty())
+            throw new ClusterTopologyException("Failed to execute continuous query (empty cluster group is " +
+                "provided): " + qry);
+
+        boolean skipPrimaryCheck = false;
+
+        switch (ctx.config().getCacheMode()) {
+            case LOCAL:
+                if (!nodes.contains(ctx.localNode()))
+                    throw new ClusterTopologyException("Continuous query for LOCAL cache can be executed " +
+                        "only locally (provided projection contains remote nodes only): " + qry);
+                else if (nodes.size() > 1)
+                    U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " +
+                        "ignored): " + this);
+
+                grp = grp.forNode(ctx.localNode());
+
+                break;
+
+            case REPLICATED:
+                if (nodes.size() == 1 && F.first(nodes).equals(ctx.localNode())) {
+                    CacheDistributionMode distributionMode = ctx.config().getDistributionMode();
+
+                    if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED)
+                        skipPrimaryCheck = true;
+                }
+
+                break;
+        }
+
+        int taskNameHash = ctx.kernalContext().security().enabled() ?
+            ctx.kernalContext().job().currentTaskNameHash() : 0;
+
+        GridContinuousHandler hnd = new CacheContinuousQueryHandler<>(
+            ctx.name(),
+            ctx.continuousQueries().topic(),
+            qry.getLocalListener(),
+            qry.getRemoteFilter(),
+            false,
+            false,
+            false,
+            true,
+            skipPrimaryCheck,
+            taskNameHash,
+            false);
+
+        try {
+            final UUID routineId = ctx.kernalContext().continuous().startRoutine(hnd, qry.getBufferSize(),
+                qry.getTimeInterval(), qry.isAutoUnsubscribe(), grp.predicate()).get();
+
+            return new QueryCursor<Entry<K, V>>() {
+                @Override public Iterator<Entry<K, V>> iterator() {
+                    return new GridEmptyIterator<>();
+                }
+
+                @Override public List<Entry<K, V>> getAll() {
+                    return Collections.emptyList();
+                }
+
+                @Override public void close() {
+                    try {
+                        ctx.kernalContext().continuous().stopRoutine(routineId).get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw U.convertException(e);
+                    }
+                }
+            };
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /**
      * @param local Enforce local.
      * @return Local node cluster group.
      */
@@ -331,6 +437,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public QueryCursor<Entry<K,V>> query(Query qry) {
         A.notNull(qry, "qry");
 
@@ -345,6 +452,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
                 return ctx.kernalContext().query().queryTwoStep(ctx.name(), p.getType(), p.getSql(), p.getArgs());
             }
+            else if (qry instanceof ContinuousQuery)
+                return queryContinuous((ContinuousQuery<K, V>)qry, projection(false));
 
             return query(qry, projection(false));
         }
@@ -401,6 +510,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public QueryCursor<Entry<K,V>> localQuery(Query qry) {
         A.notNull(qry, "qry");
 
@@ -409,6 +519,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         try {
             if (qry instanceof SqlQuery)
                 return doLocalQuery((SqlQuery)qry);
+            else if (qry instanceof ContinuousQuery)
+                return queryContinuous((ContinuousQuery<K, V>)qry, projection(true));
 
             return query(qry, projection(true));
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 5f68a7a..5ee2b31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -23,7 +23,6 @@ import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.query.continuous.*;
 import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.*;
@@ -33,6 +32,9 @@ import org.apache.ignite.resources.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -59,7 +61,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
     private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView;
 
     /** Query notifying about queue update. */
-    private GridCacheContinuousQueryAdapter queueQry;
+    private QueryCursor<Cache.Entry<Object, Object>> queueQryCur;
 
     /** Queue query creation guard. */
     private final AtomicBoolean queueQryGuard = new AtomicBoolean();
@@ -98,14 +100,8 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
     @Override protected void onKernalStop0(boolean cancel) {
         busyLock.block();
 
-        if (queueQry != null) {
-            try {
-                queueQry.close();
-            }
-            catch (IgniteCheckedException e) {
-                U.warn(log, "Failed to cancel queue header query.", e);
-            }
-        }
+        if (queueQryCur != null)
+            queueQryCur.close();
 
         for (GridCacheQueueProxy q : queuesMap.values())
             q.delegate().onKernalStop();
@@ -188,17 +184,15 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
                 return null;
 
             if (queueQryGuard.compareAndSet(false, true)) {
-                queueQry = (GridCacheContinuousQueryAdapter)cctx.cache().queries().createContinuousQuery();
+                ContinuousQuery<Object, Object> qry = Query.continuous();
 
-                queueQry.remoteFilter(new QueueHeaderPredicate());
-
-                queueQry.localCallback(new IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry>>() {
-                    @Override public boolean apply(UUID id, Collection<GridCacheContinuousQueryEntry> entries) {
+                qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
                         if (!busyLock.enterBusy())
-                            return false;
+                            return;
 
                         try {
-                            for (GridCacheContinuousQueryEntry e : entries) {
+                            for (CacheEntryEvent<?, ?> e : evts) {
                                 GridCacheQueueHeaderKey key = (GridCacheQueueHeaderKey)e.getKey();
                                 GridCacheQueueHeader hdr = (GridCacheQueueHeader)e.getValue();
 
@@ -220,8 +214,6 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
                                     }
                                 }
                             }
-
-                            return true;
                         }
                         finally {
                             busyLock.leaveBusy();
@@ -229,11 +221,11 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
                     }
                 });
 
-                queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null,
-                    true,
-                    false,
-                    false,
-                    true);
+                qry.setRemoteFilter(new QueueHeaderPredicate());
+
+                IgniteCache<Object, Object> jCache = cctx.kernalContext().cache().utilityJCache();
+
+                queueQryCur = cctx.isLocal() || cctx.isReplicated() ? jCache.localQuery(qry) : jCache.query(qry);
             }
 
             GridCacheQueueProxy queue = queuesMap.get(hdr.id());
@@ -544,7 +536,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
     /**
      * Predicate for queue continuous query.
      */
-    private static class QueueHeaderPredicate implements IgnitePredicate<CacheContinuousQueryEntry>, Externalizable {
+    private static class QueueHeaderPredicate implements CacheEntryEventFilter<Object, Object>, Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -556,7 +548,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
         }
 
         /** {@inheritDoc} */
-        @Override public boolean apply(CacheContinuousQueryEntry e) {
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> e) {
             return e.getKey() instanceof GridCacheQueueHeaderKey;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java
index 4dad74c..3dcb82a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java
@@ -93,16 +93,6 @@ public interface CacheQueries<K, V> {
     public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter);
 
     /**
-     * Creates new continuous query.
-     * <p>
-     * For more information refer to {@link org.apache.ignite.cache.query.CacheContinuousQuery} documentation.
-     *
-     * @return Created continuous query.
-     * @see org.apache.ignite.cache.query.CacheContinuousQuery
-     */
-    public CacheContinuousQuery<K, V> createContinuousQuery();
-
-    /**
      * Forces this cache to rebuild all search indexes of given value type. Sometimes indexes
      * may hold references to objects that have already been removed from cache. Although
      * not affecting query results, these objects may consume extra memory. Rebuilding

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
index 797990f..52e8d96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
@@ -177,11 +177,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext
     }
 
     /** {@inheritDoc} */
-    @Override public CacheContinuousQuery<K, V> createContinuousQuery() {
-        return ctx.continuousQueries().createQuery(prj == null ? null : prj.predicate());
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> rebuildIndexes(Class<?> cls) {
         A.notNull(cls, "cls");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java
index 509a7b2..5e19cd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java
@@ -151,18 +151,6 @@ public class GridCacheQueriesProxy<K, V> implements GridCacheQueriesEx<K, V>, Ex
     }
 
     /** {@inheritDoc} */
-    @Override public CacheContinuousQuery<K, V> createContinuousQuery() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
-        try {
-            return delegate.createContinuousQuery();
-        }
-        finally {
-            gate.leave(prev);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public <R> CacheQuery<R> createSpiQuery() {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
new file mode 100644
index 0000000..92f0f0a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheValueBytes.*;
+
+/**
+ * Continuous query event.
+ */
+class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Key. */
+    @GridToStringInclude
+    private K key;
+
+    /** New value. */
+    @GridToStringInclude
+    private V newVal;
+
+    /** Old value. */
+    @GridToStringInclude
+    private V oldVal;
+
+    /** Serialized key. */
+    @GridToStringExclude
+    private byte[] keyBytes;
+
+    /** Serialized value. */
+    @GridToStringExclude
+    private GridCacheValueBytes newValBytes;
+
+    /** Serialized value. */
+    @GridToStringExclude
+    private GridCacheValueBytes oldValBytes;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** Deployment info. */
+    @GridToStringExclude
+    private GridDeploymentInfo depInfo;
+
+    public CacheContinuousQueryEntry() {
+        // No-op.
+    }
+
+    CacheContinuousQueryEntry(K key, @Nullable V newVal, @Nullable GridCacheValueBytes newValBytes, @Nullable V oldVal,
+        @Nullable GridCacheValueBytes oldValBytes) {
+
+        this.key = key;
+        this.newVal = newVal;
+        this.newValBytes = newValBytes;
+        this.oldVal = oldVal;
+        this.oldValBytes = oldValBytes;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     */
+    void cacheName(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    /**
+     * @return cache name.
+     */
+    String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * Unmarshals value from bytes if needed.
+     *
+     * @param marsh Marshaller.
+     * @param ldr Class loader.
+     * @throws IgniteCheckedException In case of error.
+     */
+    void initValue(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+        assert marsh != null;
+
+        if (newVal == null && newValBytes != null && !newValBytes.isNull())
+            newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr);
+    }
+
+    /**
+     * @param marsh Marshaller.
+     * @throws IgniteCheckedException In case of error.
+     */
+    void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
+        assert marsh != null;
+
+        assert key != null;
+
+        keyBytes = marsh.marshal(key);
+
+        if (newValBytes == null || newValBytes.isNull())
+            newValBytes = newVal != null ?
+                newVal instanceof byte[] ? plain(newVal) : marshaled(marsh.marshal(newVal)) : null;
+
+        if (oldValBytes == null || oldValBytes.isNull())
+            oldValBytes = oldVal != null ?
+                oldVal instanceof byte[] ? plain(oldVal) : marshaled(marsh.marshal(oldVal)) : null;
+    }
+
+    /**
+     * @param marsh Marshaller.
+     * @param ldr Class loader.
+     * @throws IgniteCheckedException In case of error.
+     */
+    void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+        assert marsh != null;
+
+        assert key == null : "Key should be null: " + key;
+        assert newVal == null : "New value should be null: " + newVal;
+        assert oldVal == null : "Old value should be null: " + oldVal;
+        assert keyBytes != null;
+
+        key = marsh.unmarshal(keyBytes, ldr);
+
+        if (newValBytes != null && !newValBytes.isNull())
+            newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr);
+
+        if (oldValBytes != null && !oldValBytes.isNull())
+            oldVal = oldValBytes.isPlain() ? (V)oldValBytes.get() : marsh.<V>unmarshal(oldValBytes.get(), ldr);
+    }
+
+    /**
+     * @return Key.
+     */
+    K key() {
+        return key;
+    }
+
+    /**
+     * @return New value.
+     */
+    V value() {
+        return newVal;
+    }
+
+    /**
+     * @return Old value.
+     */
+    V oldValue() {
+        return oldVal;
+    }
+
+    /**
+     * Nullifies old value.
+     */
+    void nullifyOldValue() {
+        oldVal = null;
+        oldValBytes = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepare(GridDeploymentInfo depInfo) {
+        this.depInfo = depInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDeploymentInfo deployInfo() {
+        return depInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        boolean b = keyBytes != null;
+
+        out.writeBoolean(b);
+
+        if (b) {
+            U.writeByteArray(out, keyBytes);
+
+            if (newValBytes != null && !newValBytes.isNull()) {
+                out.writeBoolean(true);
+                out.writeBoolean(newValBytes.isPlain());
+                U.writeByteArray(out, newValBytes.get());
+            }
+            else
+                out.writeBoolean(false);
+
+            if (oldValBytes != null && !oldValBytes.isNull()) {
+                out.writeBoolean(true);
+                out.writeBoolean(oldValBytes.isPlain());
+                U.writeByteArray(out, oldValBytes.get());
+            }
+            else
+                out.writeBoolean(false);
+
+            U.writeString(out, cacheName);
+            out.writeObject(depInfo);
+        }
+        else {
+            out.writeObject(key);
+            out.writeObject(newVal);
+            out.writeObject(oldVal);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        boolean b = in.readBoolean();
+
+        if (b) {
+            keyBytes = U.readByteArray(in);
+
+            if (in.readBoolean())
+                newValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in));
+
+            if (in.readBoolean())
+                oldValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in));
+
+            cacheName = U.readString(in);
+            depInfo = (GridDeploymentInfo)in.readObject();
+        }
+        else {
+            key = (K)in.readObject();
+            newVal = (V)in.readObject();
+            oldVal = (V)in.readObject();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryEntry.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
new file mode 100644
index 0000000..b284ef2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.cache.*;
+import javax.cache.event.*;
+
+/**
+ * Continuous query event.
+ */
+class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
+    /** Entry. */
+    private final CacheContinuousQueryEntry<K, V> e;
+
+    /**
+     * @param source Source cache.
+     * @param eventType Event type.
+     * @param e Entry.
+     */
+    CacheContinuousQueryEvent(Cache source, EventType eventType, CacheContinuousQueryEntry<K, V> e) {
+        super(source, eventType);
+
+        assert e != null;
+
+        this.e = e;
+    }
+
+    /**
+     * @return Entry.
+     */
+    CacheContinuousQueryEntry<K, V> entry() {
+        return e;
+    }
+
+    /** {@inheritDoc} */
+    @Override public K getKey() {
+        return e.key();
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getValue() {
+        return e.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getOldValue() {
+        return e.oldValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isOldValueAvailable() {
+        return e.oldValue() != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T unwrap(Class<T> clazz) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryEvent.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java
new file mode 100644
index 0000000..897f481
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+
+import javax.cache.event.*;
+
+/**
+ * Extended continuous query filter.
+ */
+public interface CacheContinuousQueryFilterEx<K, V> extends CacheEntryEventFilter<K, V> {
+    /**
+     * Callback for query unregister event.
+     */
+    public void onQueryUnregister();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
new file mode 100644
index 0000000..5c03488
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.*;
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Continuous query handler.
+ */
+public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** Topic for ordered messages. */
+    private Object topic;
+
+    /** Local callback. */
+    private CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** Filter. */
+    private CacheEntryEventFilter<K, V> rmtFilter;
+
+    /** Deployable object for filter. */
+    private DeployableObject filterDep;
+
+    /** Internal flag. */
+    private boolean internal;
+
+    /** Entry listener flag. */
+    private boolean entryLsnr;
+
+    /** Synchronous listener flag. */
+    private boolean sync;
+
+    /** {@code True} if old value is required. */
+    private boolean oldVal;
+
+    /** Task name hash code. */
+    private int taskHash;
+
+    /** Keep portable flag. */
+    private boolean keepPortable;
+
+    /** Whether to skip primary check for REPLICATED cache. */
+    private transient boolean skipPrimaryCheck;
+
+    /**
+     * Required by {@link Externalizable}.
+     */
+    public CacheContinuousQueryHandler() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheName Cache name.
+     * @param topic Topic for ordered messages.
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
+     * @param internal If {@code true} then query is notified about internal entries updates.
+     * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
+     * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}.
+     * @param oldVal {@code True} if old value is required.
+     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
+     * @param taskHash Task name hash code.
+     */
+    public CacheContinuousQueryHandler(@Nullable String cacheName, Object topic,
+        CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, boolean internal,
+        boolean entryLsnr, boolean sync, boolean oldVal, boolean skipPrimaryCheck, int taskHash, boolean keepPortable) {
+        assert topic != null;
+        assert locLsnr != null;
+        assert !sync || entryLsnr;
+
+        this.cacheName = cacheName;
+        this.topic = topic;
+        this.locLsnr = locLsnr;
+        this.rmtFilter = rmtFilter;
+        this.internal = internal;
+        this.entryLsnr = entryLsnr;
+        this.sync = sync;
+        this.oldVal = oldVal;
+        this.taskHash = taskHash;
+        this.keepPortable = keepPortable;
+        this.skipPrimaryCheck = skipPrimaryCheck;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isForEvents() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isForMessaging() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isForQuery() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
+        throws IgniteCheckedException {
+        assert nodeId != null;
+        assert routineId != null;
+        assert ctx != null;
+
+        if (locLsnr != null)
+            ctx.resource().injectGeneric(locLsnr);
+
+        if (rmtFilter != null)
+            ctx.resource().injectGeneric(rmtFilter);
+
+        final boolean loc = nodeId.equals(ctx.localNodeId());
+
+        CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
+            @Override public void onExecution() {
+                if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+                    ctx.event().record(new CacheQueryExecutedEvent<>(
+                        ctx.discovery().localNode(),
+                        "Continuous query executed.",
+                        EVT_CACHE_QUERY_EXECUTED,
+                        CacheQueryType.CONTINUOUS,
+                        cacheName,
+                        null,
+                        null,
+                        null,
+                        rmtFilter,
+                        null,
+                        nodeId,
+                        taskName()
+                    ));
+                }
+            }
+
+            @Override public void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt) {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                if (cctx.isReplicated() && !skipPrimaryCheck && !primary)
+                    return;
+
+                boolean notify = true;
+
+                if (rmtFilter != null) {
+                    CacheFlag[] f = cctx.forceLocalRead();
+
+                    try {
+                        notify = rmtFilter.evaluate(evt);
+                    }
+                    finally {
+                        cctx.forceFlags(f);
+                    }
+                }
+
+                if (notify) {
+                    if (!oldVal)
+                        evt.entry().nullifyOldValue();
+
+                    if (loc)
+                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+                    else {
+                        try {
+                            ClusterNode node = ctx.discovery().node(nodeId);
+
+                            if (ctx.config().isPeerClassLoadingEnabled() && node != null &&
+                                U.hasCache(node, cacheName)) {
+                                evt.entry().p2pMarshal(ctx.config().getMarshaller());
+
+                                evt.entry().cacheName(cacheName);
+
+                                GridCacheDeploymentManager depMgr =
+                                    ctx.cache().internalCache(cacheName).context().deploy();
+
+                                depMgr.prepare(evt.entry());
+                            }
+
+                            ctx.continuous().addNotification(nodeId, routineId, evt, topic, sync);
+                        }
+                        catch (IgniteCheckedException ex) {
+                            U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+                        }
+                    }
+
+                    if (!entryLsnr && recordIgniteEvt) {
+                        ctx.event().record(new CacheQueryReadEvent<>(
+                            ctx.discovery().localNode(),
+                            "Continuous query executed.",
+                            EVT_CACHE_QUERY_OBJECT_READ,
+                            CacheQueryType.CONTINUOUS,
+                            cacheName,
+                            null,
+                            null,
+                            null,
+                            rmtFilter,
+                            null,
+                            nodeId,
+                            taskName(),
+                            evt.getKey(),
+                            evt.getValue(),
+                            evt.getOldValue(),
+                            null
+                        ));
+                    }
+                }
+            }
+
+            @Override public void onUnregister() {
+                if (rmtFilter != null && rmtFilter instanceof CacheContinuousQueryFilterEx)
+                    ((CacheContinuousQueryFilterEx)rmtFilter).onQueryUnregister();
+            }
+
+            @Nullable private String taskName() {
+                return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null;
+            }
+        };
+
+        return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unregister(UUID routineId, GridKernalContext ctx) {
+        assert routineId != null;
+        assert ctx != null;
+
+        manager(ctx).unregisterListener(internal, routineId);
+    }
+
+    /**
+     * @param ctx Kernal context.
+     * @return Continuous query manager.
+     */
+    private CacheContinuousQueryManager<K, V> manager(GridKernalContext ctx) {
+        return cacheContext(ctx).continuousQueries();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) {
+        assert nodeId != null;
+        assert routineId != null;
+        assert objs != null;
+        assert ctx != null;
+
+        Collection<CacheEntryEvent<? extends K, ? extends V>> evts =
+            (Collection<CacheEntryEvent<? extends K, ? extends V>>)objs;
+
+        if (ctx.config().isPeerClassLoadingEnabled()) {
+            for (CacheEntryEvent<? extends K, ? extends V> evt : evts) {
+                assert evt instanceof CacheContinuousQueryEvent;
+
+                CacheContinuousQueryEntry<? extends K, ? extends V> e = ((CacheContinuousQueryEvent)evt).entry();
+
+                GridCacheAdapter cache = ctx.cache().internalCache(e.cacheName());
+
+                ClassLoader ldr = null;
+
+                if (cache != null) {
+                    GridCacheDeploymentManager depMgr = cache.context().deploy();
+
+                    GridDeploymentInfo depInfo = e.deployInfo();
+
+                    if (depInfo != null) {
+                        depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(),
+                            depInfo.participants(), depInfo.localDeploymentOwner());
+                    }
+
+                    ldr = depMgr.globalLoader();
+                }
+                else {
+                    U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " +
+                        "when peer class loading is enabled: " + e.cacheName() + ". Will try to unmarshal " +
+                        "with default class loader.");
+                }
+
+                try {
+                    e.p2pUnmarshal(ctx.config().getMarshaller(), ldr);
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
+                }
+            }
+        }
+
+        locLsnr.onUpdated(evts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
+        assert ctx != null;
+        assert ctx.config().isPeerClassLoadingEnabled();
+
+        if (rmtFilter != null && !U.isGrid(rmtFilter.getClass()))
+            filterDep = new DeployableObject(rmtFilter, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+        assert nodeId != null;
+        assert ctx != null;
+        assert ctx.config().isPeerClassLoadingEnabled();
+
+        if (filterDep != null)
+            rmtFilter = filterDep.unmarshal(nodeId, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Object orderedTopic() {
+        return topic;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, cacheName);
+        out.writeObject(topic);
+
+        boolean b = filterDep != null;
+
+        out.writeBoolean(b);
+
+        if (b)
+            out.writeObject(filterDep);
+        else
+            out.writeObject(rmtFilter);
+
+        out.writeBoolean(internal);
+        out.writeBoolean(entryLsnr);
+        out.writeBoolean(sync);
+        out.writeBoolean(oldVal);
+        out.writeInt(taskHash);
+        out.writeBoolean(keepPortable);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        cacheName = U.readString(in);
+        topic = in.readObject();
+
+        boolean b = in.readBoolean();
+
+        if (b)
+            filterDep = (DeployableObject)in.readObject();
+        else
+            rmtFilter = (CacheEntryEventFilter<K, V>)in.readObject();
+
+        internal = in.readBoolean();
+        entryLsnr = in.readBoolean();
+        sync = in.readBoolean();
+        oldVal = in.readBoolean();
+        taskHash = in.readInt();
+        keepPortable = in.readBoolean();
+    }
+
+    /**
+     * @param ctx Kernal context.
+     * @return Cache context.
+     */
+    private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) {
+        assert ctx != null;
+
+        return ctx.cache().<K, V>internalCache(cacheName).context();
+    }
+
+    /**
+     * Deployable object.
+     */
+    private static class DeployableObject implements Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Serialized object. */
+        private byte[] bytes;
+
+        /** Deployment class name. */
+        private String clsName;
+
+        /** Deployment info. */
+        private GridDeploymentInfo depInfo;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public DeployableObject() {
+            // No-op.
+        }
+
+        /**
+         * @param obj Object.
+         * @param ctx Kernal context.
+         * @throws IgniteCheckedException In case of error.
+         */
+        private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
+            assert obj != null;
+            assert ctx != null;
+
+            Class cls = U.detectClass(obj);
+
+            clsName = cls.getName();
+
+            GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
+
+            if (dep == null)
+                throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj);
+
+            depInfo = new GridDeploymentInfoBean(dep);
+
+            bytes = ctx.config().getMarshaller().marshal(obj);
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @param ctx Kernal context.
+         * @return Deserialized object.
+         * @throws IgniteCheckedException In case of error.
+         */
+        <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+            assert ctx != null;
+
+            GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+                depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
+
+            if (dep == null)
+                throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+
+            return ctx.config().getMarshaller().unmarshal(bytes, dep.classLoader());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeByteArray(out, bytes);
+            U.writeString(out, clsName);
+            out.writeObject(depInfo);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            bytes = U.readByteArray(in);
+            clsName = U.readString(in);
+            depInfo = (GridDeploymentInfo)in.readObject();
+        }
+    }
+}


Mime
View raw message