ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [04/14] incubator-ignite git commit: IGNITE-143 - Continuous queries refactoring
Date Thu, 12 Feb 2015 21:50:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
deleted file mode 100644
index 3207f0a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.CacheEntryEvent;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.configuration.*;
-import javax.cache.event.*;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static javax.cache.event.EventType.*;
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-
-/**
- * Continuous queries manager.
- */
-public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
-    /** Ordered topic prefix. */
-    private String topicPrefix;
-
-    /** Listeners. */
-    private final ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrs = new ConcurrentHashMap8<>();
-
-    /** Listeners count. */
-    private final AtomicInteger lsnrCnt = new AtomicInteger();
-
-    /** Internal entries listeners. */
-    private final ConcurrentMap<UUID, ListenerInfo<K, V>> intLsnrs = new ConcurrentHashMap8<>();
-
-    /** Internal listeners count. */
-    private final AtomicInteger intLsnrCnt = new AtomicInteger();
-
-    /** Query sequence number for message topic. */
-    private final AtomicLong seq = new AtomicLong();
-
-    /** Continues queries created for cache event listeners. */
-    private final ConcurrentMap<CacheEntryListenerConfiguration, CacheContinuousQuery<K, V>> lsnrQrys =
-        new ConcurrentHashMap8<>();
-
-    /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        // Append cache name to the topic.
-        topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name());
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected void onKernalStart0() throws IgniteCheckedException {
-        Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs = cctx.config().getCacheEntryListenerConfigurations();
-
-        if (lsnrCfgs != null) {
-            for (CacheEntryListenerConfiguration<K, V> cfg : lsnrCfgs)
-                registerCacheEntryListener(cfg, false);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onKernalStop0(boolean cancel) {
-        super.onKernalStop0(cancel);
-
-        for (CacheEntryListenerConfiguration lsnrCfg : lsnrQrys.keySet()) {
-            try {
-                deregisterCacheEntryListener(lsnrCfg);
-            }
-            catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to remove cache entry listener: " + e);
-            }
-        }
-    }
-
-    /**
-     * @param prjPred Projection predicate.
-     * @return New continuous query.
-     */
-    public CacheContinuousQuery<K, V> createQuery(@Nullable IgnitePredicate<CacheEntry<K, V>> prjPred) {
-        Object topic = TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement());
-
-        return new GridCacheContinuousQueryAdapter<>(cctx, topic, prjPred);
-    }
-
-    /**
-     * @param e Cache entry.
-     * @param key Key.
-     * @param newVal New value.
-     * @param newBytes New value bytes.
-     * @param oldVal Old value.
-     * @param oldBytes Old value bytes.
-     * @param preload {@code True} if entry is updated during preloading.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public void onEntryUpdate(GridCacheEntryEx<K, V> e,
-        K key,
-        @Nullable V newVal,
-        @Nullable GridCacheValueBytes newBytes,
-        V oldVal,
-        @Nullable GridCacheValueBytes oldBytes,
-        boolean preload) throws IgniteCheckedException {
-        assert e != null;
-        assert key != null;
-
-        ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrCol;
-
-        if (e.isInternal())
-            lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
-        else
-            lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
-
-        if (F.isEmpty(lsnrCol))
-            return;
-
-        oldVal = cctx.unwrapTemporary(oldVal);
-
-        EventType evtType = newVal == null ? REMOVED :
-            ((oldVal != null || (oldBytes != null && !oldBytes.isNull()) ? UPDATED : CREATED));
-
-        GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>(
-            cctx,
-            e.wrap(false),
-            key,
-            newVal,
-            newBytes,
-            oldVal,
-            oldBytes,
-            evtType);
-
-        e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader());
-
-        boolean recordEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
-        for (ListenerInfo<K, V> lsnr : lsnrCol.values()) {
-            if (preload && lsnr.entryListener())
-                continue;
-
-            lsnr.onEntryUpdate(e0, recordEvt);
-        }
-    }
-
-    /**
-     * @param e Entry.
-     * @param key Key.
-     * @param oldVal Old value.
-     * @param oldBytes Old value bytes.
-     */
-    public void onEntryExpired(GridCacheEntryEx<K, V> e,
-        K key,
-        V oldVal,
-        @Nullable GridCacheValueBytes oldBytes) {
-        if (e.isInternal())
-            return;
-
-        ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrCol = lsnrs;
-
-        if (F.isEmpty(lsnrCol))
-            return;
-
-        if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, -1)) {
-            GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>(
-                cctx,
-                e.wrap(false),
-                key,
-                null,
-                null,
-                oldVal,
-                oldBytes,
-                EXPIRED);
-
-            for (ListenerInfo<K, V> lsnr : lsnrCol.values()) {
-                if (!lsnr.entryListener())
-                    continue;
-
-                lsnr.onEntryUpdate(e0, false);
-            }
-        }
-    }
-
-    /**
-     * @param lsnrCfg Listener configuration.
-     * @param addToCfg If {@code true} adds listener configuration to cache configuration.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg, boolean addToCfg)
-        throws IgniteCheckedException {
-        GridCacheContinuousQueryAdapter<K, V> qry = null;
-
-        try {
-            A.notNull(lsnrCfg, "lsnrCfg");
-
-            Factory<CacheEntryListener<? super K, ? super V>> factory = lsnrCfg.getCacheEntryListenerFactory();
-
-            A.notNull(factory, "cacheEntryListenerFactory");
-
-            CacheEntryListener lsnr = factory.create();
-
-            A.notNull(lsnr, "lsnr");
-
-            IgniteCacheProxy<K, V> cache= cctx.kernalContext().cache().jcache(cctx.name());
-
-            EntryListenerCallback cb = new EntryListenerCallback(cache, lsnr);
-
-            if (!(cb.create() || cb.update() || cb.remove() || cb.expire()))
-                throw new IllegalArgumentException("Listener must implement one of CacheEntryListener sub-interfaces.");
-
-            qry = (GridCacheContinuousQueryAdapter<K, V>)cctx.cache().queries().createContinuousQuery();
-
-            CacheContinuousQuery<K, V> old = lsnrQrys.putIfAbsent(lsnrCfg, qry);
-
-            if (old != null)
-                throw new IllegalArgumentException("Listener is already registered for configuration: " + lsnrCfg);
-
-            qry.autoUnsubscribe(true);
-
-            qry.bufferSize(1);
-
-            qry.localCallback(cb);
-
-            EntryListenerFilter<K, V> fltr = new EntryListenerFilter<>(cb.create(),
-                cb.update(),
-                cb.remove(),
-                cb.expire(),
-                lsnrCfg.getCacheEntryEventFilterFactory(),
-                cctx.kernalContext().grid(),
-                cctx.name());
-
-            qry.remoteFilter(fltr);
-
-            qry.execute(null, false, true, lsnrCfg.isSynchronous(), lsnrCfg.isOldValueRequired());
-
-            if (addToCfg)
-                cctx.config().addCacheEntryListenerConfiguration(lsnrCfg);
-        }
-        catch (IgniteCheckedException e) {
-            lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to execute it.
-
-            throw e;
-        }
-    }
-
-    /**
-     * @param lsnrCfg Listener configuration.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) throws IgniteCheckedException {
-        A.notNull(lsnrCfg, "lsnrCfg");
-
-        CacheContinuousQuery<K, V> qry = lsnrQrys.remove(lsnrCfg);
-
-        if (qry != null) {
-            cctx.config().removeCacheEntryListenerConfiguration(lsnrCfg);
-
-            qry.close();
-        }
-    }
-
-    /**
-     * @param lsnrId Listener ID.
-     * @param lsnr Listener.
-     * @param internal Internal flag.
-     * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
-     * @return Whether listener was actually registered.
-     */
-    @SuppressWarnings("UnusedParameters")
-    boolean registerListener(UUID lsnrId,
-        GridCacheContinuousQueryListener<K, V> lsnr,
-        boolean internal,
-        boolean entryLsnr) {
-        ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr);
-
-        boolean added;
-
-        if (internal) {
-            added = intLsnrs.putIfAbsent(lsnrId, info) == null;
-
-            if (added)
-                intLsnrCnt.incrementAndGet();
-        }
-        else {
-            added = lsnrs.putIfAbsent(lsnrId, info) == null;
-
-            if (added) {
-                lsnrCnt.incrementAndGet();
-
-                lsnr.onExecution();
-            }
-        }
-
-        return added;
-    }
-
-    /**
-     * @param internal Internal flag.
-     * @param id Listener ID.
-     */
-    void unregisterListener(boolean internal, UUID id) {
-        ListenerInfo info;
-
-        if (internal) {
-            if ((info = intLsnrs.remove(id)) != null) {
-                intLsnrCnt.decrementAndGet();
-
-                info.lsnr.onUnregister();
-            }
-        }
-        else {
-            if ((info = lsnrs.remove(id)) != null) {
-                lsnrCnt.decrementAndGet();
-
-                info.lsnr.onUnregister();
-            }
-        }
-    }
-
-    /**
-     * Iterates through existing data.
-     *
-     * @param internal Internal flag.
-     * @param id Listener ID.
-     * @param keepPortable Keep portable flag.
-     */
-    @SuppressWarnings("unchecked")
-    void iterate(boolean internal, UUID id, boolean keepPortable) {
-        ListenerInfo<K, V> info = internal ? intLsnrs.get(id) : lsnrs.get(id);
-
-        assert info != null;
-
-        GridCacheProjectionImpl<K, V> oldPrj = null;
-
-        try {
-            if (keepPortable) {
-                oldPrj = cctx.projectionPerCall();
-
-                cctx.projectionPerCall(cctx.cache().<K, V>keepPortable0());
-            }
-
-            Set<CacheEntry<K, V>> entries;
-
-            if (cctx.isReplicated())
-                entries = internal ? cctx.cache().entrySetx() :
-                    cctx.cache().entrySet();
-            else
-                entries = internal ? cctx.cache().primaryEntrySetx() :
-                    cctx.cache().primaryEntrySet();
-
-            boolean evt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
-            for (CacheEntry<K, V> e : entries) {
-                GridCacheContinuousQueryEntry<K, V> qryEntry = new GridCacheContinuousQueryEntry<>(cctx,
-                    e,
-                    e.getKey(),
-                    e.getValue(),
-                    null,
-                    null,
-                    null,
-                    CREATED);
-
-                info.onIterate(qryEntry, evt);
-            }
-
-            info.flushPending();
-        }
-        finally {
-            if (keepPortable)
-                cctx.projectionPerCall(oldPrj);
-        }
-    }
-
-    /**
-     * Listener info.
-     */
-    private static class ListenerInfo<K, V> {
-        /** Listener. */
-        private final GridCacheContinuousQueryListener<K, V> lsnr;
-
-        /** Pending entries. */
-        private Collection<PendingEntry<K, V>> pending;
-
-        /** */
-        private final boolean entryLsnr;
-
-        /**
-         * @param lsnr Listener.
-         * @param entryLsnr {@code True} if listener created for {@link CacheEntryListener}.
-         */
-        private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, boolean entryLsnr) {
-            this.lsnr = lsnr;
-            this.entryLsnr = entryLsnr;
-
-            if (!entryLsnr)
-                pending = new LinkedList<>();
-        }
-
-        /**
-         * @param e Entry update callback.
-         * @param recordEvt Whether to record event.
-         */
-        void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) {
-            boolean notifyLsnr = true;
-
-            synchronized (this) {
-                if (pending != null) {
-                    pending.add(new PendingEntry<>(e, recordEvt));
-
-                    notifyLsnr = false;
-                }
-            }
-
-            if (notifyLsnr)
-                lsnr.onEntryUpdate(e, recordEvt);
-        }
-
-        /**
-         * @param e Entry iteration callback.
-         * @param recordEvt Whether to record event.
-         */
-        void onIterate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) {
-            lsnr.onEntryUpdate(e, recordEvt);
-        }
-
-        /**
-         * Flushes pending entries to listener.
-         */
-        void flushPending() {
-            Collection<PendingEntry<K, V>> pending0;
-
-            synchronized (this) {
-                pending0 = pending;
-
-                pending = null;
-            }
-
-            for (PendingEntry<K, V> e : pending0)
-                lsnr.onEntryUpdate(e.entry, e.recordEvt);
-        }
-
-        /**
-         * @return {@code True} if listener created for {@link CacheEntryListener}.
-         */
-        boolean entryListener() {
-            return entryLsnr;
-        }
-    }
-
-    /**
-     * Pending entry.
-     */
-    private static class PendingEntry<K, V> {
-        /** Entry. */
-        private final GridCacheContinuousQueryEntry<K, V> entry;
-
-        /** Whether to record event. */
-        private final boolean recordEvt;
-
-        /**
-         * @param entry Entry.
-         * @param recordEvt Whether to record event.
-         */
-        private PendingEntry(GridCacheContinuousQueryEntry<K, V> entry, boolean recordEvt) {
-            this.entry = entry;
-            this.recordEvt = recordEvt;
-        }
-    }
-
-    /**
-     *
-     */
-    static class EntryListenerFilter<K1, V1> implements
-        IgnitePredicate<CacheContinuousQueryEntry<K1, V1>>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private boolean create;
-
-        /** */
-        private boolean update;
-
-        /** */
-        private boolean rmv;
-
-        /** */
-        private boolean expire;
-
-        /** */
-        private Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory;
-
-        /** */
-        private CacheEntryEventFilter fltr;
-
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /** */
-        private IgniteCache cache;
-
-        /** */
-        private String cacheName;
-
-        /**
-         *
-         */
-        public EntryListenerFilter() {
-            // No-op.
-        }
-
-        /**
-         * @param create {@code True} if listens for create events.
-         * @param update {@code True} if listens for create events.
-         * @param rmv {@code True} if listens for remove events.
-         * @param expire {@code True} if listens for expire events.
-         * @param fltrFactory Filter factory.
-         * @param ignite Ignite instance.
-         * @param cacheName Cache name.
-         */
-        EntryListenerFilter(
-            boolean create,
-            boolean update,
-            boolean rmv,
-            boolean expire,
-            Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory,
-            Ignite ignite,
-            @Nullable String cacheName) {
-            this.create = create;
-            this.update = update;
-            this.rmv = rmv;
-            this.expire = expire;
-            this.fltrFactory = fltrFactory;
-            this.ignite = ignite;
-            this.cacheName = cacheName;
-
-            if (fltrFactory != null)
-                fltr = fltrFactory.create();
-
-            cache = ignite.jcache(cacheName);
-
-            assert cache != null : cacheName;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public boolean apply(CacheContinuousQueryEntry<K1, V1> entry) {
-            try {
-                EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType());
-
-                switch (evtType) {
-                    case EXPIRED:
-                        if (!expire)
-                            return false;
-
-                        break;
-
-                    case REMOVED:
-                        if (!rmv)
-                            return false;
-
-                        break;
-
-                    case CREATED:
-                        if (!create)
-                            return false;
-
-                        break;
-
-                    case UPDATED:
-                        if (!update)
-                            return false;
-
-                        break;
-
-                    default:
-                        assert false : evtType;
-                }
-
-                if (fltr == null)
-                    return true;
-
-                if (cache == null) {
-                    cache = ignite.jcache(cacheName);
-
-                    assert cache != null : cacheName;
-                }
-
-                return fltr.evaluate(new CacheEntryEvent(cache, evtType, entry));
-            }
-            catch (Exception e) {
-                LT.warn(ignite.log(), e, "Cache entry event filter error: " + e);
-
-                return true;
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeBoolean(create);
-
-            out.writeBoolean(update);
-
-            out.writeBoolean(rmv);
-
-            out.writeBoolean(expire);
-
-            U.writeString(out, cacheName);
-
-            out.writeObject(fltrFactory);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            create = in.readBoolean();
-
-            update = in.readBoolean();
-
-            rmv = in.readBoolean();
-
-            expire = in.readBoolean();
-
-            cacheName = U.readString(in);
-
-            fltrFactory = (Factory<CacheEntryEventFilter<? super K1, ? super V1>>)in.readObject();
-
-            if (fltrFactory != null)
-                fltr = fltrFactory.create();
-        }
-    }
-
-    /**
-     *
-     */
-    private class EntryListenerCallback implements
-        IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> {
-        /** */
-        private final IgniteCacheProxy<K, V> cache;
-
-        /** */
-        private final CacheEntryCreatedListener createLsnr;
-
-        /** */
-        private final CacheEntryUpdatedListener updateLsnr;
-
-        /** */
-        private final CacheEntryRemovedListener rmvLsnr;
-
-        /** */
-        private final CacheEntryExpiredListener expireLsnr;
-
-        /**
-         * @param cache Cache to be used as event source.
-         * @param lsnr Listener.
-         */
-        EntryListenerCallback(IgniteCacheProxy<K, V> cache, CacheEntryListener lsnr) {
-            this.cache = cache;
-
-            createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null;
-            updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null;
-            rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null;
-            expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null;
-        }
-
-        /**
-         * @return {@code True} if listens for create event.
-         */
-        boolean create() {
-            return createLsnr != null;
-        }
-
-        /**
-         * @return {@code True} if listens for update event.
-         */
-        boolean update() {
-            return updateLsnr != null;
-        }
-
-        /**
-         * @return {@code True} if listens for remove event.
-         */
-        boolean remove() {
-            return rmvLsnr != null;
-        }
-
-        /**
-         * @return {@code True} if listens for expire event.
-         */
-        boolean expire() {
-            return expireLsnr != null;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public boolean apply(UUID uuid,
-            Collection<CacheContinuousQueryEntry<K, V>> entries) {
-            for (CacheContinuousQueryEntry entry : entries) {
-                try {
-                    EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType());
-
-                    switch (evtType) {
-                        case EXPIRED: {
-                            assert expireLsnr != null;
-
-                            CacheEntryEvent evt0 =
-                                new CacheEntryEvent(cache, EXPIRED, entry);
-
-                            expireLsnr.onExpired(Collections.singleton(evt0));
-
-                            break;
-                        }
-
-                        case REMOVED: {
-                            assert rmvLsnr != null;
-
-                            CacheEntryEvent evt0 =
-                                new CacheEntryEvent(cache, REMOVED, entry);
-
-                            rmvLsnr.onRemoved(Collections.singleton(evt0));
-
-                            break;
-                        }
-
-                        case UPDATED: {
-                            assert updateLsnr != null;
-
-                            CacheEntryEvent evt0 =
-                                new CacheEntryEvent(cache, UPDATED, entry);
-
-                            updateLsnr.onUpdated(Collections.singleton(evt0));
-
-                            break;
-                        }
-
-                        case CREATED: {
-                            assert createLsnr != null;
-
-                            CacheEntryEvent evt0 =
-                                new CacheEntryEvent(cache, CREATED, entry);
-
-                            createLsnr.onCreated(Collections.singleton(evt0));
-
-                            break;
-                        }
-
-                        default:
-                            assert false : evtType;
-                    }
-                }
-                catch (CacheEntryListenerException e) {
-                    LT.warn(log, e, "Cache entry listener error: " + e);
-                }
-            }
-
-            return true;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index a6f76f8..10f5d36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.query.continuous.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
@@ -36,12 +35,15 @@ import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.services.*;
 import org.apache.ignite.marshaller.*;
+import org.apache.ignite.services.*;
 import org.apache.ignite.thread.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -88,10 +90,10 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     private GridLocalEventListener topLsnr = new TopologyListener();
 
     /** Deployment listener. */
-    private GridCacheContinuousQueryAdapter<Object, Object> cfgQry;
+    private QueryCursor<Cache.Entry<Object, Object>> cfgQryCur;
 
     /** Assignment listener. */
-    private GridCacheContinuousQueryAdapter<Object, Object> assignQry;
+    private QueryCursor<Cache.Entry<Object, Object>> assignQryCur;
 
     /**
      * @param ctx Kernal context.
@@ -128,17 +130,19 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             if (ctx.deploy().enabled())
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
-            cfgQry = (GridCacheContinuousQueryAdapter<Object, Object>)cache.queries().createContinuousQuery();
+            IgniteCache<Object, Object> jCache = ctx.cache().utilityJCache();
 
-            cfgQry.localCallback(new DeploymentListener());
+            ContinuousQuery<Object, Object> cfgQry = Query.continuous();
 
-            cfgQry.execute(ctx.grid().forLocal(), true, false, false, true);
+            cfgQry.setLocalListener(new DeploymentListener());
 
-            assignQry = (GridCacheContinuousQueryAdapter<Object, Object>)cache.queries().createContinuousQuery();
+            cfgQryCur = jCache.localQuery(cfgQry);
 
-            assignQry.localCallback(new AssignmentListener());
+            ContinuousQuery<Object, Object> assignQry = Query.continuous();
 
-            assignQry.execute(ctx.grid().forLocal(), true, false, false, true);
+            assignQry.setLocalListener(new AssignmentListener());
+
+            assignQryCur = jCache.localQuery(assignQry);
         }
         finally {
             if (ctx.deploy().enabled())
@@ -171,21 +175,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         ctx.event().removeLocalEventListener(topLsnr);
 
-        try {
-            if (cfgQry != null)
-                cfgQry.close();
-        }
-        catch (IgniteCheckedException e) {
-            log.error("Failed to unsubscribe service configuration notifications.", e);
-        }
+        if (cfgQryCur != null)
+            cfgQryCur.close();
 
-        try {
-            if (assignQry != null)
-                assignQry.close();
-        }
-        catch (IgniteCheckedException e) {
-            log.error("Failed to unsubscribe service assignment notifications.", e);
-        }
+        if (assignQryCur != null)
+            assignQryCur.close();
 
         Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 
@@ -916,18 +910,12 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /**
      * Service deployment listener.
      */
-    private class DeploymentListener
-        implements IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>> {
-        /** Serial version ID. */
-        private static final long serialVersionUID = 0L;
-
+    private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public boolean apply(
-            UUID nodeId,
-            final Collection<CacheContinuousQueryEntry<Object, Object>> deps) {
+        @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
             depExe.submit(new BusyRunnable() {
                 @Override public void run0() {
-                    for (Entry<Object, Object> e : deps) {
+                    for (CacheEntryEvent<?, ?> e : deps) {
                         if (!(e.getKey() instanceof GridServiceDeploymentKey))
                             continue;
 
@@ -989,8 +977,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                     }
                 }
             });
-
-            return true;
         }
 
         /**
@@ -1194,18 +1180,12 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /**
      * Assignment listener.
      */
-    private class AssignmentListener
-        implements IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>> {
-        /** Serial version ID. */
-        private static final long serialVersionUID = 0L;
-
+    private class AssignmentListener implements CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public boolean apply(
-            UUID nodeId,
-            final Collection<CacheContinuousQueryEntry<Object, Object>> assignCol) {
+        @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> assignCol) throws CacheEntryListenerException {
             depExe.submit(new BusyRunnable() {
                 @Override public void run0() {
-                    for (Entry<Object, Object> e : assignCol) {
+                    for (CacheEntryEvent<?, ?> e : assignCol) {
                         if (!(e.getKey() instanceof GridServiceAssignmentsKey))
                             continue;
 
@@ -1253,8 +1233,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                     }
                 }
             });
-
-            return true;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties
index e6ece60..97f1c3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties
@@ -694,8 +694,8 @@ org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponseEntry
 org.apache.ignite.internal.processors.cache.query.GridCacheQueryType
 org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery
 org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery
-org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryHandler
-org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryHandler$DeployableObject
+org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler
+org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$DeployableObject
 org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcMetadataTask
 org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcMetadataTask$JdbcDriverMetadataJob
 org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcTask

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 7615f44..0a3c3d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -25,10 +25,9 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -42,7 +41,10 @@ import org.apache.ignite.testframework.junits.common.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
 import javax.cache.configuration.*;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.*;
 import javax.cache.integration.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -54,8 +56,8 @@ import static org.apache.ignite.cache.CacheDistributionMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CachePreloadMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.*;
 import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.*;
 
 /**
  * Continuous queries tests.
@@ -165,7 +167,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         for (int i = 0; i < gridCount(); i++) {
             GridContinuousProcessor proc = ((IgniteKernal)grid(i)).context().continuous();
 
-            assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, "locInfos")).size());
+            assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "locInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStartAck")).size());
@@ -173,7 +175,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStopAck")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "pending")).size());
 
-            GridCacheContinuousQueryManager mgr =
+            CacheContinuousQueryManager mgr =
                 ((IgniteKernal)grid(i)).context().cache().internalCache().context().continuousQueries();
 
             assertEquals(0, ((Map)U.field(mgr, "lsnrs")).size());
@@ -201,14 +203,14 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
      * @throws Exception If failed.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public void testApi() throws Exception {
-        final CacheContinuousQuery<Object, Object> q = grid(0).cache(null).queries().createContinuousQuery();
+    public void testIllegalArguments() throws Exception {
+        final ContinuousQuery<Object, Object> q = Query.continuous();
 
         GridTestUtils.assertThrows(
             log,
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    q.bufferSize(-1);
+                    q.setBufferSize(-1);
 
                     return null;
                 }
@@ -217,24 +219,20 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             null
         );
 
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    q.bufferSize(0);
+                    q.setBufferSize(0);
 
                     return null;
                 }
-            },
-            IllegalArgumentException.class,
-            null
+            }, IllegalArgumentException.class, null
         );
 
         GridTestUtils.assertThrows(
             log,
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    q.timeInterval(-1);
+                    q.setTimeInterval(-1);
 
                     return null;
                 }
@@ -242,128 +240,24 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             IllegalArgumentException.class,
             null
         );
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.execute();
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
-
-        q.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() {
-            @Override public boolean apply(UUID uuid, Collection<CacheContinuousQueryEntry<Object, Object>> entries) {
-                return true;
-            }
-        });
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.execute(grid(0).forPredicate(F.<ClusterNode>alwaysFalse()));
-
-                    return null;
-                }
-            },
-            ClusterTopologyCheckedException.class,
-            null
-        );
-
-        q.execute();
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() {
-                        @Override public boolean apply(UUID uuid, Collection<CacheContinuousQueryEntry<Object, Object>> entries) {
-                            return false;
-                        }
-                    });
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.remoteFilter(null);
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.bufferSize(10);
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.timeInterval(10);
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
-
-        q.close();
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.execute();
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAllEntries() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, List<Integer>> map = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(5);
 
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId,
-                Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    U.debug(">>>>>>>>>>>>>>> EVT: " + e);
+
                     synchronized (map) {
                         List<Integer> vals = map.get(e.getKey());
 
@@ -378,21 +272,17 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        try {
-            qry.execute();
-
-            cache.putx(1, 1);
-            cache.putx(2, 2);
-            cache.putx(3, 3);
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(2, 2);
+            cache.put(3, 3);
 
-            cache.removex(2);
+            cache.remove(2);
 
-            cache.putx(1, 10);
+            cache.put(1, 10);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -418,25 +308,22 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(1, vals.size());
             assertEquals(3, (int)vals.get(0));
         }
-        finally {
-            qry.close();
-        }
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testEntriesByFilter() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, List<Integer>> map = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(4);
 
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
                     synchronized (map) {
                         List<Integer> vals = map.get(e.getKey());
 
@@ -451,46 +338,26 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
                     latch.countDown();
                 }
-
-                return true;
-            }
-        });
-
-        qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() {
-            @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) {
-                return e.getKey() > 2;
             }
         });
 
-        // Second query to wait for notifications about all updates.
-        CacheContinuousQuery<Integer, Integer> qry0 = cache.queries().createContinuousQuery();
-
-        final CountDownLatch latch0 = new CountDownLatch(8);
-
-        qry0.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID uuid,
-                Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> ignored : entries)
-                    latch0.countDown();
-
-                return true;
+        qry.setRemoteFilter(new CacheEntryEventFilter<Integer, Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+                return evt.getKey() > 2;
             }
         });
 
-        try {
-            qry.execute();
-            qry0.execute();
-
-            cache.putx(1, 1);
-            cache.putx(2, 2);
-            cache.putx(3, 3);
-            cache.putx(4, 4);
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(2, 2);
+            cache.put(3, 3);
+            cache.put(4, 4);
 
-            cache.removex(2);
-            cache.removex(3);
+            cache.remove(2);
+            cache.remove(3);
 
-            cache.putx(1, 10);
-            cache.putx(4, 40);
+            cache.put(1, 10);
+            cache.put(4, 40);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -509,91 +376,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(2, vals.size());
             assertEquals(4, (int)vals.get(0));
             assertEquals(40, (int)vals.get(1));
-
-            assert latch0.await(2, SECONDS);
-        }
-        finally {
-            qry.close();
-            qry0.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testProjection() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        // Queries for non-partitioned caches always run locally.
-        if (cache.configuration().getCacheMode() != PARTITIONED)
-            return;
-
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
-
-        final Map<Integer, List<Integer>> map = new HashMap<>();
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    synchronized (map) {
-                        List<Integer> vals = map.get(e.getKey());
-
-                        if (vals == null) {
-                            vals = new ArrayList<>();
-
-                            map.put(e.getKey(), vals);
-                        }
-
-                        vals.add(e.getValue());
-                    }
-
-                    latch.countDown();
-                }
-
-                return true;
-            }
-        });
-
-        try {
-            qry.execute(grid(0).forRemotes());
-
-            int locKey = -1;
-            int rmtKey = -1;
-
-            int key = 0;
-
-            while (true) {
-                ClusterNode n = grid(0).mapKeyToNode(null, key);
-
-                assert n != null;
-
-                if (n.equals(grid(0).localNode()))
-                    locKey = key;
-                else
-                    rmtKey = key;
-
-                key++;
-
-                if (locKey >= 0 && rmtKey >= 0)
-                    break;
-            }
-
-            cache.putx(locKey, 1);
-            cache.putx(rmtKey, 2);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals(1, map.size());
-
-            List<Integer> vals = map.get(rmtKey);
-
-            assertNotNull(vals);
-            assertEquals(1, vals.size());
-            assertEquals(2, (int)vals.get(0));
-        }
-        finally {
-            qry.close();
         }
     }
 
@@ -601,20 +383,16 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
      * @throws Exception If failed.
      */
     public void testLocalNodeOnly() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        // Queries for non-partitioned caches always run locally.
-        if (cache.configuration().getCacheMode() != PARTITIONED)
-            return;
-
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, List<Integer>> map = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(1);
 
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
                     synchronized (map) {
                         List<Integer> vals = map.get(e.getKey());
 
@@ -629,14 +407,10 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        try {
-            qry.execute(grid(0).forLocal());
-
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.localQuery(qry)) {
             int locKey = -1;
             int rmtKey = -1;
 
@@ -658,8 +432,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                     break;
             }
 
-            cache.putx(locKey, 1);
-            cache.putx(rmtKey, 2);
+            cache.put(locKey, 1);
+            cache.put(rmtKey, 2);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -671,103 +445,25 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(1, vals.size());
             assertEquals(1, (int)vals.get(0));
         }
-        finally {
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStopByCallback() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
-
-        final Map<Integer, List<Integer>> map = new HashMap<>();
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    synchronized (map) {
-                        List<Integer> vals = map.get(e.getKey());
-
-                        if (vals == null) {
-                            vals = new ArrayList<>();
-
-                            map.put(e.getKey(), vals);
-                        }
-
-                        vals.add(e.getValue());
-                    }
-
-                    latch.countDown();
-                }
-
-                return false;
-            }
-        });
-
-        // Second query to wait for notifications about all updates.
-        CacheContinuousQuery<Integer, Integer> qry0 = cache.queries().createContinuousQuery();
-
-        final CountDownLatch latch0 = new CountDownLatch(3);
-
-        qry0.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId,
-                Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> ignored : entries)
-                    latch0.countDown();
-
-                return true;
-            }
-        });
-
-        try {
-            qry.execute();
-            qry0.execute();
-
-            cache.putx(1, 1);
-            cache.putx(2, 2);
-            cache.putx(3, 3);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals(1, map.size());
-
-            List<Integer> list = F.first(map.values());
-
-            assert list != null;
-
-            assertEquals(1, list.size());
-
-            assert latch0.await(2, SECONDS);
-        }
-        finally {
-            qry.close();
-            qry0.close();
-        }
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testBuffering() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        // Buffering make sense only for remote nodes, so test only for partitioned cache.
-        if (cache.configuration().getCacheMode() != PARTITIONED)
+        if (grid(0).cache(null).configuration().getCacheMode() != PARTITIONED)
             return;
 
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
+
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, List<Integer>> map = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(5);
 
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
                     synchronized (map) {
                         List<Integer> vals = map.get(e.getKey());
 
@@ -782,18 +478,14 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        qry.bufferSize(5);
+        qry.setBufferSize(5);
 
-        try {
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
             ClusterNode node = F.first(grid(0).forRemotes().nodes());
 
-            qry.execute(grid(0).forNode(node));
-
             Collection<Integer> keys = new HashSet<>();
 
             int key = 0;
@@ -815,12 +507,12 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             Iterator<Integer> it = keys.iterator();
 
             for (int i = 0; i < 4; i++)
-                cache.putx(it.next(), 0);
+                cache.put(it.next(), 0);
 
             assert !latch.await(2, SECONDS);
 
             for (int i = 0; i < 2; i++)
-                cache.putx(it.next(), 0);
+                cache.put(it.next(), 0);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -838,29 +530,25 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                 assertEquals(0, (int)vals.get(0));
             }
         }
