ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [03/37] incubator-ignite git commit: # ignite-43
Date Tue, 20 Jan 2015 08:49:35 GMT
# ignite-43


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9f02315
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9f02315
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9f02315

Branch: refs/heads/sprint-1
Commit: f9f02315992b046106b8fad38b14c37887e6103c
Parents: dc3faef
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Dec 31 12:03:37 2014 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Dec 31 15:53:08 2014 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheEntryEvent.java    |  29 +-
 .../processors/cache/IgniteCacheProxy.java      | 323 +----------
 .../grid/cache/GridCacheConfiguration.java      |   1 +
 .../cache/query/GridCacheContinuousQuery.java   |  10 +-
 .../grid/kernal/GridEventConsumeHandler.java    |   4 +-
 .../processors/cache/GridCacheMapEntry.java     |  49 +-
 .../processors/cache/GridCacheProcessor.java    |   2 +-
 .../GridCacheDataStructuresManager.java         |   2 +-
 .../GridCacheContinuousQueryAdapter.java        |  39 +-
 .../GridCacheContinuousQueryEntry.java          |  28 +-
 .../GridCacheContinuousQueryHandler.java        |  23 +-
 .../GridCacheContinuousQueryHandlerV2.java      |  12 +-
 .../GridCacheContinuousQueryManager.java        | 498 ++++++++++++++++-
 .../service/GridServiceProcessor.java           |   4 +-
 .../IgniteCacheEntryListenerAbstractTest.java   | 535 +++++++++++++++++--
 ...IgniteCacheEntryListenerAtomicLocalTest.java |  41 ++
 ...eCacheEntryListenerAtomicReplicatedTest.java |  24 +
 .../IgniteCacheEntryListenerAtomicTest.java     |  47 ++
 .../IgniteCacheEntryListenerTxLocalTest.java    |  41 ++
 ...gniteCacheEntryListenerTxReplicatedTest.java |  24 +
 .../cache/IgniteCacheEntryListenerTxTest.java   |  41 ++
 .../bamboo/GridDataGridTestSuite.java           |   7 +
 22 files changed, 1341 insertions(+), 443 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java
index b3a4f52..b480ca4 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntryEvent.java
@@ -10,7 +10,7 @@
 package org.apache.ignite.cache;
 
 import org.apache.ignite.*;
-import org.apache.ignite.events.*;
+import org.gridgain.grid.cache.query.*;
 
 import javax.cache.event.*;
 
