ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [11/30] incubator-ignite git commit: IGNITE-143 - Continuous queries refactoring (manual merge)
Date Sun, 15 Feb 2015 08:02:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
new file mode 100644
index 0000000..4ad664a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.event.*;
+import javax.cache.event.EventType;
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Continuous query handler.
+ */
+class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache name; resolved to a cache context in {@link #cacheContext}. */
+    private String cacheName;
+
+    /** Topic for ordered messages; returned from {@link #orderedTopic()}. */
+    private Object topic;
+
+    /** Local listener; transient, so it is not written by {@link #writeExternal}. */
+    private transient CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** Remote filter evaluated for each event before notification; may be {@code null}. */
+    private CacheEntryEventFilter<K, V> rmtFilter;
+
+    /** Deployable object for filter; created in {@link #p2pMarshal}. */
+    private DeployableObject rmtFilterDep;
+
+    /** Internal flag; passed to the continuous query manager on listener (un)registration. */
+    private boolean internal;
+
+    /** Old value required flag; reported by the registered listener's {@code oldValueRequired()}. */
+    private boolean oldValRequired;
+
+    /** Synchronous flag; forwarded when a notification is added for a remote node. */
+    private boolean sync;
+
+    /** Ignore expired events flag; when set, {@code EXPIRED} events are dropped by the listener. */
+    private boolean ignoreExpired;
+
+    /** Task name hash code; resolved to a task name only when security is enabled. */
+    private int taskHash;
+
+    /** Whether to skip primary check for REPLICATED cache; transient, not written by {@link #writeExternal}. */
+    private transient boolean skipPrimaryCheck;
+
+    /**
+     * Empty constructor required by {@link Externalizable}; state is restored in {@link #readExternal}.
+     */
+    public CacheContinuousQueryHandler() {
+        // No-op.
+    }
+
+    /**
+     * Creates a fully initialized handler.
+     *
+     * @param cacheName Cache name.
+     * @param topic Topic for ordered messages (asserted to be non-null).
+     * @param locLsnr Local listener (asserted to be non-null).
+     * @param rmtFilter Remote filter.
+     * @param internal Internal flag.
+     * @param oldValRequired Old value required flag.
+     * @param sync Synchronous flag.
+     * @param ignoreExpired Ignore expired events flag.
+     * @param taskHash Task name hash code.
+     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
+     */
+    public CacheContinuousQueryHandler(
+        String cacheName,
+        Object topic,
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        CacheEntryEventFilter<K, V> rmtFilter,
+        boolean internal,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        int taskHash,
+        boolean skipPrimaryCheck) {
+        assert topic != null;
+        assert locLsnr != null;
+
+        this.cacheName = cacheName;
+        this.topic = topic;
+        this.locLsnr = locLsnr;
+        this.rmtFilter = rmtFilter;
+        this.internal = internal;
+        this.oldValRequired = oldValRequired;
+        this.sync = sync;
+        this.ignoreExpired = ignoreExpired;
+        this.taskHash = taskHash;
+        this.skipPrimaryCheck = skipPrimaryCheck;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isForEvents() {
+        return false; // This handler reports itself as a query handler only.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isForMessaging() {
+        return false; // This handler reports itself as a query handler only.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isForQuery() {
+        return true; // The only one of the three kind checks that this handler answers positively.
+    }
+
+    /** {@inheritDoc} Builds the cache listener for this query and registers it with the cache's query manager. */
+    @Override public boolean register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
+        throws IgniteCheckedException {
+        assert nodeId != null;
+        assert routineId != null;
+        assert ctx != null;
+
+        if (locLsnr != null) // Transient field: readExternal() never restores it.
+            ctx.resource().injectGeneric(locLsnr);
+
+        if (rmtFilter != null)
+            ctx.resource().injectGeneric(rmtFilter);
+
+        final boolean loc = nodeId.equals(ctx.localNodeId()); // True when the node to notify is this node.
+
+        CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
+            @Override public void onExecution() {
+                if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { // Skip event creation when not recordable.
+                    ctx.event().record(new CacheQueryExecutedEvent<>(
+                        ctx.discovery().localNode(),
+                        "Continuous query executed.",
+                        EVT_CACHE_QUERY_EXECUTED,
+                        CacheQueryType.CONTINUOUS,
+                        cacheName,
+                        null,
+                        null,
+                        null,
+                        rmtFilter,
+                        null,
+                        nodeId,
+                        taskName()
+                    ));
+                }
+            }
+
+            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
+                boolean recordIgniteEvt) {
+                if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) // Drop EXPIRED events when asked to.
+                    return;
+
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                if (cctx.isReplicated() && !skipPrimaryCheck && !primary) // REPLICATED: non-primary is silent.
+                    return;
+
+                boolean notify = true;
+
+                if (rmtFilter != null) {
+                    CacheFlag[] f = cctx.forceLocalRead(); // Result is handed back to forceFlags() in finally.
+
+                    try {
+                        notify = rmtFilter.evaluate(evt);
+                    }
+                    finally {
+                        cctx.forceFlags(f);
+                    }
+                }
+
+                if (notify) {
+                    if (loc) // Target is this node: call the local listener directly.
+                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+                    else {
+                        try {
+                            ClusterNode node = ctx.discovery().node(nodeId);
+
+                            if (ctx.config().isPeerClassLoadingEnabled() && node != null &&
+                                U.hasCache(node, cacheName)) {
+                                evt.entry().p2pMarshal(ctx.config().getMarshaller());
+
+                                evt.entry().cacheName(cacheName); // Read back via cacheName() in notifyCallback().
+
+                                GridCacheDeploymentManager depMgr =
+                                    ctx.cache().internalCache(cacheName).context().deploy();
+
+                                depMgr.prepare(evt.entry());
+                            }
+
+                            ctx.continuous().addNotification(nodeId, routineId, evt, topic, sync);
+                        }
+                        catch (IgniteCheckedException ex) { // Logged only; event recording below still runs.
+                            U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+                        }
+                    }
+
+                    if (recordIgniteEvt) { // The caller decides whether a read event is recorded.
+                        ctx.event().record(new CacheQueryReadEvent<>(
+                            ctx.discovery().localNode(),
+                            "Continuous query executed.",
+                            EVT_CACHE_QUERY_OBJECT_READ,
+                            CacheQueryType.CONTINUOUS,
+                            cacheName,
+                            null,
+                            null,
+                            null,
+                            rmtFilter,
+                            null,
+                            nodeId,
+                            taskName(),
+                            evt.getKey(),
+                            evt.getValue(),
+                            evt.getOldValue(),
+                            null
+                        ));
+                    }
+                }
+            }
+
+            @Override public void onUnregister() {
+                if (rmtFilter instanceof CacheContinuousQueryFilterEx) // Only extended filters get this callback.
+                    ((CacheContinuousQueryFilterEx)rmtFilter).onQueryUnregister();
+            }
+
+            @Override public boolean oldValueRequired() {
+                return oldValRequired;
+            }
+
+            private String taskName() {
+                return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; // Null w/o security.
+            }
+        };
+
+        return manager(ctx).registerListener(routineId, lsnr, internal);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
+        // No-op: this handler takes no action after registration.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unregister(UUID routineId, GridKernalContext ctx) {
+        assert routineId != null;
+        assert ctx != null;
+
+        manager(ctx).unregisterListener(internal, routineId); // Note the order: internal flag first, then ID.
+    }
+
+    /**
+     * @param ctx Kernal context.
+     * @return Continuous query manager of the cache context resolved by {@link #cacheContext}.
+     */
+    private CacheContinuousQueryManager<K, V> manager(GridKernalContext ctx) {
+        return cacheContext(ctx).continuousQueries();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) {
+        assert nodeId != null;
+        assert routineId != null;
+        assert objs != null;
+        assert ctx != null;
+
+        Collection<CacheEntryEvent<? extends K, ? extends V>> evts =
+            (Collection<CacheEntryEvent<? extends K, ? extends V>>)objs;
+
+        if (ctx.config().isPeerClassLoadingEnabled()) {
+            for (CacheEntryEvent<? extends K, ? extends V> evt : evts) {
+                assert evt instanceof CacheContinuousQueryEvent;
+
+                CacheContinuousQueryEntry<? extends K, ? extends V> e = ((CacheContinuousQueryEvent)evt).entry();
+
+                GridCacheAdapter cache = ctx.cache().internalCache(e.cacheName());
+
+                ClassLoader ldr = null;
+
+                if (cache != null) {
+                    GridCacheDeploymentManager depMgr = cache.context().deploy();
+
+                    GridDeploymentInfo depInfo = e.deployInfo();
+
+                    if (depInfo != null) {
+                        depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(),
+                            depInfo.participants(), depInfo.localDeploymentOwner());
+                    }
+
+                    ldr = depMgr.globalLoader();
+                }
+                else {
+                    U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " +
+                        "when peer class loading is enabled: " + e.cacheName() + ". Will try to unmarshal " +
+                        "with default class loader.");
+                }
+
+                try {
+                    e.p2pUnmarshal(ctx.config().getMarshaller(), ldr);
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
+                }
+            }
+        }
+
+        locLsnr.onUpdated(evts);
+    }
+
+    /** {@inheritDoc} Wraps the remote filter into a {@link DeployableObject} when it needs deployment. */
+    @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
+        assert ctx != null;
+        assert ctx.config().isPeerClassLoadingEnabled();
+
+        if (rmtFilter != null && !U.isGrid(rmtFilter.getClass())) // Classes accepted by U.isGrid() are not wrapped.
+            rmtFilterDep = new DeployableObject(rmtFilter, ctx);
+    }
+
+    /** {@inheritDoc} Restores the remote filter from its {@link DeployableObject} form, if one was received. */
+    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+        assert nodeId != null;
+        assert ctx != null;
+        assert ctx.config().isPeerClassLoadingEnabled();
+
+        if (rmtFilterDep != null) // Null when the filter was written directly by writeExternal().
+            rmtFilter = rmtFilterDep.unmarshal(nodeId, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Object orderedTopic() {
+        return topic; // Same topic that is passed along with remote notifications in register().
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, cacheName);
+        out.writeObject(topic);
+
+        boolean b = rmtFilterDep != null;
+
+        out.writeBoolean(b);
+
+        if (b)
+            out.writeObject(rmtFilterDep);
+        else
+            out.writeObject(rmtFilter);
+
+        out.writeBoolean(internal);
+        out.writeBoolean(oldValRequired);
+        out.writeBoolean(sync);
+        out.writeBoolean(ignoreExpired);
+        out.writeInt(taskHash);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        cacheName = U.readString(in);
+        topic = in.readObject();
+
+        // Leading flag written by writeExternal(): true means the filter arrives in deployable form.
+        if (in.readBoolean())
+            rmtFilterDep = (DeployableObject)in.readObject();
+        else
+            rmtFilter = (CacheEntryEventFilter<K, V>)in.readObject();
+
+        internal = in.readBoolean();
+        oldValRequired = in.readBoolean();
+        sync = in.readBoolean();
+        ignoreExpired = in.readBoolean();
+        taskHash = in.readInt();
+    }
+
+    /**
+     * @param ctx Kernal context (asserted to be non-null).
+     * @return Context of the internal cache named {@link #cacheName}.
+     */
+    private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) {
+        assert ctx != null;
+
+        return ctx.cache().<K, V>internalCache(cacheName).context(); // No null check on the looked-up cache.
+    }
+
+    /**
+     * Deployable object.
+     */
+    private static class DeployableObject implements Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Serialized object. */
+        private byte[] bytes;
+
+        /** Deployment class name. */
+        private String clsName;
+
+        /** Deployment info. */
+        private GridDeploymentInfo depInfo;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public DeployableObject() {
+            // No-op.
+        }
+
+        /**
+         * @param obj Object.
+         * @param ctx Kernal context.
+         * @throws IgniteCheckedException In case of error.
+         */
+        private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
+            assert obj != null;
+            assert ctx != null;
+
+            Class cls = U.detectClass(obj);
+
+            clsName = cls.getName();
+
+            GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
+
+            if (dep == null)
+                throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj);
+
+            depInfo = new GridDeploymentInfoBean(dep);
+
+            bytes = ctx.config().getMarshaller().marshal(obj);
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @param ctx Kernal context.
+         * @return Deserialized object.
+         * @throws IgniteCheckedException In case of error.
+         */
+        <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+            assert ctx != null;
+
+            GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+                depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
+
+            if (dep == null)
+                throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+
+            return ctx.config().getMarshaller().unmarshal(bytes, dep.classLoader());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeByteArray(out, bytes);
+            U.writeString(out, clsName);
+            out.writeObject(depInfo);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            bytes = U.readByteArray(in);
+            clsName = U.readString(in);
+            depInfo = (GridDeploymentInfo)in.readObject();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
new file mode 100644
index 0000000..3695bad
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+/**
+ * Continuous query listener: callbacks invoked for a registered continuous query.
+ */
+interface CacheContinuousQueryListener<K, V> {
+    /**
+     * Query execution callback.
+     */
+    public void onExecution();
+
+    /**
+     * Entry update callback.
+     *
+     * @param evt Event describing the update.
+     * @param primary Primary flag.
+     * @param recordIgniteEvt Whether to record an Ignite event for this update.
+     */
+    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt);
+
+    /**
+     * Listener unregistered callback.
+     */
+    public void onUnregister();
+
+    /**
+     * @return Whether old value is required by this listener.
+     */
+    public boolean oldValueRequired();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
new file mode 100644
index 0000000..c2352c2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.security.*;
+import org.jdk8.backport.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.event.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static javax.cache.event.EventType.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+
+/**
+ * Continuous queries manager.
+ */
+public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
+    /** Bit flag named for CREATED events (usage is outside this chunk; verify before relying on it). */
+    private static final byte CREATED_FLAG = 0b0001;
+
+    /** Bit flag named for UPDATED events (usage is outside this chunk; verify before relying on it). */
+    private static final byte UPDATED_FLAG = 0b0010;
+
+    /** Bit flag named for REMOVED events (usage is outside this chunk; verify before relying on it). */
+    private static final byte REMOVED_FLAG = 0b0100;
+
+    /** Bit flag named for EXPIRED events (usage is outside this chunk; verify before relying on it). */
+    private static final byte EXPIRED_FLAG = 0b1000;
+
+    /** Listeners notified for non-internal entries, keyed by UUID. */
+    private final ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrs = new ConcurrentHashMap8<>();
+
+    /** Listeners count; checked in {@code onEntryUpdated} before the listener map is consulted. */
+    private final AtomicInteger lsnrCnt = new AtomicInteger();
+
+    /** Internal entries listeners, keyed by UUID. */
+    private final ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> intLsnrs = new ConcurrentHashMap8<>();
+
+    /** Internal listeners count; checked in {@code onEntryUpdated} for internal entries. */
+    private final AtomicInteger intLsnrCnt = new AtomicInteger();
+
+    /** Query sequence number for message topic. */
+    private final AtomicLong seq = new AtomicLong();
+
+    /** JCache listeners, keyed by their listener configuration. */
+    private final ConcurrentMap<CacheEntryListenerConfiguration, JCacheQuery> jCacheLsnrs =
+        new ConcurrentHashMap8<>();
+
+    /** Ordered topic prefix; assigned in {@link #start0()}. */
+    private String topicPrefix;
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        // Append cache name to the topic.
+        topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void onKernalStart0() throws IgniteCheckedException {
+        Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs =
+            cctx.config().getCacheEntryListenerConfigurations();
+
+        // Nothing to start when the cache configuration declares no entry listeners.
+        if (lsnrCfgs == null)
+            return;
+
+        for (CacheEntryListenerConfiguration<K, V> lsnrCfg : lsnrCfgs)
+            executeJCacheQuery(lsnrCfg, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStop0(boolean cancel) {
+        super.onKernalStop0(cancel);
+
+        // Best effort: a listener that fails to stop is reported at debug level and the rest are still stopped.
+        for (JCacheQuery qry : jCacheLsnrs.values()) {
+            try {
+                qry.cancel();
+            }
+            catch (IgniteCheckedException ex) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to stop JCache entry listener: " + ex.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Notifies the registered continuous query listeners about an entry update.
+     * <p>
+     * Internal entries go to the internal listeners, all other entries to the regular ones. The event type is
+     * derived from value presence: no new value means {@code REMOVED}, no old value means {@code CREATED},
+     * otherwise {@code UPDATED}. Nothing is fired when neither value is present.
+     *
+     * @param e Cache entry.
+     * @param key Key.
+     * @param newVal New value.
+     * @param newBytes New value bytes.
+     * @param oldVal Old value.
+     * @param oldBytes Old value bytes.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void onEntryUpdated(GridCacheEntryEx<K, V> e, K key, V newVal, GridCacheValueBytes newBytes,
+        V oldVal, GridCacheValueBytes oldBytes) throws IgniteCheckedException {
+        assert e != null;
+        assert key != null;
+
+        ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol;
+
+        if (e.isInternal())
+            lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
+        else
+            lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
+
+        if (F.isEmpty(lsnrCol))
+            return;
+
+        boolean hasNewVal = newVal != null || (newBytes != null && !newBytes.isNull());
+        boolean hasOldVal = oldVal != null || (oldBytes != null && !oldBytes.isNull());
+
+        if (!hasNewVal && !hasOldVal)
+            return;
+
+        EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
+
+        boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1);
+        boolean recordIgniteEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
+        for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) {
+            boolean oldValReq = lsnr.oldValueRequired();
+
+            // Values are resolved per listener on purpose: the old value is unmarshalled only once some
+            // listener asks for it, and the null checks keep later passes from unmarshalling again.
+            if (oldValReq) {
+                oldVal = cctx.unwrapTemporary(oldVal);
+
+                if (oldVal == null && oldBytes != null && !oldBytes.isNull())
+                    oldVal = oldBytes.isPlain() ? (V)oldBytes.get() :
+                        cctx.marshaller().<V>unmarshal(oldBytes.get(), cctx.deploy().globalLoader());
+            }
+
+            if (newVal == null && newBytes != null && !newBytes.isNull())
+                newVal = newBytes.isPlain() ? (V)newBytes.get() :
+                    cctx.marshaller().<V>unmarshal(newBytes.get(), cctx.deploy().globalLoader());
+
+            // Listeners that did not ask for the old value never see it.
+            CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, newVal, newBytes,
+                oldValReq ? oldVal : null, oldValReq ? oldBytes : null);
+
+            CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>(
+                cctx.kernalContext().cache().jcache(cctx.name()), evtType, e0);
+
+            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
+        }
+    }
+
+    /**
+     * Notifies the regular (non-internal) continuous query listeners that an entry has expired.
+     * <p>
+     * Internal entries are ignored. For caches that are not REPLICATED, only the primary node for the key
+     * fires the {@code EXPIRED} event.
+     *
+     * @param e Entry.
+     * @param key Key.
+     * @param oldVal Old value.
+     * @param oldBytes Old value bytes.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void onEntryExpired(GridCacheEntryEx<K, V> e, K key, V oldVal, GridCacheValueBytes oldBytes)
+        throws IgniteCheckedException {
+        assert e != null;
+        assert key != null;
+
+        if (e.isInternal())
+            return;
+
+        if (F.isEmpty(lsnrs))
+            return;
+
+        // Computed once: used both for the gate below and as the flag passed to listeners.
+        boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1);
+
+        if (!primary && !cctx.isReplicated())
+            return;
+
+        boolean recordIgniteEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
+        for (CacheContinuousQueryListener<K, V> lsnr : lsnrs.values()) {
+            boolean oldValReq = lsnr.oldValueRequired();
+
+            // The old value is resolved per listener: it is unmarshalled only once some listener asks
+            // for it, and the null check keeps later passes from unmarshalling again.
+            if (oldValReq) {
+                oldVal = cctx.unwrapTemporary(oldVal);
+
+                if (oldVal == null && oldBytes != null && !oldBytes.isNull())
+                    oldVal = oldBytes.isPlain() ? (V)oldBytes.get() :
+                        cctx.marshaller().<V>unmarshal(oldBytes.get(), cctx.deploy().globalLoader());
+            }
+
+            CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, null, null,
+                oldValReq ? oldVal : null, oldValReq ? oldBytes : null);
+
+            CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>(
+                cctx.kernalContext().cache().jcache(cctx.name()), EXPIRED, e0);
+
+            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
+        }
+    }
+
+    /**
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
+     * @param bufSize Buffer size.
+     * @param timeInterval Time interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     * @param grp Cluster group.
+     * @return Continuous routine ID.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public UUID executeQuery(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter,
+        int bufSize, long timeInterval, boolean autoUnsubscribe, ClusterGroup grp) throws IgniteCheckedException {
+        return executeQuery0(
+            locLsnr,
+            rmtFilter,
+            bufSize,
+            timeInterval,
+            autoUnsubscribe,
+            false, // internal
+            true, // oldValRequired
+            false, // sync
+            true, // ignoreExpired
+            grp);
+    }
+
+    /**
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
+     * @param loc Local flag; when set, the query is restricted to the local node's group.
+     * @return Continuous routine ID.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public UUID executeInternalQuery(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter,
+        boolean loc) throws IgniteCheckedException {
+        return executeQuery0(
+            locLsnr,
+            rmtFilter,
+            ContinuousQuery.DFLT_BUF_SIZE,
+            ContinuousQuery.DFLT_TIME_INTERVAL,
+            ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
+            true, // internal
+            true, // oldValRequired
+            false, // sync
+            true, // ignoreExpired
+            loc ? cctx.grid().forLocal() : null); // A null group is replaced with the grid in executeQuery0().
+    }
+
+    /**
+     * Stops the continuous routine with the given ID and waits for the stop to complete.
+     * A failure is reported at debug level only and is not propagated to the caller.
+     *
+     * @param routineId Continuous routine ID.
+     */
+    public void cancelInternalQuery(UUID routineId) {
+        try {
+            cctx.kernalContext().continuous().stopRoutine(routineId).get();
+        }
+        catch (IgniteCheckedException ex) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to stop internal continuous query: " + ex.getMessage());
+        }
+    }
+
+    /**
+     * Registers a JCache entry listener for the given configuration and starts its query.
+     * If the query fails to start, the registration is rolled back before the error is rethrown.
+     *
+     * @param cfg Listener configuration.
+     * @param onStart Whether listener is created on node start.
+     * @throws IgniteCheckedException If the query failed to start.
+     * @throws IllegalArgumentException If a listener is already registered for the configuration.
+     */
+    public void executeJCacheQuery(CacheEntryListenerConfiguration<K, V> cfg, boolean onStart)
+        throws IgniteCheckedException {
+        JCacheQuery lsnr = new JCacheQuery(cfg, onStart);
+
+        JCacheQuery old = jCacheLsnrs.putIfAbsent(cfg, lsnr);
+
+        if (old != null)
+            throw new IllegalArgumentException("Listener is already registered for configuration: " + cfg);
+
+        try {
+            lsnr.execute();
+        }
+        catch (IgniteCheckedException e) {
+            // Roll back the registration. A failure while cancelling must not mask the original
+            // error, so it is attached as a suppressed exception instead of replacing it.
+            try {
+                cancelJCacheQuery(cfg);
+            }
+            catch (IgniteCheckedException cancelErr) {
+                e.addSuppressed(cancelErr);
+            }
+
+            throw e;
+        }
+    }
+
+    /**
+     * Unregisters the query started for the given listener configuration and cancels it.
+     * Does nothing if no query is registered for the configuration.
+     *
+     * @param cfg Listener configuration.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void cancelJCacheQuery(CacheEntryListenerConfiguration<K, V> cfg) throws IgniteCheckedException {
+        JCacheQuery registered = jCacheLsnrs.remove(cfg);
+
+        if (registered == null)
+            return;
+
+        registered.cancel();
+    }
+
+    /**
+     * Common implementation behind all query entry points: checks the {@code CACHE_READ} permission,
+     * validates and adjusts the target cluster group for the cache mode, builds the continuous query
+     * handler and starts the continuous routine.
+     *
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
+     * @param bufSize Buffer size.
+     * @param timeInterval Time interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     * @param internal Internal flag.
+     * @param oldValRequired Old value required flag.
+     * @param sync Synchronous flag.
+     * @param ignoreExpired Ignore expired event flag.
+     * @param grp Cluster group. If {@code null}, the kernal context's grid is used.
+     * @return Continuous routine ID.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private UUID executeQuery0(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter,
+        int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean oldValRequired,
+        boolean sync, boolean ignoreExpired, ClusterGroup grp) throws IgniteCheckedException {
+        cctx.checkSecurity(GridSecurityPermission.CACHE_READ);
+
+        if (grp == null)
+            grp = cctx.kernalContext().grid();
+
+        Collection<ClusterNode> nodes = grp.nodes();
+
+        if (nodes.isEmpty())
+            throw new ClusterTopologyException("Failed to execute continuous query (empty cluster group is " +
+                "provided).");
+
+        boolean skipPrimaryCheck = false;
+
+        // Only LOCAL and REPLICATED need special handling; other cache modes use the group as given.
+        switch (cctx.config().getCacheMode()) {
+            case LOCAL:
+                // A LOCAL cache query always runs on the local node: fail if the group does not contain
+                // it, warn if the group contains other nodes too, then narrow the group to the local node.
+                if (!nodes.contains(cctx.localNode()))
+                    throw new ClusterTopologyException("Continuous query for LOCAL cache can be executed " +
+                        "only locally (provided projection contains remote nodes only).");
+                else if (nodes.size() > 1)
+                    U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " +
+                        "ignored).");
+
+                grp = grp.forNode(cctx.localNode());
+
+                break;
+
+            case REPLICATED:
+                // Skip the primary check only when the group is exactly the local node and the cache
+                // distribution mode is PARTITIONED_ONLY or NEAR_PARTITIONED.
+                if (nodes.size() == 1 && F.first(nodes).equals(cctx.localNode())) {
+                    CacheDistributionMode distributionMode = cctx.config().getDistributionMode();
+
+                    if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED)
+                        skipPrimaryCheck = true;
+                }
+
+                break;
+        }
+
+        // The task name hash is only looked up for non-internal queries with security enabled; otherwise 0.
+        int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
+            cctx.kernalContext().job().currentTaskNameHash() : 0;
+
+        // The topic is built from the topic prefix, the local node ID and the next sequence number.
+        GridContinuousHandler hnd = new CacheContinuousQueryHandler<>(
+            cctx.name(),
+            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+            locLsnr,
+            rmtFilter,
+            internal,
+            oldValRequired,
+            sync,
+            ignoreExpired,
+            taskNameHash,
+            skipPrimaryCheck);
+
+        // NOTE(review): get() presumably waits until the routine has been started - confirm.
+        return cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval,
+            autoUnsubscribe, grp.predicate()).get();
+    }
+
+    /**
+     * Registers a listener under the given ID in either the internal or the regular listener map.
+     * Nothing changes if a listener with the same ID is already present in that map. A newly added
+     * regular listener is additionally notified through {@code onExecution()}.
+     *
+     * @param lsnrId Listener ID.
+     * @param lsnr Listener.
+     * @param internal Internal flag.
+     * @return Whether listener was actually registered.
+     */
+    boolean registerListener(UUID lsnrId,
+        CacheContinuousQueryListener<K, V> lsnr,
+        boolean internal) {
+        if (internal) {
+            if (intLsnrs.putIfAbsent(lsnrId, lsnr) != null)
+                return false;
+
+            intLsnrCnt.incrementAndGet();
+
+            return true;
+        }
+
+        if (lsnrs.putIfAbsent(lsnrId, lsnr) != null)
+            return false;
+
+        lsnrCnt.incrementAndGet();
+
+        lsnr.onExecution();
+
+        return true;
+    }
+
+    /**
+     * Removes the listener with the given ID from either the internal or the regular listener map.
+     * If a listener was removed, the matching counter is decremented and the listener is notified
+     * through {@code onUnregister()}.
+     *
+     * @param internal Internal flag.
+     * @param id Listener ID.
+     */
+    void unregisterListener(boolean internal, UUID id) {
+        CacheContinuousQueryListener<K, V> removed;
+
+        if (internal)
+            removed = intLsnrs.remove(id);
+        else
+            removed = lsnrs.remove(id);
+
+        if (removed == null)
+            return;
+
+        if (internal)
+            intLsnrCnt.decrementAndGet();
+        else
+            lsnrCnt.decrementAndGet();
+
+        removed.onUnregister();
+    }
+
+    /**
+     * Continuous query backing a single JCache {@link CacheEntryListenerConfiguration}.
+     */
+    private class JCacheQuery {
+        /** Listener configuration. */
+        private final CacheEntryListenerConfiguration<K, V> cfg;
+
+        /** Whether the listener is created on node start. */
+        private final boolean onStart;
+
+        /** ID of the started continuous routine; {@code null} until {@link #execute()} has started one. */
+        private volatile UUID routineId;
+
+        /**
+         * @param cfg Listener configuration.
+         * @param onStart Whether listener is created on node start.
+         */
+        private JCacheQuery(CacheEntryListenerConfiguration<K, V> cfg, boolean onStart) {
+            this.cfg = cfg;
+            this.onStart = onStart;
+        }
+
+        /**
+         * Creates the listener and the optional filter from the configuration's factories and starts
+         * the continuous query for them.
+         *
+         * @throws IgniteCheckedException If the listener factory returns {@code null}, the listener
+         *      implements none of the {@link CacheEntryListener} sub-interfaces, or the query fails to start.
+         */
+        @SuppressWarnings("unchecked")
+        void execute() throws IgniteCheckedException {
+            // Unless the listener is created on node start, record the configuration in the cache config.
+            if (!onStart)
+                cctx.config().addCacheEntryListenerConfiguration(cfg);
+
+            CacheEntryListener<? super K, ? super V> locLsnrImpl = cfg.getCacheEntryListenerFactory().create();
+
+            if (locLsnrImpl == null)
+                throw new IgniteCheckedException("Local CacheEntryListener is mandatory and can't be null.");
+
+            // Bit mask of the event types the listener implementation can receive.
+            byte types = 0;
+
+            types |= locLsnrImpl instanceof CacheEntryCreatedListener ? CREATED_FLAG : 0;
+            types |= locLsnrImpl instanceof CacheEntryUpdatedListener ? UPDATED_FLAG : 0;
+            types |= locLsnrImpl instanceof CacheEntryRemovedListener ? REMOVED_FLAG : 0;
+            types |= locLsnrImpl instanceof CacheEntryExpiredListener ? EXPIRED_FLAG : 0;
+
+            if (types == 0)
+                throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces.");
+
+            CacheEntryUpdatedListener<K, V> locLsnr = (CacheEntryUpdatedListener<K, V>)new JCacheQueryLocalListener(
+                locLsnrImpl, cctx.kernalContext().cache().jcache(cctx.name()));
+
+            CacheEntryEventFilter<K, V> rmtFilter = (CacheEntryEventFilter<K, V>)new JCacheQueryRemoteFilter<>(
+                cfg.getCacheEntryEventFilterFactory() != null ? cfg.getCacheEntryEventFilterFactory().create() : null,
+                types);
+
+            routineId = executeQuery0(
+                locLsnr,
+                rmtFilter,
+                ContinuousQuery.DFLT_BUF_SIZE,
+                ContinuousQuery.DFLT_TIME_INTERVAL,
+                ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
+                false,
+                cfg.isOldValueRequired(),
+                cfg.isSynchronous(),
+                false,
+                null);
+        }
+
+        /**
+         * Stops the continuous routine, if one was started, and removes the listener configuration
+         * from the cache configuration.
+         *
+         * @throws IgniteCheckedException In case of error.
+         */
+        void cancel() throws IgniteCheckedException {
+            UUID routineId0 = routineId;
+
+            // The ID is still null when execute() failed before a routine was started. There is
+            // nothing to stop in that case, but the configuration must still be removed below.
+            if (routineId0 != null)
+                cctx.kernalContext().continuous().stopRoutine(routineId0).get();
+
+            cctx.config().removeCacheEntryListenerConfiguration(cfg);
+        }
+    }
+
+    /**
+     * Adapts a JCache {@link CacheEntryListener} to {@link CacheEntryUpdatedListener}: every incoming
+     * event is forwarded, one at a time, to the callback of the wrapped listener that matches its type.
+     */
+    private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
+        /** Wrapped listener. */
+        private final CacheEntryListener<K, V> impl;
+
+        /** Cache passed to the events handed to the wrapped listener. */
+        private final Cache<K, V> cache;
+
+        /**
+         * @param impl Listener.
+         * @param cache Cache.
+         */
+        JCacheQueryLocalListener(CacheEntryListener<K, V> impl, Cache<K, V> cache) {
+            assert impl != null;
+            assert cache != null;
+
+            this.impl = impl;
+            this.cache = cache;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+            for (CacheEntryEvent<? extends K, ? extends V> evt : evts)
+                dispatch(evt);
+        }
+
+        /**
+         * Forwards a single event to the callback matching its type.
+         *
+         * @param evt Event.
+         */
+        private void dispatch(CacheEntryEvent<? extends K, ? extends V> evt) {
+            switch (evt.getEventType()) {
+                case CREATED:
+                    assert impl instanceof CacheEntryCreatedListener;
+
+                    ((CacheEntryCreatedListener<K, V>)impl).onCreated(singleton(evt));
+
+                    return;
+
+                case UPDATED:
+                    assert impl instanceof CacheEntryUpdatedListener;
+
+                    ((CacheEntryUpdatedListener<K, V>)impl).onUpdated(singleton(evt));
+
+                    return;
+
+                case REMOVED:
+                    assert impl instanceof CacheEntryRemovedListener;
+
+                    ((CacheEntryRemovedListener<K, V>)impl).onRemoved(singleton(evt));
+
+                    return;
+
+                case EXPIRED:
+                    assert impl instanceof CacheEntryExpiredListener;
+
+                    ((CacheEntryExpiredListener<K, V>)impl).onExpired(singleton(evt));
+
+                    return;
+
+                default:
+                    throw new IllegalStateException("Unknown type: " + evt.getEventType());
+            }
+        }
+
+        /**
+         * Wraps the entry of the given event into a new event created with this listener's cache.
+         *
+         * @param evt Event.
+         * @return Singleton iterable.
+         */
+        @SuppressWarnings("unchecked")
+        private Iterable<CacheEntryEvent<? extends K, ? extends V>> singleton(
+            CacheEntryEvent<? extends K, ? extends V> evt) {
+            assert evt instanceof CacheContinuousQueryEvent;
+
+            Collection<CacheEntryEvent<? extends K, ? extends V>> res = new ArrayList<>(1);
+
+            res.add(new CacheContinuousQueryEvent<>(cache, evt.getEventType(),
+                ((CacheContinuousQueryEvent<? extends K, ? extends V>)evt).entry()));
+
+            return res;
+        }
+    }
+
+    /**
+     * Filter that accepts an event only if its type is one of the configured types and the optional
+     * user filter, when present, accepts it as well.
+     */
+    private static class JCacheQueryRemoteFilter<K, V> implements CacheEntryEventFilter<K, V>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** User filter, may be {@code null}. */
+        private CacheEntryEventFilter<K, V> impl;
+
+        /** Bit mask of accepted event types. */
+        private byte types;
+
+        /**
+         * For {@link Externalizable}.
+         */
+        public JCacheQueryRemoteFilter() {
+            // no-op.
+        }
+
+        /**
+         * @param impl Filter.
+         * @param types Types.
+         */
+        JCacheQueryRemoteFilter(CacheEntryEventFilter<K, V> impl, byte types) {
+            assert types != 0;
+
+            this.impl = impl;
+            this.types = types;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> evt) {
+            // The type check comes first, so the user filter never sees events of other types.
+            return (types & flag(evt.getEventType())) != 0 && (impl == null || impl.evaluate(evt));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(impl);
+            out.writeByte(types);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            impl = (CacheEntryEventFilter<K, V>)in.readObject();
+            types = in.readByte();
+        }
+
+        /**
+         * @param evtType Type.
+         * @return Flag value.
+         */
+        private byte flag(EventType evtType) {
+            switch (evtType) {
+                case CREATED:
+                    return CREATED_FLAG;
+
+                case UPDATED:
+                    return UPDATED_FLAG;
+
+                case REMOVED:
+                    return REMOVED_FLAG;
+
+                case EXPIRED:
+                    return EXPIRED_FLAG;
+
+                default:
+                    throw new IllegalStateException("Unknown type: " + evtType);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
deleted file mode 100644
index 528bde6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.security.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.event.*;
-import java.util.*;
-import java.util.concurrent.locks.*;
-
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-
-/**
- * Continuous query implementation.
- */
-public class GridCacheContinuousQueryAdapter<K, V> implements CacheContinuousQuery<K, V> {
-    /** Guard. */
-    private final GridBusyLock guard = new GridBusyLock();
-
-    /** Close lock. */
-    private final Lock closeLock = new ReentrantLock();
-
-    /** Cache context. */
-    private final GridCacheContext<K, V> ctx;
-
-    /** Topic for ordered messages. */
-    private final Object topic;
-
-    /** Projection predicate */
-    private final IgnitePredicate<Cache.Entry<K, V>> prjPred;
-
-    /** Keep portable flag. */
-    private final boolean keepPortable;
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /** Local callback. */
-    private volatile IgniteBiPredicate<UUID, Collection<Map.Entry<K, V>>> cb;
-
-    /** Local callback. */
-    private volatile IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> locCb;
-
-    /** Filter. */
-    private volatile IgniteBiPredicate<K, V> filter;
-
-    /** Remote filter. */
-    private volatile IgnitePredicate<CacheContinuousQueryEntry<K, V>> rmtFilter;
-
-    /** Buffer size. */
-    private volatile int bufSize = DFLT_BUF_SIZE;
-
-    /** Time interval. */
-    @SuppressWarnings("RedundantFieldInitialization")
-    private volatile long timeInterval = DFLT_TIME_INTERVAL;
-
-    /** Automatic unsubscribe flag. */
-    private volatile boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
-
-    /** Continuous routine ID. */
-    private UUID routineId;
-
-    /**
-     * @param ctx Cache context.
-     * @param topic Topic for ordered messages.
-     * @param prjPred Projection predicate.
-     */
-    GridCacheContinuousQueryAdapter(GridCacheContext<K, V> ctx, Object topic,
-        @Nullable IgnitePredicate<Cache.Entry<K, V>> prjPred) {
-        assert ctx != null;
-        assert topic != null;
-
-        this.ctx = ctx;
-        this.topic = topic;
-        this.prjPred = prjPred;
-
-        keepPortable = ctx.keepPortable();
-
-        log = ctx.logger(getClass());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void localCallback(IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> locCb) {
-        if (!guard.enterBusy())
-            throw new IllegalStateException("Continuous query can't be changed after it was executed.");
-
-        try {
-            this.locCb = locCb;
-        }
-        finally {
-            guard.leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> localCallback() {
-        return locCb;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void remoteFilter(@Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> rmtFilter) {
-        if (!guard.enterBusy())
-            throw new IllegalStateException("Continuous query can't be changed after it was executed.");
-
-        try {
-            this.rmtFilter = rmtFilter;
-        }
-        finally {
-            guard.leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public IgnitePredicate<CacheContinuousQueryEntry<K, V>> remoteFilter() {
-        return rmtFilter;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void bufferSize(int bufSize) {
-        A.ensure(bufSize > 0, "bufSize > 0");
-
-        if (!guard.enterBusy())
-            throw new IllegalStateException("Continuous query can't be changed after it was executed.");
-
-        try {
-            this.bufSize = bufSize;
-        }
-        finally {
-            guard.leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public int bufferSize() {
-        return bufSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void timeInterval(long timeInterval) {
-        A.ensure(timeInterval >= 0, "timeInterval >= 0");
-
-        if (!guard.enterBusy())
-            throw new IllegalStateException("Continuous query can't be changed after it was executed.");
-
-        try {
-            this.timeInterval = timeInterval;
-        }
-        finally {
-            guard.leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public long timeInterval() {
-        return timeInterval;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void autoUnsubscribe(boolean autoUnsubscribe) {
-        this.autoUnsubscribe = autoUnsubscribe;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isAutoUnsubscribe() {
-        return autoUnsubscribe;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void execute() throws IgniteCheckedException {
-        execute(null, false, false, false, true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException {
-        execute(prj, false, false, false, true);
-    }
-
-    /**
-     * Starts continuous query execution.
-     *
-     * @param prj Grid projection.
-     * @param internal If {@code true} then query notified about internal entries updates.
-     * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
-     * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}.
-     * @param oldVal {@code True} if old value is required.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void execute(@Nullable ClusterGroup prj,
-        boolean internal,
-        boolean entryLsnr,
-        boolean sync,
-        boolean oldVal) throws IgniteCheckedException {
-        if (locCb == null)
-            throw new IllegalStateException("Mandatory local callback is not set for the query: " + this);
-
-        ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
-
-        if (prj == null)
-            prj = ctx.grid();
-
-        prj = prj.forCacheNodes(ctx.name());
-
-        if (prj.nodes().isEmpty())
-            throw new ClusterTopologyCheckedException("Failed to continuous execute query (projection is empty): " +
-                this);
-
-        boolean skipPrimaryCheck = false;
-
-        Collection<ClusterNode> nodes = prj.nodes();
-
-        if (nodes.isEmpty())
-            throw new ClusterTopologyCheckedException("Failed to execute continuous query (empty projection is " +
-                "provided): " + this);
-
-        switch (ctx.config().getCacheMode()) {
-            case LOCAL:
-                if (!nodes.contains(ctx.localNode()))
-                    throw new ClusterTopologyCheckedException("Continuous query for LOCAL cache can be executed " +
-                        "only locally (provided projection contains remote nodes only): " + this);
-                else if (nodes.size() > 1)
-                    U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " +
-                        "ignored): " + this);
-
-                prj = prj.forNode(ctx.localNode());
-
-                break;
-
-            case REPLICATED:
-                if (nodes.size() == 1 && F.first(nodes).equals(ctx.localNode())) {
-                    CacheDistributionMode distributionMode = ctx.config().getDistributionMode();
-
-                    if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED)
-                        skipPrimaryCheck = true;
-                }
-
-                break;
-        }
-
-        closeLock.lock();
-
-        try {
-            if (routineId != null)
-                throw new IllegalStateException("Continuous query can't be executed twice.");
-
-            guard.block();
-
-            int taskNameHash =
-                ctx.kernalContext().security().enabled() ? ctx.kernalContext().job().currentTaskNameHash() : 0;
-
-            GridContinuousHandler hnd = new GridCacheContinuousQueryHandler<>(ctx.name(),
-                topic,
-                locCb,
-                rmtFilter,
-                prjPred,
-                internal,
-                entryLsnr,
-                sync,
-                oldVal,
-                skipPrimaryCheck,
-                taskNameHash,
-                keepPortable);
-
-            routineId = ctx.kernalContext().continuous().startRoutine(hnd,
-                bufSize,
-                timeInterval,
-                autoUnsubscribe,
-                prj.predicate()).get();
-        }
-        finally {
-            closeLock.unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
-        closeLock.lock();
-
-        try {
-            if (routineId != null)
-                ctx.kernalContext().continuous().stopRoutine(routineId).get();
-        }
-        finally {
-            closeLock.unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheContinuousQueryAdapter.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
deleted file mode 100644
index fcfd3b8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.event.*;
-import java.io.*;
-
-import static org.apache.ignite.internal.processors.cache.CacheFlag.*;
-import static org.apache.ignite.internal.processors.cache.GridCacheValueBytes.*;
-
-/**
- * Entry implementation.
- */
-@SuppressWarnings("TypeParameterHidesVisibleType")
-public class GridCacheContinuousQueryEntry<K, V> implements Cache.Entry<K, V>, GridCacheDeployable, Externalizable,
-    CacheContinuousQueryEntry<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Event type enum values. */
-    private static final EventType[] EVT_TYPE_VALS = EventType.values();
-
-    /** Cache context. */
-    @SuppressWarnings("TransientFieldNotInitialized")
-    @GridToStringExclude
-    private final transient GridCacheContext<K, V> ctx;
-
-    /** Cache entry. */
-    @SuppressWarnings("TransientFieldNotInitialized")
-    @GridToStringExclude
-    private final transient Cache.Entry<K, V> impl;
-
-    /** Key. */
-    @GridToStringInclude
-    private K key;
-
-    /** New value. */
-    @GridToStringInclude
-    private V newVal;
-
-    /** Old value. */
-    @GridToStringInclude
-    private V oldVal;
-
-    /** Serialized key. */
-    private byte[] keyBytes;
-
-    /** Serialized value. */
-    @GridToStringExclude
-    private GridCacheValueBytes newValBytes;
-
-    /** Serialized value. */
-    @GridToStringExclude
-    private GridCacheValueBytes oldValBytes;
-
-    /** Cache name. */
-    private String cacheName;
-
-    /** Deployment info. */
-    @GridToStringExclude
-    private GridDeploymentInfo depInfo;
-
-    /** */
-    private EventType evtType;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridCacheContinuousQueryEntry() {
-        ctx = null;
-        impl = null;
-    }
-
-    /**
-     * @param ctx Cache context.
-     * @param impl Cache entry.
-     * @param key Key.
-     * @param newVal Value.
-     * @param newValBytes Value bytes.
-     * @param oldVal Old value.
-     * @param oldValBytes Old value bytes.
-     * @param evtType Event type.
-     */
-    GridCacheContinuousQueryEntry(GridCacheContext<K, V> ctx,
-        Cache.Entry<K, V> impl,
-        K key,
-        @Nullable V newVal,
-        @Nullable GridCacheValueBytes newValBytes,
-        @Nullable V oldVal,
-        @Nullable GridCacheValueBytes oldValBytes,
-        EventType evtType) {
-        assert ctx != null;
-        assert impl != null;
-        assert key != null;
-        assert evtType != null;
-
-        this.ctx = ctx;
-        this.impl = impl;
-        this.key = key;
-        this.newVal = newVal;
-        this.newValBytes = newValBytes;
-        this.oldVal = oldVal;
-        this.oldValBytes = oldValBytes;
-        this.evtType = evtType;
-    }
-
-    /**
-     * @return Cache entry.
-     */
-    Cache.Entry<K, V> entry() {
-        return impl;
-    }
-
-    /**
-     * @return Cache context.
-     */
-    GridCacheContext<K, V> context() {
-        return ctx;
-    }
-
-    /**
-     * @return New value bytes.
-     */
-    GridCacheValueBytes newValueBytes() {
-        return newValBytes;
-    }
-
-    /**
-     * @return {@code True} if old value is set.
-     */
-    boolean hasOldValue() {
-        return oldVal != null || (oldValBytes != null && !oldValBytes.isNull());
-    }
-
-    /**
-     * @return {@code True} if entry expired.
-     */
-    public EventType eventType() {
-        return evtType;
-    }
-
-    /**
-     * Unmarshals value from bytes if needed.
-     *
-     * @param marsh Marshaller.
-     * @param ldr Class loader.
-     * @throws IgniteCheckedException In case of error.
-     */
-    void initValue(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
-        assert marsh != null;
-
-        if (newVal == null && newValBytes != null && !newValBytes.isNull())
-            newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr);
-    }
-
-    /**
-     * @return Cache name.
-     */
-    String cacheName() {
-        return cacheName;
-    }
-
-    /**
-     * @param cacheName New cache name.
-     */
-    void cacheName(String cacheName) {
-        this.cacheName = cacheName;
-    }
-
-    /**
-     * @param marsh Marshaller.
-     * @throws IgniteCheckedException In case of error.
-     */
-    void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
-        assert marsh != null;
-
-        assert key != null;
-
-        keyBytes = marsh.marshal(key);
-
-        if (newValBytes == null || newValBytes.isNull())
-            newValBytes = newVal != null ?
-                newVal instanceof byte[] ? plain(newVal) : marshaled(marsh.marshal(newVal)) : null;
-
-        if (oldValBytes == null || oldValBytes.isNull())
-            oldValBytes = oldVal != null ?
-                oldVal instanceof byte[] ? plain(oldVal) : marshaled(marsh.marshal(oldVal)) : null;
-    }
-
-    /**
-     * @param marsh Marshaller.
-     * @param ldr Class loader.
-     * @throws IgniteCheckedException In case of error.
-     */
-    void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
-        assert marsh != null;
-
-        assert key == null : "Key should be null: " + key;
-        assert newVal == null : "New value should be null: " + newVal;
-        assert oldVal == null : "Old value should be null: " + oldVal;
-        assert keyBytes != null;
-
-        key = marsh.unmarshal(keyBytes, ldr);
-
-        if (newValBytes != null && !newValBytes.isNull())
-            newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr);
-
-        if (oldValBytes != null && !oldValBytes.isNull())
-            oldVal = oldValBytes.isPlain() ? (V)oldValBytes.get() : marsh.<V>unmarshal(oldValBytes.get(), ldr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public K getKey() {
-        return key;
-    }
-
-    /** {@inheritDoc} */
-    @Override public V getValue() {
-        return newVal;
-    }
-
-    /** {@inheritDoc} */
-    @Override public V getOldValue() {
-        return oldVal;
-    }
-
-    /** {@inheritDoc} */
-    @Override public V setValue(V val) {
-        ctx.denyOnFlag(READ);
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepare(GridDeploymentInfo depInfo) {
-        this.depInfo = depInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridDeploymentInfo deployInfo() {
-        return depInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T unwrap(Class<T> clazz) {
-        if(clazz.isAssignableFrom(getClass()))
-            return clazz.cast(this);
-
-        throw new IllegalArgumentException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        boolean b = keyBytes != null;
-
-        out.writeBoolean(b);
-
-        if (b) {
-            U.writeByteArray(out, keyBytes);
-
-            if (newValBytes != null && !newValBytes.isNull()) {
-                out.writeBoolean(true);
-                out.writeBoolean(newValBytes.isPlain());
-                U.writeByteArray(out, newValBytes.get());
-            }
-            else
-                out.writeBoolean(false);
-
-            if (oldValBytes != null && !oldValBytes.isNull()) {
-                out.writeBoolean(true);
-                out.writeBoolean(oldValBytes.isPlain());
-                U.writeByteArray(out, oldValBytes.get());
-            }
-            else
-                out.writeBoolean(false);
-
-            U.writeString(out, cacheName);
-            out.writeObject(depInfo);
-        }
-        else {
-            out.writeObject(key);
-            out.writeObject(newVal);
-            out.writeObject(oldVal);
-        }
-
-        out.writeByte((byte)evtType.ordinal());
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        boolean b = in.readBoolean();
-
-        if (b) {
-            keyBytes = U.readByteArray(in);
-
-            if (in.readBoolean())
-                newValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in));
-
-            if (in.readBoolean())
-                oldValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in));
-
-            cacheName = U.readString(in);
-            depInfo = (GridDeploymentInfo)in.readObject();
-        }
-        else {
-            key = (K)in.readObject();
-            newVal = (V)in.readObject();
-            oldVal = (V)in.readObject();
-        }
-
-        evtType = EVT_TYPE_VALS[in.readByte()];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheContinuousQueryEntry.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java
deleted file mode 100644
index 7b0615d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.lang.*;
-
-/**
- * Extended continuous query filter.
- */
-public interface GridCacheContinuousQueryFilterEx<K, V> extends
-        IgnitePredicate<CacheContinuousQueryEntry<K, V>> {
-    /**
-     * Callback for query unregister event.
-     */
-    public void onQueryUnregister();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
deleted file mode 100644
index d10c09f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
+++ /dev/null
@@ -1,571 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.event.*;
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Continuous query handler.
- */
-class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Cache name. */
-    private String cacheName;
-
-    /** Topic for ordered messages. */
-    private Object topic;
-
-    /** Local callback. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> cb;
-
-    /** Filter. */
-    private IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter;
-
-    /** Projection predicate */
-    private IgnitePredicate<Cache.Entry<K, V>> prjPred;
-
-    /** Deployable object for filter. */
-    private DeployableObject filterDep;
-
-    /** Deployable object for Projection predicate. */
-    private DeployableObject prjPredDep;
-
-    /** Internal flag. */
-    private boolean internal;
-
-    /** Entry listener flag. */
-    private boolean entryLsnr;
-
-    /** Synchronous listener flag. */
-    private boolean sync;
-
-    /** {@code True} if old value is required. */
-    private boolean oldVal;
-
-    /** Task name hash code. */
-    private int taskHash;
-
-    /** Keep portable flag. */
-    private boolean keepPortable;
-
-    /** Whether to skip primary check for REPLICATED cache. */
-    private transient boolean skipPrimaryCheck;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridCacheContinuousQueryHandler() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param cacheName Cache name.
-     * @param topic Topic for ordered messages.
-     * @param cb Local callback.
-     * @param filter Filter.
-     * @param prjPred Projection predicate.
-     * @param internal If {@code true} then query is notified about internal entries updates.
-     * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
-     * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}.
-     * @param oldVal {@code True} if old value is required.
-     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
-     * @param taskHash Task name hash code.
-     */
-    GridCacheContinuousQueryHandler(@Nullable String cacheName,
-        Object topic,
-        IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> cb,
-        @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter,
-        @Nullable IgnitePredicate<Cache.Entry<K, V>> prjPred,
-        boolean internal,
-        boolean entryLsnr,
-        boolean sync,
-        boolean oldVal,
-        boolean skipPrimaryCheck,
-        int taskHash,
-        boolean keepPortable) {
-        assert topic != null;
-        assert cb != null;
-        assert !sync || entryLsnr;
-
-        this.cacheName = cacheName;
-        this.topic = topic;
-        this.cb = cb;
-        this.filter = filter;
-        this.prjPred = prjPred;
-        this.internal = internal;
-        this.entryLsnr = entryLsnr;
-        this.sync = sync;
-        this.oldVal = oldVal;
-        this.taskHash = taskHash;
-        this.keepPortable = keepPortable;
-        this.skipPrimaryCheck = skipPrimaryCheck;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isForEvents() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isForMessaging() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isForQuery() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
-        throws IgniteCheckedException {
-        assert nodeId != null;
-        assert routineId != null;
-        assert ctx != null;
-
-        if (cb != null)
-            ctx.resource().injectGeneric(cb);
-
-        if (filter != null)
-            ctx.resource().injectGeneric(filter);
-
-        final boolean loc = nodeId.equals(ctx.localNodeId());
-
-        GridCacheContinuousQueryListener<K, V> lsnr = new GridCacheContinuousQueryListener<K, V>() {
-            @Override public void onExecution() {
-                if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                    ctx.event().record(new CacheQueryExecutedEvent<>(
-                        ctx.discovery().localNode(),
-                        "Continuous query executed.",
-                        EVT_CACHE_QUERY_EXECUTED,
-                        CacheQueryType.CONTINUOUS,
-                        cacheName,
-                        null,
-                        null,
-                        null,
-                        filter,
-                        null,
-                        nodeId,
-                        taskName()
-                    ));
-                }
-            }
-
-            @Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) {
-                GridCacheContext<K, V> cctx = cacheContext(ctx);
-
-                if (cctx.isReplicated() &&
-                    !skipPrimaryCheck &&
-                    !cctx.affinity().primary(cctx.localNode(), e.getKey(), cctx.topology().topologyVersion()))
-                    return;
-
-                boolean notify;
-
-                CacheFlag[] f = cctx.forceLocalRead();
-
-                try {
-                    notify = (prjPred == null || checkProjection(e)) &&
-                        (filter == null || filter.apply(e));
-                }
-                finally {
-                    cctx.forceFlags(f);
-                }
-
-                if (notify) {
-                    if (!oldVal && e.hasOldValue()) {
-                        e = new GridCacheContinuousQueryEntry<>(e.context(),
-                            e.entry(),
-                            e.getKey(),
-                            e.getValue(),
-                            e.newValueBytes(),
-                            null,
-                            null,
-                            e.eventType());
-                    }
-
-                    if (loc) {
-                        if (!cb.apply(nodeId,
-                            F.<CacheContinuousQueryEntry<K, V>>asList(e)))
-                            ctx.continuous().stopRoutine(routineId);
-                    }
-                    else {
-                        try {
-                            ClusterNode node = ctx.discovery().node(nodeId);
-
-                            if (ctx.config().isPeerClassLoadingEnabled() && node != null &&
-                                U.hasCache(node, cacheName)) {
-                                e.p2pMarshal(ctx.config().getMarshaller());
-
-                                e.cacheName(cacheName);
-
-                                GridCacheDeploymentManager depMgr =
-                                    ctx.cache().internalCache(cacheName).context().deploy();
-
-                                depMgr.prepare(e);
-                            }
-
-                            ctx.continuous().addNotification(nodeId, routineId, e, topic, sync);
-                        }
-                        catch (IgniteCheckedException ex) {
-                            U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
-                        }
-                    }
-
-                    if (!entryLsnr && recordEvt) {
-                        ctx.event().record(new CacheQueryReadEvent<>(
-                            ctx.discovery().localNode(),
-                            "Continuous query executed.",
-                            EVT_CACHE_QUERY_OBJECT_READ,
-                            CacheQueryType.CONTINUOUS,
-                            cacheName,
-                            null,
-                            null,
-                            null,
-                            filter,
-                            null,
-                            nodeId,
-                            taskName(),
-                            e.getKey(),
-                            e.getValue(),
-                            e.getOldValue(),
-                            null
-                        ));
-                    }
-                }
-            }
-
-            /** {@inheritDoc} */
-            @Override public void onUnregister() {
-                if (filter != null && filter instanceof GridCacheContinuousQueryFilterEx)
-                    ((GridCacheContinuousQueryFilterEx)filter).onQueryUnregister();
-            }
-
-            private boolean checkProjection(GridCacheContinuousQueryEntry<K, V> e) {
-                GridCacheProjectionImpl.FullFilter<K, V> filter = (GridCacheProjectionImpl.FullFilter<K, V>)prjPred;
-
-                GridCacheProjectionImpl.KeyValueFilter<K, V> kvFilter = filter.keyValueFilter();
-                IgnitePredicate<? super Cache.Entry<K, V>> entryFilter = filter.entryFilter();
-
-                boolean ret = true;
-
-                if (kvFilter != null) {
-                    V v = e.getValue() == null ? e.getOldValue() : e.getValue();
-
-                    ret = v != null && kvFilter.apply(e.getKey(), v);
-                }
-
-                if (entryFilter != null)
-                    ret = ret && entryFilter.apply(e);
-
-                return ret;
-            }
-
-            @Nullable private String taskName() {
-                return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null;
-            }
-        };
-
-        return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
-        if (!entryLsnr)
-            manager(ctx).iterate(internal, routineId, keepPortable);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void unregister(UUID routineId, GridKernalContext ctx) {
-        assert routineId != null;
-        assert ctx != null;
-
-        manager(ctx).unregisterListener(internal, routineId);
-    }
-
-    /**
-     * @param ctx Kernal context.
-     * @return Continuous query manager.
-     */
-    private GridCacheContinuousQueryManager<K, V> manager(GridKernalContext ctx) {
-        return cacheContext(ctx).continuousQueries();
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) {
-        assert nodeId != null;
-        assert routineId != null;
-        assert objs != null;
-        assert ctx != null;
-
-        Collection<CacheContinuousQueryEntry<K, V>> entries =
-            (Collection<CacheContinuousQueryEntry<K, V>>)objs;
-
-        if (ctx.config().isPeerClassLoadingEnabled()) {
-            for (Map.Entry<K, V> e : entries) {
-                assert e instanceof GridCacheContinuousQueryEntry;
-
-                GridCacheContinuousQueryEntry<K, V> qe = (GridCacheContinuousQueryEntry<K, V>)e;
-
-                GridCacheAdapter cache = ctx.cache().internalCache(qe.cacheName());
-
-                ClassLoader ldr = null;
-
-                if (cache != null) {
-                    GridCacheDeploymentManager depMgr = cache.context().deploy();
-
-                    GridDeploymentInfo depInfo = qe.deployInfo();
-
-                    if (depInfo != null) {
-                        depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(),
-                            depInfo.participants(), depInfo.localDeploymentOwner());
-                    }
-
-                    ldr = depMgr.globalLoader();
-                }
-                else {
-                    U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " +
-                        "when peer class loading is enabled: " + qe.cacheName() + ". Will try to unmarshal " +
-                        "with default class loader.");
-                }
-
-                try {
-                    qe.p2pUnmarshal(ctx.config().getMarshaller(), ldr);
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
-                }
-            }
-        }
-
-        if (!cb.apply(nodeId, entries))
-            ctx.continuous().stopRoutine(routineId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
-        assert ctx != null;
-        assert ctx.config().isPeerClassLoadingEnabled();
-
-        if (filter != null && !U.isGrid(filter.getClass()))
-            filterDep = new DeployableObject(filter, ctx);
-
-        if (prjPred != null && !U.isGrid(prjPred.getClass()))
-            prjPredDep = new DeployableObject(prjPred, ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
-        assert nodeId != null;
-        assert ctx != null;
-        assert ctx.config().isPeerClassLoadingEnabled();
-
-        if (filterDep != null)
-            filter = filterDep.unmarshal(nodeId, ctx);
-
-        if (prjPredDep != null)
-            prjPred = prjPredDep.unmarshal(nodeId, ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public Object orderedTopic() {
-        return topic;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeString(out, cacheName);
-        out.writeObject(topic);
-
-        boolean b = filterDep != null;
-
-        out.writeBoolean(b);
-
-        if (b)
-            out.writeObject(filterDep);
-        else
-            out.writeObject(filter);
-
-        b = prjPredDep != null;
-
-        out.writeBoolean(b);
-
-        if (b)
-            out.writeObject(prjPredDep);
-        else
-            out.writeObject(prjPred);
-
-        out.writeBoolean(internal);
-
-        out.writeBoolean(entryLsnr);
-
-        out.writeBoolean(sync);
-
-        out.writeBoolean(oldVal);
-
-        out.writeInt(taskHash);
-
-        out.writeBoolean(keepPortable);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        cacheName = U.readString(in);
-        topic = in.readObject();
-
-        boolean b = in.readBoolean();
-
-        if (b)
-            filterDep = (DeployableObject)in.readObject();
-        else
-            filter = (IgnitePredicate<CacheContinuousQueryEntry<K,V>>)in.readObject();
-
-        b = in.readBoolean();
-
-        if (b)
-            prjPredDep = (DeployableObject)in.readObject();
-        else
-            prjPred = (IgnitePredicate<Cache.Entry<K, V>>)in.readObject();
-
-        internal = in.readBoolean();
-
-        entryLsnr = in.readBoolean();
-
-        sync = in.readBoolean();
-
-        oldVal = in.readBoolean();
-
-        taskHash = in.readInt();
-
-        keepPortable = in.readBoolean();
-    }
-
-    /**
-     * @param ctx Kernal context.
-     * @return Cache context.
-     */
-    private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) {
-        assert ctx != null;
-
-        return ctx.cache().<K, V>internalCache(cacheName).context();
-    }
-
-    /**
-     * Deployable object.
-     */
-    private static class DeployableObject implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Serialized object. */
-        private byte[] bytes;
-
-        /** Deployment class name. */
-        private String clsName;
-
-        /** Deployment info. */
-        private GridDeploymentInfo depInfo;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public DeployableObject() {
-            // No-op.
-        }
-
-        /**
-         * @param obj Object.
-         * @param ctx Kernal context.
-         * @throws IgniteCheckedException In case of error.
-         */
-        private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
-            assert obj != null;
-            assert ctx != null;
-
-            Class cls = U.detectClass(obj);
-
-            clsName = cls.getName();
-
-            GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
-
-            if (dep == null)
-                throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj);
-
-            depInfo = new GridDeploymentInfoBean(dep);
-
-            bytes = ctx.config().getMarshaller().marshal(obj);
-        }
-
-        /**
-         * @param nodeId Node ID.
-         * @param ctx Kernal context.
-         * @return Deserialized object.
-         * @throws IgniteCheckedException In case of error.
-         */
-        <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
-            assert ctx != null;
-
-            GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
-                depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
-
-            if (dep == null)
-                throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
-
-            return ctx.config().getMarshaller().unmarshal(bytes, dep.classLoader());
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeByteArray(out, bytes);
-            U.writeString(out, clsName);
-            out.writeObject(depInfo);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            bytes = U.readByteArray(in);
-            clsName = U.readString(in);
-            depInfo = (GridDeploymentInfo)in.readObject();
-        }
-    }
-}


Mime
View raw message