-        finally {
-            qry.close();
-        }
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testTimeInterval() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        // Buffering make sense only for remote nodes, so test only for partitioned cache.
-        if (cache.configuration().getCacheMode() != PARTITIONED)
+        if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() != PARTITIONED)
             return;
 
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, List<Integer>> map = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(5);
 
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
                     synchronized (map) {
                         List<Integer> vals = map.get(e.getKey());
 
@@ -875,15 +563,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        qry.bufferSize(10);
-        qry.timeInterval(3000);
+        qry.setBufferSize(10);
+        qry.setTimeInterval(3000);
 
-        try {
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
             ClusterNode node = F.first(grid(0).forRemotes().nodes());
 
             Collection<Integer> keys = new HashSet<>();
@@ -905,9 +591,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             }
 
             for (Integer k : keys)
-                cache.putx(k, 0);
-
-            qry.execute(grid(0).forNode(node));
+                cache.put(k, 0);
 
             assert !latch.await(2, SECONDS);
             assert latch.await(1000 + LATCH_TIMEOUT, MILLISECONDS);
@@ -926,125 +610,117 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                 assertEquals(0, (int)vals.get(0));
             }
         }
-        finally {
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testIteration() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
-
-        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(10);
-
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId,
-                Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    map.put(e.getKey(), e.getValue());
-
-                    latch.countDown();
-                }
-
-                return true;
-            }
-        });
-
-        try {
-            for (int i = 0; i < 10; i++)
-                cache.putx(i, i);
-
-            qry.execute();
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals(10, map.size());
-
-            for (int i = 0; i < 10; i++)
-                assertEquals(i, (int)map.get(i));
-        }
-        finally {
-            qry.close();
-        }
     }
 