@@ -19,52 +19,49 @@ import javax.cache.event.*;
  */
 public class CacheEntryEvent<K, V> extends javax.cache.event.CacheEntryEvent<K, V> {
     /** */
-    private final IgniteCacheEvent evt;
+    private final GridCacheContinuousQueryEntry<K, V> e;
 
     /**
      * @param src Cache.
      * @param type Event type.
-     * @param evt Ignite event.
+     * @param e Continuous query entry supplying the key, new value and old value.
      */
-    public CacheEntryEvent(IgniteCache src, EventType type, IgniteCacheEvent evt) {
+    public CacheEntryEvent(IgniteCache src, EventType type, GridCacheContinuousQueryEntry<K, V> e) {
         super(src, type);
 
-        this.evt = evt;
+        this.e = e;
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public V getOldValue() {
-        return (V)evt.oldValue();
+        return e.getOldValue();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isOldValueAvailable() {
-        return evt.hasOldValue();
+        return e.getOldValue() != null;
     }
 
     /** {@inheritDoc} */
     @Override public K getKey() {
-        return evt.key();
+        return e.getKey();
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public V getValue() {
-        return (V)evt.newValue();
+        return e.getValue();
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public <T> T unwrap(Class<T> cls) {
-        if (cls.equals(IgniteCacheEvent.class))
-            return (T)evt;
-
         throw new IllegalArgumentException();
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "CacheEntryEvent [evtType=" + getEventType() + ", evt=" + evt + ']';
+        return "CacheEntryEvent [evtType=" + getEventType() +
+            ", key=" + getKey() +
+            ", val=" + getValue() +
+            ", oldVal=" + getOldValue() + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 47d0722..b526f15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -11,11 +11,8 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.CacheEntryEvent;
 import org.apache.ignite.cache.query.*;
-import org.apache.ignite.events.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
@@ -26,7 +23,6 @@ import org.jetbrains.annotations.*;
 
 import javax.cache.*;
 import javax.cache.configuration.*;
-import javax.cache.event.*;
 import javax.cache.expiry.*;
 import javax.cache.integration.*;
 import javax.cache.processor.*;
@@ -34,8 +30,6 @@ import java.io.*;
 import java.util.*;
 import java.util.concurrent.locks.*;
 
-import static org.apache.ignite.events.IgniteEventType.*;
-
 /**
  * Cache proxy.
  */
@@ -771,325 +765,36 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
     }
 
     /** {@inheritDoc} */
-    @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) {
+    @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            A.notNull(lsnrCfg, "lsnrCfg");
-
-            Factory<CacheEntryListener<? super K, ? super V>> factory = lsnrCfg.getCacheEntryListenerFactory();
-
-            A.notNull(factory, "cacheEntryListenerFactory");
-
-            CacheEntryListener lsnr = factory.create();
-
-            A.notNull(lsnr, "lsnr");
-
-            EventCallback cb = new EventCallback(lsnr);
-
-            Set<Integer> types = new HashSet<>();
-
-            if (cb.create() || cb.update())
-                types.add(EVT_CACHE_OBJECT_PUT);
-
-            if (cb.remove())
-                types.add(EVT_CACHE_OBJECT_REMOVED);
-
-            if (cb.expire())
-                types.add(EVT_CACHE_OBJECT_EXPIRED);
-
-            if (types.isEmpty())
-                throw new IllegalArgumentException();
-
-            int[] types0 = new int[types.size()];
-
-            int i = 0;
-
-            for (Integer type : types)
-                types0[i++] = type;
-
-            EventFilter fltr = new EventFilter(cb.create(),
-                cb.update(),
-                lsnrCfg.getCacheEntryEventFilterFactory(),
-                ignite(),
-                ctx.name());
-
-            IgniteFuture<UUID> fut = ctx.kernalContext().continuous().startRoutine(
-                new GridEventConsumeHandler(cb, fltr, types0),
-                1,
-                0,
-                true,
-                null);
-
-            try {
-                fut.get();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
+            ctx.continuousQueries().registerCacheEntryListener(lsnrCfg);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
             gate.leave(prev);
         }
     }
 
-    /**
-     *
-     */
-    static class EventFilter implements IgnitePredicate<IgniteEvent>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private boolean update;
-
-        /** */
-        private boolean create;
-
-        /** */
-        private Factory<CacheEntryEventFilter> fltrFactory;
-
-        /** */
-        private CacheEntryEventFilter fltr;
-
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /** */
-        private IgniteCache cache;
-
-        /** */
-        private String cacheName;
-
-        /**
-         *
-         */
-        public EventFilter() {
-            // No-op.
-        }
-
-        /**
-         * @param create {@code True} if listens for create event.
-         * @param update {@code True} if listens for create event.
-         * @param fltrFactory Filter factory.
-         * @param ignite Ignite instance.
-         * @param cacheName Cache name.
-         */
-        EventFilter(
-            boolean create,
-            boolean update,
-            Factory<CacheEntryEventFilter> fltrFactory,
-            Ignite ignite,
-            @Nullable String cacheName) {
-            this.update = update;
-            this.create = create;
-            this.fltrFactory = fltrFactory;
-            this.ignite = ignite;
-            this.cacheName = cacheName;
-
-            if (fltrFactory != null)
-                fltr = fltrFactory.create();
-
-            cache = ignite.jcache(cacheName);
-
-            assert cache != null : cacheName;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public boolean apply(IgniteEvent evt) {
-            assert evt instanceof IgniteCacheEvent : evt;
-
-            IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt;
-
-            EventType evtType;
-
-            switch (cacheEvt.type()) {
-                case EVT_CACHE_OBJECT_REMOVED: {
-                    evtType = EventType.REMOVED;
-
-                    break;
-                }
-
-                case EVT_CACHE_OBJECT_PUT: {
-                    assert update || create;
-
-                    if (cacheEvt.hasOldValue()) {
-                        if (!update)
-                            return false;
-
-                        evtType = EventType.UPDATED;
-                    }
-                    else {
-                        if (!create)
-                            return false;
-
-                        evtType = EventType.CREATED;
-                    }
-
-                    break;
-                }
-
-                case EVT_CACHE_OBJECT_EXPIRED: {
-                    evtType = EventType.EXPIRED;
-
-                    break;
-                }
-
-                default:
-                    assert false : cacheEvt;
-
-                    throw new IgniteException("Unexpected event: " + cacheEvt);
-            }
-
-            if (cache == null) {
-                cache = ignite.jcache(cacheName);
-
-                assert cache != null : cacheName;
-            }
-
-            return fltr == null || fltr.evaluate(new CacheEntryEvent(cache, evtType, cacheEvt));
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeBoolean(create);
-
-            out.writeBoolean(update);
-
-            U.writeString(out, cacheName);
-
-            out.writeObject(fltrFactory);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            create = in.readBoolean();
-
-            update = in.readBoolean();
-
-            cacheName = U.readString(in);
-
-            fltrFactory = (Factory<CacheEntryEventFilter>)in.readObject();
-
-            if (fltrFactory != null)
-                fltr = fltrFactory.create();
-        }
-    }
-
-    /**
-     *
-     */
-    class EventCallback implements IgniteBiPredicate<UUID, IgniteEvent> {
-        /** */
-        private final CacheEntryCreatedListener createLsnr;
-
-        /** */
-        private final CacheEntryUpdatedListener updateLsnr;
-
-        /** */
-        private final CacheEntryRemovedListener rmvLsnr;
-
-        /** */
-        private final CacheEntryExpiredListener expireLsnr;
-
-        /**
-         * @param lsnr Listener.
-         */
-        EventCallback(CacheEntryListener lsnr) {
-            createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null;
-            updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null;
-            rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null;
-            expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null;
-        }
-
-        /**
-         * @return {@code True} if listens for create event.
-         */
-        boolean create() {
-            return createLsnr != null;
-        }
-
-        /**
-         * @return {@code True} if listens for update event.
-         */
-        boolean update() {
-            return updateLsnr != null;
-        }
+    /** {@inheritDoc} */
+    @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
-        /**
-         * @return {@code True} if listens for remove event.
-         */
-        boolean remove() {
-            return rmvLsnr != null;
+        try {
+            ctx.continuousQueries().deregisterCacheEntryListener(lsnrCfg);
         }
-
-        /**
-         * @return {@code True} if listens for expire event.
-         */
-        boolean expire() {
-            return expireLsnr != null;
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public boolean apply(UUID uuid, IgniteEvent evt) {
-            assert evt instanceof IgniteCacheEvent : evt;
-
-            IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt;
-
-            switch (cacheEvt.type()) {
-                case EVT_CACHE_OBJECT_REMOVED: {
-                    assert rmvLsnr != null;
-
-                    CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.REMOVED, cacheEvt);
-
-                    rmvLsnr.onRemoved(Collections.singleton(evt0));
-
-                    break;
-                }
-
-                case EVT_CACHE_OBJECT_PUT: {
-                    if (cacheEvt.hasOldValue()) {
-                        assert updateLsnr != null;
-
-                        CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.UPDATED, cacheEvt);
-
-                        updateLsnr.onUpdated(Collections.singleton(evt0));
-                    }
-                    else {
-                        assert createLsnr != null;
-
-                        CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.CREATED, cacheEvt);
-
-                        createLsnr.onCreated(Collections.singleton(evt0));
-                    }
-
-                    break;
-                }
-
-                case EVT_CACHE_OBJECT_EXPIRED: {
-                    assert expireLsnr != null;
-
-                    CacheEntryEvent evt0 = new CacheEntryEvent(IgniteCacheProxy.this, EventType.EXPIRED, cacheEvt);
-
-                    expireLsnr.onExpired(Collections.singleton(evt0));
-
-                    break;
-                }
-            }
-
-            return false;
+        finally {
+            gate.leave(prev);
         }
     }
 
     /** {@inheritDoc} */
-    @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) {
-    }
-
-    /** {@inheritDoc} */
     @Override public Iterator<Cache.Entry<K, V>> iterator() {
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java
index 5a9a675..149bee9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java
+++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java
@@ -362,6 +362,7 @@ public class GridCacheConfiguration extends MutableConfiguration {
         interceptor = cc.getInterceptor();
         invalidate = cc.isInvalidate();
         keepPortableInStore = cc.isKeepPortableInStore();
+        listenerConfigurations = cc.listenerConfigurations;
         offHeapMaxMem = cc.getOffHeapMaxMemory();
         maxConcurrentAsyncOps = cc.getMaxConcurrentAsyncOperations();
         maxQryIterCnt = cc.getMaximumQueryIteratorCount();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheContinuousQuery.java b/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheContinuousQuery.java
index a2675aa..60db85a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheContinuousQuery.java
+++ b/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheContinuousQuery.java
@@ -136,13 +136,13 @@ public interface GridCacheContinuousQuery<K, V> extends AutoCloseable {
      * can get deadlocks.
      *
      * @param cb Local callback.
-     * @deprecated Deprecated in favor of {@link #localCallback(org.apache.ignite.lang.IgniteBiPredicate)} method.
+     * @deprecated Deprecated in favor of {@link #localCallback(IgniteBiPredicate)} method.
      */
     @Deprecated
     public void callback(@Nullable IgniteBiPredicate<UUID, Collection<Map.Entry<K, V>>> cb);
 
     /**
-     * Gets local callback. See {@link #callback(org.apache.ignite.lang.IgniteBiPredicate)} for more information.
+     * Gets local callback. See {@link #callback(IgniteBiPredicate)} for more information.
      *
      * @return Local callback.
      * @deprecated Deprecated in favor of {@link #localCallback()} method.
@@ -167,7 +167,7 @@ public interface GridCacheContinuousQuery<K, V> extends AutoCloseable {
     public void filter(@Nullable IgniteBiPredicate<K, V> filter);
 
     /**
-     * Gets key-value filter. See {@link #filter(org.apache.ignite.lang.IgniteBiPredicate)} for more information.
+     * Gets key-value filter. See {@link #filter(IgniteBiPredicate)} for more information.
      *
      * @return Key-value filter.
      * @deprecated Deprecated in favor of {@link #remoteFilter()} method.
@@ -197,7 +197,7 @@ public interface GridCacheContinuousQuery<K, V> extends AutoCloseable {
     public void localCallback(IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry<K, V>>> locCb);
 
     /**
+     * Gets local callback. See {@link #localCallback(IgniteBiPredicate)} for more information.
+     * Gets local callback. See {@link #callback(IgniteBiPredicate)} for more information.
      *
      * @return Local callback.
      */
@@ -218,7 +218,7 @@ public interface GridCacheContinuousQuery<K, V> extends AutoCloseable {
     public void remoteFilter(@Nullable IgnitePredicate<GridCacheContinuousQueryEntry<K, V>> filter);
 
     /**
+     * Gets remote filter. See {@link #remoteFilter(IgnitePredicate)} for more information.
+     * Gets key-value filter. See {@link #filter(IgniteBiPredicate)} for more information.
      *
      * @return Key-value filter.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
index 49fbb81..a48f6e1 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridEventConsumeHandler.java
@@ -31,7 +31,7 @@ import static org.apache.ignite.events.IgniteEventType.*;
 /**
  * Continuous routine handler for remote event listening.
  */
-public class GridEventConsumeHandler implements GridContinuousHandler {
+class GridEventConsumeHandler implements GridContinuousHandler {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -76,7 +76,7 @@ public class GridEventConsumeHandler implements GridContinuousHandler {
      * @param filter Filter.
      * @param types Types.
      */
-    public GridEventConsumeHandler(@Nullable IgniteBiPredicate<UUID, IgniteEvent> cb, @Nullable IgnitePredicate<IgniteEvent> filter,
+    GridEventConsumeHandler(@Nullable IgniteBiPredicate<UUID, IgniteEvent> cb, @Nullable IgnitePredicate<IgniteEvent> filter,
         @Nullable int[] types) {
         this.cb = cb == null ? DFLT_CALLBACK : cb;
         this.filter = filter;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index bb493cc..b19e44f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -775,9 +775,23 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 ret = old;
             }
 
-            if (evt && expired && cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
-                cctx.events().addEvent(partition(), key, tx, owner, EVT_CACHE_OBJECT_EXPIRED, null, false, expiredVal,
-                    expiredVal != null || hasOldBytes, subjId, null, taskName);
+            if (evt && expired) {
+                if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
+                    cctx.events().addEvent(partition(),
+                        key,
+                        tx,
+                        owner,
+                        EVT_CACHE_OBJECT_EXPIRED,
+                        null,
+                        false,
+                        expiredVal,
+                        expiredVal != null || hasOldBytes,
+                        subjId,
+                        null,
+                        taskName);
+                }
+
+                cctx.continuousQueries().onEntryExpired(this, key, expiredVal, null);
 
                 // No more notifications.
                 evt = false;
@@ -1147,7 +1161,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
             if (mode == GridCacheMode.LOCAL || mode == GridCacheMode.REPLICATED ||
                 (tx != null && tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes);
+                cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1309,7 +1323,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                 if (mode == GridCacheMode.LOCAL || mode == GridCacheMode.REPLICATED ||
                     (tx != null && tx.local() && !isNear()))
-                    cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes);
+                    cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false);
 
                 cctx.dataStructures().onEntryUpdated(key, true);
             }
@@ -1580,7 +1594,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             if (metrics)
                 cctx.cache().metrics0().onWrite();
 
-            cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes);
+            cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, op == DELETE);
 
@@ -2038,7 +2052,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 cctx.cache().metrics0().onWrite();
 
             if (primary || cctx.isReplicated())
-                cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes);
+                cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, op == DELETE);
 
@@ -2959,8 +2973,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
     /** {@inheritDoc} */
     @SuppressWarnings({"RedundantTypeArguments"})
-    @Override public boolean initialValue(V val, byte[] valBytes, GridCacheVersion ver, long ttl, long expireTime,
-        boolean preload, long topVer, GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException {
+    @Override public boolean initialValue(V val,
+        byte[] valBytes,
+        GridCacheVersion ver,
+        long ttl,
+        long expireTime,
+        boolean preload,
+        long topVer,
+        GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException {
         if (cctx.isUnmarshalValues() && valBytes != null && val == null && isNewLocked())
             val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader());
 
@@ -2996,8 +3016,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 drReplicate(drType, val, valBytes, ver);
 
                 if (!skipQryNtf) {
-                    if (cctx.affinity().primary(cctx.localNode(), key, topVer) || cctx.isReplicated())
-                        cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), null, null);
+                    if (cctx.affinity().primary(cctx.localNode(), key, topVer) || cctx.isReplicated()) {
+                        cctx.continuousQueries().onEntryUpdate(this,
+                            key,
+                            val,
+                            valueBytesUnlocked(),
+                            null,
+                            null,
+                            preload);
+                    }
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
index 8e9cea0..46755c6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
@@ -43,6 +43,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.configuration.*;
 import javax.management.*;
 import java.util.*;
 
@@ -811,7 +812,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             cacheCtx.cache().start();
 
-
             if (log.isInfoEnabled())
                 log.info("Started cache [name=" + cfg.getName() + ", mode=" + cfg.getCacheMode() + ']');
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
index 35b10b7..db072d6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java
@@ -679,7 +679,7 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager
                }
             });
 
-            queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, true);
+            queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, true, false);
         }
 
         GridCacheQueueProxy queue = queuesMap.get(header.id());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
index 9b88858..c00c961 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
@@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.query.continuous;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry;
@@ -24,6 +23,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.grid.util.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.*;
 import java.util.*;
 import java.util.concurrent.locks.*;
 
@@ -213,12 +213,12 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou
 
     /** {@inheritDoc} */
     @Override public void execute() throws IgniteCheckedException {
-        execute(null, false);
+        execute(null, false, false);
     }
 
     /** {@inheritDoc} */
     @Override public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException {
-        execute(prj, false);
+        execute(prj, false, false);
     }
 
     /**
@@ -226,9 +226,10 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou
      *
      * @param prj Grid projection.
      * @param internal If {@code true} then query notified about internal entries updates.
+     * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
      * @throws IgniteCheckedException If failed.
      */
-    public void execute(@Nullable ClusterGroup prj, boolean internal) throws IgniteCheckedException {
+    public void execute(@Nullable ClusterGroup prj, boolean internal, boolean entryLsnr) throws IgniteCheckedException {
         if (locCb == null)
             throw new IllegalStateException("Mandatory local callback is not set for the query: " + this);
 
@@ -271,12 +272,32 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou
 
             guard.block();
 
-            GridContinuousHandler hnd = ctx.kernalContext().security().enabled() ?
-                new GridCacheContinuousQueryHandlerV2<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal,
-                    ctx.kernalContext().job().currentTaskNameHash()) :
-                new GridCacheContinuousQueryHandler<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal);
+            GridContinuousHandler hnd;
+
+            if (ctx.kernalContext().security().enabled()) {
+                hnd = new GridCacheContinuousQueryHandlerV2<>(ctx.name(),
+                    topic,
+                    locCb,
+                    rmtFilter,
+                    prjPred,
+                    internal,
+                    entryLsnr,
+                    ctx.kernalContext().job().currentTaskNameHash());
+            }
+            else {
+                hnd = new GridCacheContinuousQueryHandler<>(ctx.name(),
+                    topic,
+                    locCb,
+                    rmtFilter,
+                    prjPred,
+                    internal,
+                    entryLsnr);
+            }
 
-            routineId = ctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, autoUnsubscribe,
+            routineId = ctx.kernalContext().continuous().startRoutine(hnd,
+                bufSize,
+                timeInterval,
+                autoUnsubscribe,
                 prj.predicate()).get();
         }
         finally {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
index 991573b..3c5265c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java
@@ -77,6 +77,9 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V>
     @GridToStringExclude
     private GridDeploymentInfo depInfo;
 
+    /** */
+    private boolean expired;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -85,7 +88,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V>
         impl = null;
     }
 
-    /**
+    /*
      * @param ctx Cache context.
      * @param impl Cache entry.
      * @param key Key.
@@ -93,9 +96,16 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V>
      * @param newValBytes Value bytes.
      * @param oldVal Old value.
      * @param oldValBytes Old value bytes.
+     * @param expired {@code True} if created for expired entry.
      */
-    GridCacheContinuousQueryEntry(GridCacheContext<K, V> ctx, GridCacheEntry<K, V> impl, K key, @Nullable V newVal,
-        @Nullable GridCacheValueBytes newValBytes, @Nullable V oldVal, @Nullable GridCacheValueBytes oldValBytes) {
+    GridCacheContinuousQueryEntry(GridCacheContext<K, V> ctx,
+        GridCacheEntry<K, V> impl,
+        K key,
+        @Nullable V newVal,
+        @Nullable GridCacheValueBytes newValBytes,
+        @Nullable V oldVal,
+        @Nullable GridCacheValueBytes oldValBytes,
+        boolean expired) {
         assert ctx != null;
         assert impl != null;
         assert key != null;
@@ -107,6 +117,14 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V>
         this.newValBytes = newValBytes;
         this.oldVal = oldVal;
         this.oldValBytes = oldValBytes;
+        this.expired = expired;
+    }
+
+    /**
+     * @return {@code True} if entry expired.
+     */
+    public boolean expired() {
+        return expired;
     }
 
     /**
@@ -710,6 +728,8 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V>
             out.writeObject(newVal);
             out.writeObject(oldVal);
         }
+
+        out.writeBoolean(expired);
     }
 
     /** {@inheritDoc} */
@@ -734,6 +754,8 @@ public class GridCacheContinuousQueryEntry<K, V> implements GridCacheEntry<K, V>
             newVal = (V)in.readObject();
             oldVal = (V)in.readObject();
         }
+
+        expired = in.readBoolean();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
index 9d810c0..e61b2a2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
@@ -24,6 +24,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.*;
 import java.io.*;
 import java.util.*;
 
@@ -61,6 +62,9 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     /** Internal flag. */
     private boolean internal;
 
+    /** Entry listener flag. */
+    private boolean entryLsnr;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -75,11 +79,15 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
      * @param filter Filter.
      * @param prjPred Projection predicate.
      * @param internal If {@code true} then query is notified about internal entries updates.
+     * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
      */
-    GridCacheContinuousQueryHandler(@Nullable String cacheName, Object topic,
+    GridCacheContinuousQueryHandler(@Nullable String cacheName,
+        Object topic,
         IgniteBiPredicate<UUID, Collection<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>>> cb,
         @Nullable IgnitePredicate<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> filter,
-        @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal) {
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred,
+        boolean internal,
+        boolean entryLsnr) {
         assert topic != null;
         assert cb != null;
 
@@ -89,6 +97,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         this.filter = filter;
         this.prjPred = prjPred;
         this.internal = internal;
+        this.entryLsnr = entryLsnr;
     }
 
     /** {@inheritDoc} */
@@ -183,7 +192,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                         }
                     }
 
-                    if (recordEvt) {
+                    if (!entryLsnr && recordEvt) {
                         ctx.event().record(new IgniteCacheQueryReadEvent<>(
                             ctx.discovery().localNode(),
                             "Continuous query executed.",
@@ -241,12 +250,12 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
             }
         };
 
-        return manager(ctx).registerListener(nodeId, routineId, lsnr, internal);
+        return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr);
     }
 
     /** {@inheritDoc} */
     @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
-        manager(ctx).iterate(internal, routineId);
+        manager(ctx).iterate(internal, routineId, entryLsnr);
     }
 
     /** {@inheritDoc} */
@@ -371,6 +380,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
             out.writeObject(prjPred);
 
         out.writeBoolean(internal);
+
+        out.writeBoolean(entryLsnr);
     }
 
     /** {@inheritDoc} */
@@ -394,6 +405,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
             prjPred = (IgnitePredicate<GridCacheEntry<K, V>>)in.readObject();
 
         internal = in.readBoolean();
+
+        entryLsnr = in.readBoolean();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
index 6a185fd..63209ec 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV2.java
@@ -14,6 +14,7 @@ import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.*;
 import java.io.*;
 import java.util.*;
 
@@ -41,13 +42,18 @@ public class GridCacheContinuousQueryHandlerV2<K, V> extends GridCacheContinuous
      * @param filter Filter.
      * @param prjPred Projection predicate.
      * @param internal If {@code true} then query is notified about internal entries updates.
+     * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
      * @param taskHash Task hash.
      */
-    public GridCacheContinuousQueryHandlerV2(@Nullable String cacheName, Object topic,
+    public GridCacheContinuousQueryHandlerV2(@Nullable String cacheName,
+        Object topic,
         IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry<K, V>>> cb,
         @Nullable IgnitePredicate<GridCacheContinuousQueryEntry<K, V>> filter,
-        @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal, int taskHash) {
-        super(cacheName, topic, cb, filter, prjPred, internal);
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred,
+        boolean internal,
+        boolean entryLsnr,
+        int taskHash) {
+        super(cacheName, topic, cb, filter, prjPred, internal, entryLsnr);
 
         this.taskHash = taskHash;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
index b67d93c..8c1b70e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
@@ -11,18 +11,25 @@ package org.gridgain.grid.kernal.processors.cache.query.continuous;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.configuration.*;
+import javax.cache.event.*;
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static javax.cache.event.EventType.*;
 import static org.apache.ignite.events.IgniteEventType.*;
 import static org.gridgain.grid.kernal.GridTopic.*;
 
@@ -48,6 +55,10 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
     /** Query sequence number for message topic. */
     private final AtomicLong seq = new AtomicLong();
 
+    /** Continuous queries created for cache event listeners. */
+    private final ConcurrentMap<CacheEntryListenerConfiguration, GridCacheContinuousQuery<K, V>> lsnrQrys =
+        new ConcurrentHashMap8<>();
+
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         // Append cache name to the topic.
@@ -55,6 +66,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override protected void onKernalStart0() throws IgniteCheckedException {
         if (intLsnrCnt.get() > 0 || lsnrCnt.get() > 0) {
             Collection<ClusterNode> nodes = cctx.discovery().cacheNodes(cctx.name(), -1);
@@ -65,6 +77,30 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
                         "for versions below 6.2.0");
             }
         }
+
+        Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs = cctx.config().getCacheEntryListenerConfigurations();
+
+        if (lsnrCfgs != null) {
+            IgniteCacheProxy<K, V> cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+            for (CacheEntryListenerConfiguration<K, V> cfg : lsnrCfgs)
+                cache.registerCacheEntryListener(cfg);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStop0(boolean cancel) {
+        super.onKernalStop0(cancel);
+
+        for (CacheEntryListenerConfiguration lsnrCfg : lsnrQrys.keySet()) {
+            try {
+                deregisterCacheEntryListener(lsnrCfg);
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to remove cache entry listener: " + e);
+            }
+        }
     }
 
     /**
@@ -84,10 +120,16 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
      * @param newBytes New value bytes.
      * @param oldVal Old value.
      * @param oldBytes Old value bytes.
+     * @param preload {@code True} if entry is updated during preloading.
      * @throws IgniteCheckedException In case of error.
      */
-    public void onEntryUpdate(GridCacheEntryEx<K, V> e, K key, @Nullable V newVal,
-        @Nullable GridCacheValueBytes newBytes, V oldVal, @Nullable GridCacheValueBytes oldBytes) throws IgniteCheckedException {
+    public void onEntryUpdate(GridCacheEntryEx<K, V> e,
+        K key,
+        @Nullable V newVal,
+        @Nullable GridCacheValueBytes newBytes,
+        V oldVal,
+        @Nullable GridCacheValueBytes oldBytes,
+        boolean preload) throws IgniteCheckedException {
         assert e != null;
         assert key != null;
 
@@ -104,25 +146,145 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
         oldVal = cctx.unwrapTemporary(oldVal);
 
         GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>(
-            cctx, e.wrap(false), key, newVal, newBytes, oldVal, oldBytes);
+            cctx,
+            e.wrap(false),
+            key,
+            newVal,
+            newBytes,
+            oldVal,
+            oldBytes,
+            false);
 
         e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader());
 
         boolean recordEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
-        for (ListenerInfo<K, V> lsnr : lsnrCol.values())
+        for (ListenerInfo<K, V> lsnr : lsnrCol.values()) {
+            if (preload && lsnr.entryListener())
+                continue;
+
             lsnr.onEntryUpdate(e0, recordEvt);
+        }
+    }
+
+    /**
+     * @param e Entry.
+     * @param key Key.
+     * @param oldVal Old value.
+     * @param oldBytes Old value bytes.
+     */
+    public void onEntryExpired(GridCacheEntryEx<K, V> e,
+        K key,
+        V oldVal,
+        @Nullable GridCacheValueBytes oldBytes) {
+        if (e.isInternal())
+            return;
+
+        ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrCol = lsnrs;
+
+        if (F.isEmpty(lsnrCol))
+            return;
+
+        GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>(
+            cctx,
+            e.wrap(false),
+            key,
+            null,
+            null,
+            oldVal,
+            oldBytes,
+            true);
+
+        for (ListenerInfo<K, V> lsnr : lsnrCol.values()) {
+            if (!lsnr.entryListener())
+                continue;
+
+            lsnr.onEntryUpdate(e0, false);
+        }
+    }
+
+    /**
+     * @param lsnrCfg Listener configuration.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg)
+        throws IgniteCheckedException {
+        GridCacheContinuousQueryAdapter<K, V> qry = null;
+
+        try {
+            A.notNull(lsnrCfg, "lsnrCfg");
+
+            Factory<CacheEntryListener<? super K, ? super V>> factory = lsnrCfg.getCacheEntryListenerFactory();
+
+            A.notNull(factory, "cacheEntryListenerFactory");
+
+            CacheEntryListener lsnr = factory.create();
+
+            A.notNull(lsnr, "lsnr");
+
+            IgniteCacheProxy<K, V> cache= cctx.kernalContext().cache().jcache(cctx.name());
+
+            EntryListenerCallback cb = new EntryListenerCallback(cache, lsnr);
+
+            if (!(cb.create() || cb.update() || cb.remove() || cb.expire()))
+                throw new IllegalArgumentException("Listener must implement one of CacheEntryListener sub-interfaces.");
+
+            qry = (GridCacheContinuousQueryAdapter<K, V>)cctx.cache().queries().createContinuousQuery();
+
+            GridCacheContinuousQuery<K, V> old = lsnrQrys.putIfAbsent(lsnrCfg, qry);
+
+            if (old != null)
+                throw new IllegalArgumentException("Listener is already registered for configuration: " + lsnrCfg);
+
+            qry.autoUnsubscribe(true);
+
+            qry.bufferSize(1);
+
+            qry.localCallback(cb);
+
+            EntryListenerFilter<K, V> fltr = new EntryListenerFilter<>(cb.create(),
+                cb.update(),
+                cb.remove(),
+                cb.expire(),
+                lsnrCfg.getCacheEntryEventFilterFactory(),
+                cctx.kernalContext().grid(),
+                cctx.name());
+
+            qry.remoteFilter(fltr);
+
+            qry.execute(null, false, true);
+        }
+        catch (IgniteCheckedException e) {
+            lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to execute it.
+
+            throw e;
+        }
+    }
+
+    /**
+     * @param lsnrCfg Listener configuration.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) throws IgniteCheckedException {
+        A.notNull(lsnrCfg, "lsnrCfg");
+
+        GridCacheContinuousQuery<K, V> qry = lsnrQrys.remove(lsnrCfg);
+
+        if (qry != null)
+            qry.close();
     }
 
     /**
-     * @param nodeId Node ID.
      * @param lsnrId Listener ID.
      * @param lsnr Listener.
      * @param internal Internal flag.
+     * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
      * @return Whether listener was actually registered.
      */
-    boolean registerListener(UUID nodeId, UUID lsnrId, GridCacheContinuousQueryListener<K, V> lsnr, boolean internal) {
-        ListenerInfo<K, V> info = new ListenerInfo<>(lsnr);
+    boolean registerListener(UUID lsnrId, GridCacheContinuousQueryListener<K, V> lsnr,
+        boolean internal,
+        boolean entryLsnr) {
+        ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr);
 
         boolean added;
 
@@ -164,25 +326,36 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
      * Iterates through existing data.
      *
      * @param internal Internal flag.
+     * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
      * @param id Listener ID.
      */
-    void iterate(boolean internal, UUID id) {
+    void iterate(boolean internal, UUID id, boolean entryLsnr) {
         ListenerInfo<K, V> info = internal ? intLsnrs.get(id) : lsnrs.get(id);
 
         assert info != null;
 
-        Set<GridCacheEntry<K, V>> entries;
-
-        if (cctx.isReplicated())
-            entries = internal ? cctx.cache().entrySetx() :
-                cctx.cache().entrySet();
-        else
-            entries = internal ? cctx.cache().primaryEntrySetx() :
-                cctx.cache().primaryEntrySet();
-
-        for (GridCacheEntry<K, V> e : entries) {
-            info.onIterate(new GridCacheContinuousQueryEntry<>(cctx, e, e.getKey(), e.getValue(), null, null, null),
-                !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ));
+        if (!entryLsnr) {
+            Set<GridCacheEntry<K, V>> entries;
+
+            if (cctx.isReplicated())
+                entries = internal ? cctx.cache().entrySetx() :
+                    cctx.cache().entrySet();
+            else
+                entries = internal ? cctx.cache().primaryEntrySetx() :
+                    cctx.cache().primaryEntrySet();
+
+            for (GridCacheEntry<K, V> e : entries) {
+                GridCacheContinuousQueryEntry<K, V> qryEntry = new GridCacheContinuousQueryEntry<>(cctx,
+                    e,
+                    e.getKey(),
+                    e.getValue(),
+                    null,
+                    null,
+                    null,
+                    false);
+
+                info.onIterate(qryEntry, !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ));
+            }
         }
 
         info.flushPending();
@@ -198,11 +371,16 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
         /** Pending entries. */
         private Collection<PendingEntry<K, V>> pending = new LinkedList<>();
 
+        /** */
+        private boolean entryLsnr;
+
         /**
          * @param lsnr Listener.
+         * @param entryLsnr {@code True} if listener created for {@link CacheEntryListener}.
          */
-        private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr) {
+        private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, boolean entryLsnr) {
             this.lsnr = lsnr;
+            this.entryLsnr = entryLsnr;
         }
 
         /**
@@ -247,6 +425,13 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
             for (PendingEntry<K, V> e : pending0)
                 lsnr.onEntryUpdate(e.entry, e.recordEvt);
         }
+
+        /**
+         * @return {@code True} if listener created for {@link CacheEntryListener}.
+         */
+        boolean entryListener() {
+            return entryLsnr;
+        }
     }
 
     /**
@@ -268,4 +453,275 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
             this.recordEvt = recordEvt;
         }
     }
+
+    /**
+     *
+     */
+    static class EntryListenerFilter<K1, V1> implements
+        IgnitePredicate<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K1, V1>>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private boolean create;
+
+        /** */
+        private boolean update;
+
+        /** */
+        private boolean rmv;
+
+        /** */
+        private boolean expire;
+
+        /** */
+        private Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory;
+
+        /** */
+        private CacheEntryEventFilter fltr;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        private IgniteCache cache;
+
+        /** */
+        private String cacheName;
+
+        /**
+         *
+         */
+        public EntryListenerFilter() {
+            // No-op.
+        }
+
+        /**
+         * @param create {@code True} if listens for create events.
+         * @param update {@code True} if listens for update events.
+         * @param rmv {@code True} if listens for remove events.
+         * @param expire {@code True} if listens for expire events.
+         * @param fltrFactory Filter factory.
+         * @param ignite Ignite instance.
+         * @param cacheName Cache name.
+         */
+        EntryListenerFilter(
+            boolean create,
+            boolean update,
+            boolean rmv,
+            boolean expire,
+            Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory,
+            Ignite ignite,
+            @Nullable String cacheName) {
+            this.create = create;
+            this.update = update;
+            this.rmv = rmv;
+            this.expire = expire;
+            this.fltrFactory = fltrFactory;
+            this.ignite = ignite;
+            this.cacheName = cacheName;
+
+            if (fltrFactory != null)
+                fltr = fltrFactory.create();
+
+            cache = ignite.jcache(cacheName);
+
+            assert cache != null : cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public boolean apply(org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K1, V1> entry) {
+            try {
+                EventType evtType;
+
+                if (entry.getValue() == null) {
+                    if (((GridCacheContinuousQueryEntry)entry).expired()) { // Expire.
+                        if (!expire)
+                            return false;
+
+                        evtType = EXPIRED;
+                    }
+                    else { // Remove.
+                        if (!rmv)
+                            return false;
+
+                        evtType = REMOVED;
+                    }
+                }
+                else {
+                    if (entry.getOldValue() != null) { // Update.
+                        if (!update)
+                            return false;
+
+                        evtType = UPDATED;
+                    }
+                    else { // Create.
+                        if (!create)
+                            return false;
+
+                        evtType = CREATED;
+                    }
+                }
+
+                if (cache == null) {
+                    cache = ignite.jcache(cacheName);
+
+                    assert cache != null : cacheName;
+                }
+
+                return fltr == null || fltr.evaluate(new org.apache.ignite.cache.CacheEntryEvent(cache, evtType, entry));
+            }
+            catch (CacheEntryListenerException e) {
+                LT.warn(ignite.log(), e, "Cache entry event filter error: " + e);
+
+                return false;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeBoolean(create);
+
+            out.writeBoolean(update);
+
+            out.writeBoolean(rmv);
+
+            out.writeBoolean(expire);
+
+            U.writeString(out, cacheName);
+
+            out.writeObject(fltrFactory);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            create = in.readBoolean();
+
+            update = in.readBoolean();
+
+            rmv = in.readBoolean();
+
+            expire = in.readBoolean();
+
+            cacheName = U.readString(in);
+
+            fltrFactory = (Factory<CacheEntryEventFilter<? super K1, ? super V1>>)in.readObject();
+
+            if (fltrFactory != null)
+                fltr = fltrFactory.create();
+        }
+    }
+
+    /**
+     *
+     */
+    private class EntryListenerCallback implements
+        IgniteBiPredicate<UUID, Collection<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>>> {
+        /** */
+        private final IgniteCacheProxy<K, V> cache;
+
+        /** */
+        private final CacheEntryCreatedListener createLsnr;
+
+        /** */
+        private final CacheEntryUpdatedListener updateLsnr;
+
+        /** */
+        private final CacheEntryRemovedListener rmvLsnr;
+
+        /** */
+        private final CacheEntryExpiredListener expireLsnr;
+
+        /**
+         * @param cache Cache to be used as event source.
+         * @param lsnr Listener.
+         */
+        EntryListenerCallback(IgniteCacheProxy<K, V> cache, CacheEntryListener lsnr) {
+            this.cache = cache;
+
+            createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null;
+            updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null;
+            rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null;
+            expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null;
+        }
+
+        /**
+         * @return {@code True} if listens for create event.
+         */
+        boolean create() {
+            return createLsnr != null;
+        }
+
+        /**
+         * @return {@code True} if listens for update event.
+         */
+        boolean update() {
+            return updateLsnr != null;
+        }
+
+        /**
+         * @return {@code True} if listens for remove event.
+         */
+        boolean remove() {
+            return rmvLsnr != null;
+        }
+
+        /**
+         * @return {@code True} if listens for expire event.
+         */
+        boolean expire() {
+            return expireLsnr != null;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public boolean apply(UUID uuid, Collection<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> entries) {
+            for (org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry entry : entries) {
+                try {
+                    if (entry.getValue() == null) { // Remove.
+                        if (((GridCacheContinuousQueryEntry)entry).expired()) { // Expire.
+                            assert expireLsnr != null;
+
+                            org.apache.ignite.cache.CacheEntryEvent evt0 =
+                                new org.apache.ignite.cache.CacheEntryEvent(cache, EXPIRED, entry);
+
+                            expireLsnr.onExpired(Collections.singleton(evt0));
+                        }
+                        else {
+                            assert rmvLsnr != null;
+
+                            org.apache.ignite.cache.CacheEntryEvent evt0 =
+                                new org.apache.ignite.cache.CacheEntryEvent(cache, REMOVED, entry);
+
+                            rmvLsnr.onRemoved(Collections.singleton(evt0));
+                        }
+                    }
+                    else if (entry.getOldValue() != null) { // Update.
+                        assert updateLsnr != null;
+
+                        org.apache.ignite.cache.CacheEntryEvent evt0 =
+                            new org.apache.ignite.cache.CacheEntryEvent(cache, UPDATED, entry);
+
+                        updateLsnr.onUpdated(Collections.singleton(evt0));
+                    }
+                    else { // Create.
+                        assert createLsnr != null;
+
+                        org.apache.ignite.cache.CacheEntryEvent evt0 =
+                            new org.apache.ignite.cache.CacheEntryEvent(cache, CREATED, entry);
+
+                        createLsnr.onCreated(Collections.singleton(evt0));
+                    }
+                }
+                catch (CacheEntryListenerException e) {
+                    LT.warn(log, e, "Cache entry listener error: " + e);
+                }
+            }
+
+            return true;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
index b8b5998..d8dfc97 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/service/GridServiceProcessor.java
@@ -124,13 +124,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
             cfgQry.localCallback(new DeploymentListener());
 
-            cfgQry.execute(ctx.grid().forLocal(), true);
+            cfgQry.execute(ctx.grid().forLocal(), true, false);
 
             assignQry = (GridCacheContinuousQueryAdapter<Object, Object>)cache.queries().createContinuousQuery();
 
             assignQry.localCallback(new AssignmentListener());
 
-            assignQry.execute(ctx.grid().forLocal(), true);
+            assignQry.execute(ctx.grid().forLocal(), true, false);
         }
         finally {
             if (ctx.deploy().enabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 949b5c6..6e3221a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -10,88 +10,197 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
 import org.gridgain.grid.cache.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
 
 import javax.cache.configuration.*;
 import javax.cache.event.*;
 
-import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
-import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
-import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static javax.cache.event.EventType.*;
 import static org.gridgain.grid.cache.GridCacheMode.*;
 
 /**
  *
  */
-public class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest {
-    @Override protected int gridCount() {
-        return 3;
-    }
+public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest {
+    /** */
+    private static volatile List<CacheEntryEvent<? extends Integer, ? extends Integer>> evts;
 
-    @Override protected GridCacheMode cacheMode() {
-        return PARTITIONED;
-    }
+    /** */
+    private static volatile CountDownLatch evtsLatch;
 
-    @Override protected GridCacheAtomicityMode atomicityMode() {
-        return ATOMIC;
-    }
+    /** */
+    private Integer lastKey = 0;
 
-    @Override protected GridCacheDistributionMode distributionMode() {
-        return PARTITIONED_ONLY;
-    }
+    /** */
+    private CacheEntryListenerConfiguration lsnrCfg;
 
-    @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
-        return PRIMARY;
-    }
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
 
-    @Override
-    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setIncludeEventTypes(IgniteEventType.EVT_CACHE_OBJECT_PUT);
+        if (lsnrCfg != null)
+            cfg.addCacheEntryListenerConfiguration(lsnrCfg);
 
         return cfg;
     }
 
-    @Override
-    protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
-        GridCacheConfiguration ccfg = super.cacheConfiguration(gridName);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvents() throws Exception {
+        CacheEntryCreatedListener<Integer, Integer> createLsnr = new CacheEntryCreatedListener<Integer, Integer>() {
+            @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+                    onEvent(evt);
+            }
+        };
+
+        CacheEntryUpdatedListener<Integer, Integer> updateLsnr = new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+                    onEvent(evt);
+            }
+        };
+
+        CacheEntryRemovedListener<Integer, Integer> rmvLsnr = new CacheEntryRemovedListener<Integer, Integer>() {
+            @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+                    onEvent(evt);
+            }
+        };
+
+        IgniteCache<Integer, Integer> cache = jcache();
+
+        Map<Integer, Integer> vals = new HashMap<>();
+
+        for (int i = 0; i < 100; i++)
+            vals.put(i + 1_000_000, i);
+
+        cache.putAll(vals); // Put some data in cache to make sure events are not generated for existing entries.
+
+        for (Integer key : keys()) {
+            log.info("Check create event [key=" + key + ']');
+
+            checkEvents(cache, createLsnr, key, true, false, false);
+
+            log.info("Check update event [key=" + key + ']');
+
+            checkEvents(cache, updateLsnr, key, false, true, false);
+
+            log.info("Check remove event [key=" + key + ']');
+
+            checkEvents(cache, rmvLsnr, key, false, false, true);
+
+            log.info("Check create/update events [key=" + key + ']');
+
+            checkEvents(cache, new CreateUpdateListener(), key, true, true, false);
+
+            log.info("Check create/update/remove events [key=" + key + ']');
+
+            checkEvents(cache, new CreateUpdateRemoveListener(), key, true, true, true);
+        }
+
+        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() {
+                    return new CreateUpdateRemoveListener();
+                }
+            },
+            new TestFilterFactory(),
+            true,
+            false
+        );
+
+        cache.registerCacheEntryListener(lsnrCfg);
+
+        log.info("Check filter.");
+
+        checkFilter(cache, vals);
+
+        cache.deregisterCacheEntryListener(lsnrCfg);
 
-        //ccfg.setBackups(1);
+        cache.putAll(vals);
 
-        return ccfg;
+        checkListenerOnStart(vals);
     }
 
-    public void testEvent() throws Exception {
-        Ignite ignite = ignite(0);
+    /**
+     * @param vals Values in cache.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void checkListenerOnStart(Map<Integer, Integer> vals) throws Exception {
+        lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() {
+                    return new CreateUpdateRemoveListener();
+                }
+            },
+            null,
+            true,
+            false
+        );
 
-        /*
-        ignite.events().remoteListen(new IgniteBiPredicate<UUID, IgniteEvent>() {
-            @Override public boolean apply(UUID uuid, IgniteEvent e) {
-                IgniteCacheEvent evt0 = (IgniteCacheEvent)e;
+        Ignite grid = startGrid(gridCount());
 
-                System.out.println("Event: " + uuid + " " + evt0.eventNode() + " " + e);
+        IgniteCache<Integer, Integer> cache = grid.jcache(null);
 
-                return false;
-            }
-        }, null, IgniteEventType.EVT_CACHE_OBJECT_PUT);
-        */
+        Integer key = Integer.MAX_VALUE;
 
-        IgniteCache<Integer, Integer> cache = jcache();
+        log.info("Check create/update/remove events for listener in configuration [key=" + key + ']');
+
+        checkEvents(cache, lsnrCfg, key, true, true, true);
 
-        final CacheEntryCreatedListener<Integer, Integer> lsnr = new CacheEntryCreatedListener<Integer, Integer>() {
-            @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) throws CacheEntryListenerException {
-                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
-                    System.out.println("Event: " + evt.getEventType() + " " + evt.getKey() + " " + evt.getOldValue() + " " + evt.getValue());
+        stopGrid(gridCount());
+
+        lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() {
+                    return new CreateUpdateRemoveListener();
                 }
-            }
-        };
+            },
+            new TestFilterFactory(),
+            true,
+            false
+        );
 
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration(
-            new Factory<CacheEntryListener>() {
-                @Override public CacheEntryListener create() {
+        grid = startGrid(gridCount());
+
+        cache = grid.jcache(null);
+
+        log.info("Check filter for listener in configuration.");
+
+        checkFilter(cache, vals);
+
+        stopGrid(gridCount());
+    }
+
+    /**
+     * @param cache Cache.
+     * @param lsnr Listener.
+     * @param key Key.
+     * @param create {@code True} if listens for create events.
+     * @param update {@code True} if listens for update events.
+     * @param rmv {@code True} if listens for remove events.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(
+        final IgniteCache<Integer, Integer> cache,
+        final CacheEntryListener<Integer, Integer> lsnr,
+        Integer key,
+        boolean create,
+        boolean update,
+        boolean rmv) throws Exception {
+        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() {
                     return lsnr;
                 }
             },
@@ -102,9 +211,325 @@ public class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTes
 
         cache.registerCacheEntryListener(lsnrCfg);
 
-        ignite(1).cache(null).put(1, 1);
-        ignite(0).cache(null).put(1, 2);
+        checkEvents(cache, lsnrCfg, key, create, update, rmv);
+    }
+
+    /**
+     * @param cache Cache expected to have a listener filtered by {@link TestFilter} registered (only even keys pass).
+     * @param vals Entries that are removed, re-created and then updated to {@code -1}; source of expected values.
+     * @throws Exception If failed.
+     */
+    private void checkFilter(IgniteCache<Integer, Integer> cache, Map<Integer, Integer> vals) throws Exception {
+        evts = new ArrayList<>(); // NOTE(review): plain ArrayList filled by listener callbacks - confirm no concurrent adds.
+
+        final int expEvts = (vals.size() / 2) * 3; // Remove, create and update for half of modified entries.
+
+        evtsLatch = new CountDownLatch(expEvts);
+
+        cache.removeAll(vals.keySet()); // Expected to produce the REMOVED events checked first below.
+
+        cache.putAll(vals); // Expected to produce the CREATED events.
+
+        Map<Integer, Integer> newVals = new HashMap<>();
+
+        for (Integer key : vals.keySet())
+            newVals.put(key, -1);
+
+        cache.putAll(newVals); // Expected to produce the UPDATED events with new value -1.
+
+        evtsLatch.await(5000, TimeUnit.MILLISECONDS); // Result is ignored; a timeout shows up in the size check below.
+
+        assertEquals(expEvts, evts.size());
+
+        Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = evts.iterator();
+
+        for (Integer key : vals.keySet()) { // The loop only counts even keys; events are taken in received order.
+            if (key % 2 == 0) {
+                CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next();
+
+                assertTrue(evt.getKey() % 2 == 0);
+                assertTrue(vals.keySet().contains(evt.getKey()));
+                assertEquals(REMOVED, evt.getEventType());
+                assertNull(evt.getValue());
+                assertEquals(vals.get(evt.getKey()), evt.getOldValue());
+
+                iter.remove(); // Drop the verified event from the collected list.
+            }
+        }
+
+        for (Integer key : vals.keySet()) { // Next block of events: one CREATED per even key.
+            if (key % 2 == 0) {
+                CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next();
+
+                assertTrue(evt.getKey() % 2 == 0);
+                assertTrue(vals.keySet().contains(evt.getKey()));
+                assertEquals(CREATED, evt.getEventType());
+                assertEquals(vals.get(evt.getKey()), evt.getValue());
+                assertNull(evt.getOldValue());
+
+                iter.remove();
+            }
+        }
+
+        for (Integer key : vals.keySet()) { // Last block of events: one UPDATED per even key.
+            if (key % 2 == 0) {
+                CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next();
+
+                assertTrue(evt.getKey() % 2 == 0);
+                assertTrue(vals.keySet().contains(evt.getKey()));
+                assertEquals(UPDATED, evt.getEventType());
+                assertEquals(-1, (int) evt.getValue());
+                assertEquals(vals.get(evt.getKey()), evt.getOldValue());
+
+                iter.remove();
+            }
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param lsnrCfg Listener configuration expected to be registered already (re-registration is asserted to fail).
+     * @param key Key.
+     * @param create {@code True} if listens for create events.
+     * @param update {@code True} if listens for update events.
+     * @param rmv {@code True} if listens for remove events.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(
+        final IgniteCache<Integer, Integer> cache,
+        final CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg,
+        Integer key,
+        boolean create,
+        boolean update,
+        boolean rmv) throws Exception {
+        GridTestUtils.assertThrows(log, new Callable<Void>() { // Registering the same configuration twice must fail.
+            @Override public Void call() throws Exception {
+                cache.registerCacheEntryListener(lsnrCfg);
+
+                return null;
+            }
+        }, IllegalArgumentException.class, null);
+
+        final int UPDATES = 10;
+
+        int expEvts = 0;
+
+        if (create)
+            expEvts += 2; // cache.put(key, 0) and cache1.put(key, 1) below.
+
+        if (update)
+            expEvts += (UPDATES + 1); // UPDATES puts in the loop plus cache1.put(key, 2).
+
+        if (rmv)
+            expEvts += 2; // cache.remove(key) and cache1.remove(key).
+
+        evts = new ArrayList<>(); // NOTE(review): plain ArrayList filled by listener callbacks - confirm no concurrent adds.
+
+        evtsLatch = new CountDownLatch(expEvts);
+
+        cache.put(key, 0);
+
+        for (int i = 0; i < UPDATES; i++)
+            cache.put(key, i + 1);
+
+        assertFalse(cache.putIfAbsent(key, -1)); // Key is present; this call is not counted in expEvts.
+
+        assertFalse(cache.remove(key, -1)); // Value does not match; this call is not counted in expEvts.
+
+        assertTrue(cache.remove(key));
+
+        IgniteCache<Integer, Integer> cache1 = cache;
+
+        if (gridCount() > 1)
+            cache1 = jcache(1); // Do updates from another node.
+
+        cache1.put(key, 1);
+
+        cache1.put(key, 2);
+
+        assertTrue(cache1.remove(key));
+
+        evtsLatch.await(5000, TimeUnit.MILLISECONDS); // Result is ignored; a timeout shows up in the size check below.
+
+        assertEquals(expEvts, evts.size());
+
+        Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = evts.iterator();
+
+        if (create)
+            checkEvent(iter, key, CREATED, 0, null);
+
+        if (update) {
+            for (int i = 0; i < UPDATES; i++)
+                checkEvent(iter, key, UPDATED, i + 1, i);
+        }
+
+        if (rmv)
+            checkEvent(iter, key, REMOVED, null, UPDATES);
+
+        if (create)
+            checkEvent(iter, key, CREATED, 1, null);
+
+        if (update)
+            checkEvent(iter, key, UPDATED, 2, 1);
+
+        if (rmv)
+            checkEvent(iter, key, REMOVED, null, 2);
+
+        assertEquals(0, evts.size()); // checkEvent() removes every verified event, so nothing may be left.
+
+        log.info("Remove listener. ");
+
+        cache.deregisterCacheEntryListener(lsnrCfg);
+
+        cache.put(key, 1);
+
+        cache.put(key, 2);
+
+        assertTrue(cache.remove(key));
+
+        U.sleep(500); // Sleep some time to ensure listener was really removed.
+
+        assertEquals(0, evts.size());
+
+        cache.registerCacheEntryListener(lsnrCfg); // Registering again after deregistration is expected to succeed.
+
+        cache.deregisterCacheEntryListener(lsnrCfg);
+    }
+
+    /**
+     * @param iter Received events iterator; the next event is taken from it and removed from the backing collection.
+     * @param expKey Expected key.
+     * @param expType Expected type.
+     * @param expVal Expected value ({@code null} is allowed).
+     * @param expOld Expected old value; {@code null} means no old value must be available.
+     */
+    private void checkEvent(Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter,
+        Integer expKey,
+        EventType expType,
+        @Nullable Integer expVal,
+        @Nullable Integer expOld) {
+        assertTrue(iter.hasNext());
+
+        CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next();
+
+        iter.remove(); // Consume the event so that callers can assert the list is empty at the end.
+
+        assertTrue(evt.getSource() instanceof IgniteCacheProxy);
+
+        assertEquals(expKey, evt.getKey());
+
+        assertEquals(expType, evt.getEventType());
+
+        assertEquals(expVal, evt.getValue());
+
+        assertEquals(expOld, evt.getOldValue());
+
+        if (expOld == null) // isOldValueAvailable() must agree with whether an old value is expected.
+            assertFalse(evt.isOldValueAvailable());
+        else
+            assertTrue(evt.isOldValueAvailable());
+    }
+
+    /**
+     * @return Keys from {@code primaryKeys}, {@code backupKeys} (several grids) and {@code nearKeys} (not REPLICATED).
+     * @throws Exception If failed.
+     */
+    protected Collection<Integer> keys() throws Exception {
+        GridCache<Integer, Object> cache = cache(0);
+
+        ArrayList<Integer> keys = new ArrayList<>();
+
+        keys.add(primaryKeys(cache, 1, lastKey).get(0));
+
+        if (gridCount() > 1) {
+            keys.add(backupKeys(cache, 1, lastKey).get(0));
+
+            if (cache.configuration().getCacheMode() != REPLICATED)
+                keys.add(nearKeys(cache, 1, lastKey).get(0));
+        }
+
+        lastKey = Collections.max(keys) + 1; // Passed to the key helpers on the next call (presumably the search start).
+
+        return keys;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        evts = null; // Clear the static reference to the collected events.
+
+        evtsLatch = null; // Clear the static reference to the latch.
+    }
+
+    /**
+     * @param evt Received event; it is validated, appended to {@code evts} and counted down on {@code evtsLatch}.
+     */
+    private static void onEvent(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+        //System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']');
+
+        assert evt != null;
+        assert evt.getSource() != null : evt;
+        assert evt.getEventType() != null : evt;
+        assert evt.getKey() != null : evt;
+
+        evts.add(evt);
+
+        evtsLatch.countDown(); // Counted after the add, so a released waiter sees the event in the list.
+    }
+
+    /**
+     * Factory that returns a new {@link TestFilter} from every {@code create()} call.
+     */
+    static class TestFilterFactory implements Factory<CacheEntryEventFilter<Integer, Integer>> {
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventFilter<Integer, Integer> create() {
+            return new TestFilter();
+        }
+    }
+
+    /**
+     * Event filter that passes only events with an even key; asserts the event, its source, type and key are set.
+     */
+    static class TestFilter implements CacheEntryEventFilter<Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+            assert evt != null;
+            assert evt.getSource() != null : evt;
+            assert evt.getEventType() != null : evt;
+            assert evt.getKey() != null : evt;
+
+            return evt.getKey() % 2 == 0; // Accept even keys only.
+        }
+    }
+
+    /**
+     * Listener for create and update events; passes every received event to {@code onEvent}.
+     */
+    static class CreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>,
+        CacheEntryUpdatedListener<Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+                onEvent(evt);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+                onEvent(evt);
+        }
+    }
 
-        Thread.sleep(2000);
+    /**
+     * Extends {@link CreateUpdateListener} with remove events, which are also passed to {@code onEvent}.
+     */
+    static class CreateUpdateRemoveListener extends CreateUpdateListener
+        implements CacheEntryRemovedListener<Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+                onEvent(evt);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f02315/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicLocalTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicLocalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicLocalTest.java
new file mode 100644
index 0000000..5c7ec68
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicLocalTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Entry listener test setup: one grid, {@code LOCAL} cache mode, {@code ATOMIC} atomicity, {@code PARTITIONED_ONLY}.
+ */
+public class IgniteCacheEntryListenerAtomicLocalTest extends IgniteCacheEntryListenerAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return LOCAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+}


Mime
View raw message