-    /**
-     * @throws Exception If failed.
-     */
-    public void testIterationAndUpdates() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
-
-        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(12);
-
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    map.put(e.getKey(), e.getValue());
-
-                    latch.countDown();
-                }
-
-                return true;
-            }
-        });
-
-        try {
-            for (int i = 0; i < 10; i++)
-                cache.putx(i, i);
-
-            qry.execute();
-
-            cache.putx(10, 10);
-            cache.putx(11, 11);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : latch.getCount();
-
-            assertEquals(12, map.size());
-
-            for (int i = 0; i < 12; i++)
-                assertEquals(i, (int)map.get(i));
-        }
-        finally {
-            qry.close();
-        }
-    }
+//    /**
+//     * @throws Exception If failed.
+//     */
+//    public void testIteration() throws Exception {
+//        GridCache<Integer, Integer> cache = grid(0).cache(null);
+//
+//        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
+//
+//        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
+//        final CountDownLatch latch = new CountDownLatch(10);
+//
+//        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
+//            @Override public boolean apply(UUID nodeId,
+//                Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
+//                for (Map.Entry<Integer, Integer> e : entries) {
+//                    map.put(e.getKey(), e.getValue());
+//
+//                    latch.countDown();
+//                }
+//
+//                return true;
+//            }
+//        });
+//
+//        try {
+//            for (int i = 0; i < 10; i++)
+//                cache.putx(i, i);
+//
+//            qry.execute();
+//
+//            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+//
+//            assertEquals(10, map.size());
+//
+//            for (int i = 0; i < 10; i++)
+//                assertEquals(i, (int)map.get(i));
+//        }
+//        finally {
+//            qry.close();
+//        }
+//    }
+//
+//    /**
+//     * @throws Exception If failed.
+//     */
+//    public void testIterationAndUpdates() throws Exception {
+//        GridCache<Integer, Integer> cache = grid(0).cache(null);
+//
+//        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
+//
+//        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
+//        final CountDownLatch latch = new CountDownLatch(12);
+//
+//        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
+//            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
+//                for (Map.Entry<Integer, Integer> e : entries) {
+//                    map.put(e.getKey(), e.getValue());
+//
+//                    latch.countDown();
+//                }
+//
+//                return true;
+//            }
+//        });
+//
+//        try {
+//            for (int i = 0; i < 10; i++)
+//                cache.putx(i, i);
+//
+//            qry.execute();
+//
+//            cache.putx(10, 10);
+//            cache.putx(11, 11);
+//
+//            assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : latch.getCount();
+//
+//            assertEquals(12, map.size());
+//
+//            for (int i = 0; i < 12; i++)
+//                assertEquals(i, (int)map.get(i));
+//        }
+//        finally {
+//            qry.close();
+//        }
+//    }
 
     /**
      * @throws Exception If failed.
      */
     public void testLoadCache() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
         final CountDownLatch latch = new CountDownLatch(10);
 
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
                     map.put(e.getKey(), e.getValue());
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        try {
-            qry.execute();
-
-            for (int i = 0; i < gridCount(); i++)
-                grid(i).cache(null).loadCache(null, 0);
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.loadCache(null, 0);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + latch.getCount();
 
@@ -1053,170 +729,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             for (int i = 0; i < 10; i++)
                 assertEquals(i, (int)map.get(i));
         }
-        finally {
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTypedProjection() throws Exception {
-        GridCache<Object, Object> cache = grid(0).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry =
-            cache.projection(Integer.class, Integer.class).queries().createContinuousQuery();
-
-        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(2);
-
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    map.put(e.getKey(), e.getValue());
-
-                    latch.countDown();
-                }
-
-                return true;
-            }
-        });
-
-        qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() {
-            @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) {
-                return true;
-            }
-        });
-
-        try {
-            qry.execute();
-
-            cache.putx(1, 1);
-            cache.putx("a", "a");
-            cache.putx(2, 2);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals(2, map.size());
-
-            assertEquals(1, (int)map.get(1));
-            assertEquals(2, (int)map.get(2));
-        }
-        finally {
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testEntryFilterProjection() throws Exception {
-        CacheProjection<Integer, Integer> cache = grid(0).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry = cache.projection(
-            new P1<CacheEntry<Integer, Integer>>() {
-                @Override public boolean apply(CacheEntry<Integer, Integer> e) {
-                    Integer i = e.peek();
-
-                    return i != null && i > 10;
-                }
-            }).queries().createContinuousQuery();
-
-        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(2);
-
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (CacheContinuousQueryEntry<Integer, Integer> e : entries) {
-                    info("Query entry: " + e);
-
-                    map.put(e.getKey(), e.getValue());
-
-                    latch.countDown();
-                }
-
-                return true;
-            }
-        });
-
-        qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() {
-            @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) {
-                return true;
-            }
-        });
-
-        try {
-            qry.execute();
-
-            cache.putx(1, 1);
-            cache.putx(11, 11);
-            cache.putx(2, 2);
-            cache.putx(22, 22);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals("Invalid number of entries notified: " + map, 2, map.size());
-
-            assertEquals(11, (int)map.get(11));
-            assertEquals(22, (int)map.get(22));
-        }
-        finally {
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testKeyValueFilterProjection() throws Exception {
-        CacheProjection<Integer, Integer> cache = grid(0).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry = cache.projection(
-            new P2<Integer, Integer>() {
-                @Override public boolean apply(Integer key, Integer val) {
-                    return val > 10;
-                }
-            }).queries().createContinuousQuery();
-
-        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(2);
-
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    map.put(e.getKey(), e.getValue());
-
-                    latch.countDown();
-                }
-
-                return true;
-            }
-        });
-
-        qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() {
-            @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) {
-                return true;
-            }
-        });
-
-        try {
-            qry.execute();
-
-            cache.putx(1, 1);
-            cache.putx(11, 11);
-            cache.putx(2, 2);
-            cache.putx(22, 22);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals(2, map.size());
-
-            assertEquals(11, (int)map.get(11));
-            assertEquals(22, (int)map.get(22));
-        }
-        finally {
-            qry.close();
-        }
     }
 
     /**
@@ -1226,33 +738,28 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         if (atomicityMode() == ATOMIC)
             return;
 
-        GridCache<Object, Object> cache = grid(0).cache(null);
+        IgniteCache<Object, Object> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Object, Object> qry = cache.queries().createContinuousQuery();
+        ContinuousQuery<Object, Object> qry = Query.continuous();
 
         final Map<Object, Object> map = new ConcurrentHashMap8<>();
         final CountDownLatch latch = new CountDownLatch(2);
 
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() {
-            @Override public boolean apply(UUID nodeId,
-                Collection<CacheContinuousQueryEntry<Object, Object>> entries) {
-                for (Map.Entry<Object, Object> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                for (CacheEntryEvent<?, ?> e : evts) {
                     map.put(e.getKey(), e.getValue());
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        try {
-            qry.execute();
-
-            cache.putx(new GridCacheInternalKeyImpl("test"), 1);
+        try (QueryCursor<Cache.Entry<Object, Object>> ignored = cache.query(qry)) {
+            cache.put(new GridCacheInternalKeyImpl("test"), 1);
 
-            cache.putx(1, 1);
-            cache.putx(2, 2);
+            cache.put(1, 1);
+            cache.put(2, 2);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -1261,92 +768,41 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(1, (int)map.get(1));
             assertEquals(2, (int)map.get(2));
         }
-        finally {
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If filter.
-     */
-    public void testUpdateInFilter() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        cache.putx(1, 1);
-
-        CacheProjection<Integer, Integer> prj = cache.projection(new P1<CacheEntry<Integer, Integer>>() {
-            @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-            @Override public boolean apply(final CacheEntry<Integer, Integer> e) {
-                GridTestUtils.assertThrows(
-                    log,
-                    new Callable<Object>() {
-                        @Override public Object call() throws Exception {
-                            e.set(1000);
-
-                            return null;
-                        }
-                    },
-                    CacheFlagException.class,
-                    null
-                );
-
-                return true;
-            }
-        });
-
-        CacheContinuousQuery<Integer, Integer> qry = prj.queries().createContinuousQuery();
-
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                latch.countDown();
-
-                return true;
-            }
-        });
-
-        try {
-            qry.execute();
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-        }
-        finally {
-            qry.close();
-        }
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNodeJoin() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
-        final Collection<Map.Entry<Integer, Integer>> all = new ConcurrentLinkedDeque8<>();
+        final Collection<CacheEntryEvent<? extends Integer, ? extends Integer>> all = new ConcurrentLinkedDeque8<>();
         final CountDownLatch latch = new CountDownLatch(2);
 
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                assertEquals(1, entries.size());
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                int size = 0;
 
-                all.addAll(entries);
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+                    all.add(evt);
 
-                latch.countDown();
+                    size++;
+                }
 
-                return true;
+                assertEquals(1, size);
+
+                latch.countDown();
             }
         });
 
-        qry.execute();
-
-        cache.putx(1, 1);
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.put(1, 1);
 
-        try {
             startGrid("anotherGrid");
 
-            cache.putx(2, 2);
+            cache.put(2, 2);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : all;
 
@@ -1354,8 +810,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         }
         finally {
             stopGrid("anotherGrid");
-
-            qry.close();
         }
     }
 
@@ -1363,9 +817,9 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
      * @throws Exception If failed.
      */
     public void testCallbackForPreload() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        if (cache.configuration().getCacheMode() == LOCAL)
+        if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() == LOCAL)
             return;
 
         Map<Integer, Integer> map = new HashMap<>();
@@ -1377,37 +831,28 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
         cache.putAll(map);
 
-        Ignite ignite = startGrid("anotherGrid");
-
-        try {
-            cache = ignite.cache(null);
-
-            CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery();
-
-            final CountDownLatch latch = new CountDownLatch(1);
-            final Collection<Integer> keys = new GridConcurrentHashSet<>();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
-            qry.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-                @Override public boolean apply(UUID nodeId,
-                    Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                    for (Map.Entry<Integer, Integer> e : entries) {
-                        keys.add(e.getKey());
+        final CountDownLatch latch = new CountDownLatch(1);
+        final Collection<Integer> keys = new GridConcurrentHashSet<>();
 
-                        if (keys.size() >= keysCnt)
-                            latch.countDown();
-                    }
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+                    keys.add(evt.getKey());
 
-                    return true;
+                    if (keys.size() >= keysCnt)
+                        latch.countDown();
                 }
-            });
+            }
+        });
 
-            qry.execute();
+        Ignite ignite = startGrid("anotherGrid");
 
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = ignite.<Integer, Integer>jcache(null).localQuery(qry)) {
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
             assertEquals(keysCnt, keys.size());
-
-            qry.close();
         }
         finally {
             stopGrid("anotherGrid");
@@ -1475,26 +920,25 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                 grid(i).events().localListen(execLsnr, EVT_CACHE_QUERY_EXECUTED);
             }
 
-            GridCache<Integer, Integer> cache = grid(0).cache(null);
+            IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-            try (CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery()) {
-                qry.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-                    @Override public boolean apply(UUID uuid,
-                        Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                        return true;
-                    }
-                });
+            ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
-                qry.remoteFilter(new IgnitePredicate<CacheContinuousQueryEntry<Integer, Integer>>() {
-                    @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) {
-                        return e.getValue() >= 50;
-                    }
-                });
+            qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                    // No-op.
+                }
+            });
 
-                qry.execute();
+            qry.setRemoteFilter(new CacheEntryEventFilter<Integer, Integer>() {
+                @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+                    return evt.getValue() >= 50;
+                }
+            });
 
+            try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
                 for (int i = 0; i < 100; i++)
-                    cache.putx(i, i);
+                    cache.put(i, i);
 
                 assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
                 assert execLatch.await(LATCH_TIMEOUT, MILLISECONDS);
@@ -1515,8 +959,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
      */
     private static class TestStore extends CacheStoreAdapter<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo,
-            Object... args) {
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
             for (int i = 0; i < 10; i++)
                 clo.apply(i, i);
         }


Mime
View raw message