ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [05/17] ignite git commit: ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.
Date Thu, 11 Feb 2016 12:39:35 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 27edb0c..e6bfd87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,6 +57,7 @@ import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -70,6 +72,7 @@ import static javax.cache.event.EventType.CREATED;
 import static javax.cache.event.EventType.EXPIRED;
 import static javax.cache.event.EventType.REMOVED;
 import static javax.cache.event.EventType.UPDATED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -79,7 +82,7 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
  */
 public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest {
     /** */
-    private static volatile List<CacheEntryEvent<? extends Integer, ? extends Integer>> evts;
+    private static volatile List<CacheEntryEvent<?, ?>> evts;
 
     /** */
     private static volatile CountDownLatch evtsLatch;
@@ -91,7 +94,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     private Integer lastKey = 0;
 
     /** */
-    private CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg;
+    private CacheEntryListenerConfiguration<Object, Object> lsnrCfg;
+
+    /** */
+    private boolean useObjects;
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
@@ -103,9 +109,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         cfg.setEagerTtl(eagerTtl());
 
+        cfg.setMemoryMode(memoryMode());
+
         return cfg;
     }
 
+    /**
+     * @return Cache memory mode.
+     */
+    protected CacheMemoryMode memoryMode() {
+        return ONHEAP_TIERED;
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
@@ -129,9 +144,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     public void testExceptionIgnored() throws Exception {
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return new ExceptionListener();
                 }
             },
@@ -140,7 +155,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             false
         );
 
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
         cache.registerCacheEntryListener(lsnrCfg);
 
@@ -158,13 +173,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         }
 
         lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return new CreateUpdateRemoveExpireListener();
                 }
             },
-            new Factory<CacheEntryEventSerializableFilter<? super Integer, ? super Integer>>() {
-                @Override public CacheEntryEventSerializableFilter<? super Integer, ? super Integer> create() {
+            new Factory<CacheEntryEventSerializableFilter<Object, Object>>() {
+                @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
                     return new ExceptionFilter();
                 }
             },
@@ -192,9 +207,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     public void testNoOldValue() throws Exception {
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return new CreateUpdateRemoveExpireListener();
                 }
             },
@@ -203,7 +218,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             true
         );
 
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
         try {
             for (Integer key : keys()) {
@@ -222,21 +237,30 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      * @throws Exception If failed.
      */
+    public void testSynchronousEventsObjectKeyValue() throws Exception {
+        useObjects = true;
+
+        testSynchronousEvents();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSynchronousEvents() throws Exception {
-        final CacheEntryCreatedListener<Integer, Integer> lsnr = new CreateUpdateRemoveExpireListener() {
-            @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+        final CacheEntryCreatedListener<Object, Object> lsnr = new CreateUpdateRemoveExpireListener() {
+            @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
                 super.onRemoved(evts);
 
                 awaitLatch();
             }
 
-            @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+            @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
                 super.onCreated(evts);
 
                 awaitLatch();
             }
 
-            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
                 super.onUpdated(evts);
 
                 awaitLatch();
@@ -252,9 +276,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             }
         };
 
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return lsnr;
                 }
             },
@@ -263,7 +287,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             true
         );
 
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
         cache.registerCacheEntryListener(lsnrCfg);
 
@@ -299,7 +323,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                 if (!eagerTtl()) {
                     U.sleep(1100);
 
-                    assertNull(primaryCache(key, cache.getName()).get(key));
+                    assertNull(primaryCache(key, cache.getName()).get(key(key)));
 
                     evtsLatch.await(5000, MILLISECONDS);
 
@@ -378,13 +402,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         final CyclicBarrier barrier = new CyclicBarrier(THREADS);
 
-        final IgniteCache<Integer, Integer> cache = jcache(0);
+        final IgniteCache<Object, Object> cache = jcache(0);
 
         GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
-                CacheEntryListenerConfiguration<Integer, Integer> cfg = new MutableCacheEntryListenerConfiguration<>(
-                    new Factory<CacheEntryListener<Integer, Integer>>() {
-                        @Override public CacheEntryListener<Integer, Integer> create() {
+                CacheEntryListenerConfiguration<Object, Object> cfg = new MutableCacheEntryListenerConfiguration<>(
+                    new Factory<CacheEntryListener<Object, Object>>() {
+                        @Override public CacheEntryListener<Object, Object> create() {
                             return new CreateUpdateRemoveExpireListener();
                         }
                     },
@@ -441,9 +465,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @param expEvts Expected events number.
      * @throws Exception If failed.
      */
-    private void syncEvent(Integer key, Integer val, IgniteCache<Integer, Integer> cache, int expEvts)
+    private void syncEvent(
+        Integer key,
+        Integer val,
+        IgniteCache<Object, Object> cache,
+        int expEvts)
         throws Exception {
-        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
 
         evtsLatch = new CountDownLatch(expEvts);
 
@@ -466,9 +494,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         });
 
         if (val != null)
-            cache.put(key, val);
+            cache.put(key(key), value(val));
         else
-            cache.remove(key);
+            cache.remove(key(key));
 
         done.set(true);
 
@@ -480,15 +508,45 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     }
 
     /**
+     * @param key Integer key.
+     * @return Key instance.
+     */
+    private Object key(Integer key) {
+        assert key != null;
+
+        return useObjects ? new ListenerTestKey(key) : key;
+    }
+
+    /**
+     * @param val Integer value.
+     * @return Value instance.
+     */
+    private Object value(Integer val) {
+        if (val == null)
+            return null;
+
+        return useObjects ? new ListenerTestValue(val) : val;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEventsObjectKeyValue() throws Exception {
+        useObjects = true;
+
+        testEvents();
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testEvents() throws Exception {
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
-        Map<Integer, Integer> vals = new HashMap<>();
+        Map<Object, Object> vals = new HashMap<>();
 
         for (int i = 0; i < 100; i++)
-            vals.put(i + 2_000_000, i);
+            vals.put(key(i + 2_000_000), value(i));
 
         cache.putAll(vals); // Put some data in cache to make sure events are not generated for existing entries.
 
@@ -518,7 +576,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             checkEvents(cache, new CreateUpdateRemoveExpireListenerFactory(), key, true, true, true, true);
         }
 
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             new CreateUpdateRemoveExpireListenerFactory(),
             new TestFilterFactory(),
             true,
@@ -551,7 +609,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void checkListenerOnStart(Map<Integer, Integer> vals) throws Exception {
+    private void checkListenerOnStart(Map<Object, Object> vals) throws Exception {
         lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             new CreateUpdateRemoveExpireListenerFactory(),
             null,
@@ -564,7 +622,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         try {
             awaitPartitionMapExchange();
 
-            IgniteCache<Integer, Integer> cache = grid.cache(null);
+            IgniteCache<Object, Object> cache = grid.cache(null);
 
             Integer key = Integer.MAX_VALUE;
 
@@ -588,7 +646,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         try {
             awaitPartitionMapExchange();
 
-            IgniteCache<Integer, Integer> cache = grid.cache(null);
+            IgniteCache<Object, Object> cache = grid.cache(null);
 
             log.info("Check filter for listener in configuration.");
 
@@ -613,14 +671,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     private void checkEvents(
-        final IgniteCache<Integer, Integer> cache,
-        final Factory<CacheEntryListener<Integer, Integer>> lsnrFactory,
+        final IgniteCache<Object, Object> cache,
+        final Factory<CacheEntryListener<Object, Object>> lsnrFactory,
         Integer key,
         boolean create,
         boolean update,
         boolean rmv,
         boolean expire) throws Exception {
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             lsnrFactory,
             null,
             true,
@@ -642,8 +700,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @param vals Values in cache.
      * @throws Exception If failed.
      */
-    private void checkFilter(final IgniteCache<Integer, Integer> cache, Map<Integer, Integer> vals) throws Exception {
-        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+    private void checkFilter(final IgniteCache<Object, Object> cache, Map<Object, Object> vals) throws Exception {
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
 
         final int expEvts = (vals.size() / 2) * 4; // Remove, create, update and expire for half of modified entries.
 
@@ -653,16 +711,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         cache.putAll(vals);
 
-        final Map<Integer, Integer> newVals = new HashMap<>();
+        final Map<Object, Object> newVals = new HashMap<>();
 
-        for (Integer key : vals.keySet())
-            newVals.put(key, -1);
+        for (Object key : vals.keySet())
+            newVals.put(key, value(-1));
 
         cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 500))).putAll(newVals);
 
+        U.sleep(1000);
+
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                for (Integer key : newVals.keySet()) {
+                for (Object key : newVals.keySet()) {
                     if (primaryCache(key, cache.getName()).get(key) != null)
                         return false;
                 }
@@ -675,13 +735,20 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         assertEquals(expEvts, evts.size());
 
-        Set<Integer> rmvd = new HashSet<>();
-        Set<Integer> created = new HashSet<>();
-        Set<Integer> updated = new HashSet<>();
-        Set<Integer> expired = new HashSet<>();
+        Set<Object> rmvd = new HashSet<>();
+        Set<Object> created = new HashSet<>();
+        Set<Object> updated = new HashSet<>();
+        Set<Object> expired = new HashSet<>();
+
+        for (CacheEntryEvent<?, ?> evt : evts) {
+            Integer key;
+
+            if (useObjects)
+                key = ((ListenerTestKey)evt.getKey()).key;
+            else
+                key = (Integer)evt.getKey();
 
-        for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
-            assertTrue(evt.getKey() % 2 == 0);
+            assertTrue(key % 2 == 0);
 
             assertTrue(vals.keySet().contains(evt.getKey()));
 
@@ -707,7 +774,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                     break;
 
                 case UPDATED:
-                    assertEquals(-1, (int)evt.getValue());
+                    assertEquals(value(-1), evt.getValue());
 
                     assertEquals(vals.get(evt.getKey()), evt.getOldValue());
 
@@ -722,7 +789,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                 case EXPIRED:
                     assertNull(evt.getValue());
 
-                    assertEquals(-1, (int)evt.getOldValue());
+                    assertEquals(value(-1), evt.getOldValue());
 
                     assertTrue(rmvd.contains(evt.getKey()));
 
@@ -757,8 +824,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     private void checkEvents(
-        final IgniteCache<Integer, Integer> cache,
-        final CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg,
+        final IgniteCache<Object, Object> cache,
+        final CacheEntryListenerConfiguration<Object, Object> lsnrCfg,
         Integer key,
         boolean create,
         boolean update,
@@ -789,64 +856,64 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         if (expire)
             expEvts += 2;
 
-        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
 
         evtsLatch = new CountDownLatch(expEvts);
 
-        cache.put(key, 0);
+        cache.put(key(key), value(0));
 
         for (int i = 0; i < UPDATES; i++) {
             if (i % 2 == 0)
-                cache.put(key, i + 1);
+                cache.put(key(key), value(i + 1));
             else
-                cache.invoke(key, new EntrySetValueProcessor(i + 1));
+                cache.invoke(key(key), new EntrySetValueProcessor(value(i + 1)));
         }
 
         // Invoke processor does not update value, should not trigger event.
-        assertEquals(String.valueOf(UPDATES), cache.invoke(key, new EntryToStringProcessor()));
+        assertEquals(String.valueOf(UPDATES), cache.invoke(key(key), new EntryToStringProcessor()));
 
-        assertFalse(cache.putIfAbsent(key, -1));
+        assertFalse(cache.putIfAbsent(key(key), value(-1)));
 
-        assertFalse(cache.remove(key, -1));
+        assertFalse(cache.remove(key(key), value(-1)));
 
-        assertTrue(cache.remove(key));
+        assertTrue(cache.remove(key(key)));
 
-        IgniteCache<Integer, Integer> expirePlcCache =
+        IgniteCache<Object, Object> expirePlcCache =
             cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
 
-        expirePlcCache.put(key, 10);
+        expirePlcCache.put(key(key), value(10));
 
         U.sleep(700);
 
         if (!eagerTtl())
-            assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.
+            assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled.
 
-        IgniteCache<Integer, Integer> cache1 = cache;
+        IgniteCache<Object, Object> cache1 = cache;
 
         if (gridCount() > 1)
             cache1 = jcache(1); // Do updates from another node.
 
-        cache1.put(key, 1);
+        cache1.put(key(key), value(1));
 
-        cache1.put(key, 2);
+        cache1.put(key(key), value(2));
 
-        assertTrue(cache1.remove(key));
+        assertTrue(cache1.remove(key(key)));
 
-        IgniteCache<Integer, Integer> expirePlcCache1 =
+        IgniteCache<Object, Object> expirePlcCache1 =
             cache1.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
 
-        expirePlcCache1.put(key, 20);
+        expirePlcCache1.put(key(key), value(20));
 
         U.sleep(200);
 
         if (!eagerTtl())
-            assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.
+            assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled.
 
         evtsLatch.await(5000, MILLISECONDS);
 
         assertEquals(expEvts, evts.size());
 
-        Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = evts.iterator();
+        Iterator<CacheEntryEvent<?, ?>> iter = evts.iterator();
 
         if (create)
             checkEvent(iter, key, CREATED, 0, null);
@@ -886,11 +953,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         cache.deregisterCacheEntryListener(lsnrCfg);
 
-        cache.put(key, 1);
+        cache.put(key(key), value(1));
 
-        cache.put(key, 2);
+        cache.put(key(key), value(2));
 
-        assertTrue(cache.remove(key));
+        assertTrue(cache.remove(key(key)));
 
         U.sleep(500); // Sleep some time to ensure listener was really removed.
 
@@ -908,26 +975,26 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @param expVal Expected value.
      * @param expOld Expected old value.
      */
-    private void checkEvent(Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter,
+    private void checkEvent(Iterator<CacheEntryEvent<?, ?>> iter,
         Integer expKey,
         EventType expType,
         @Nullable Integer expVal,
         @Nullable Integer expOld) {
         assertTrue(iter.hasNext());
 
-        CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next();
+        CacheEntryEvent<?, ?> evt = iter.next();
 
         iter.remove();
 
         assertTrue(evt.getSource() instanceof IgniteCacheProxy);
 
-        assertEquals(expKey, evt.getKey());
+        assertEquals(key(expKey), evt.getKey());
 
         assertEquals(expType, evt.getEventType());
 
-        assertEquals(expVal, evt.getValue());
+        assertEquals(value(expVal), evt.getValue());
 
-        assertEquals(expOld, evt.getOldValue());
+        assertEquals(value(expOld), evt.getOldValue());
 
         if (expOld == null)
             assertFalse(evt.isOldValueAvailable());
@@ -977,7 +1044,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      * @param evt Event.
      */
-    private static void onEvent(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+    private static void onEvent(CacheEntryEvent<?, ?> evt) {
         // System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']');
 
         assertNotNull(evt);
@@ -993,9 +1060,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new CreateUpdateRemoveExpireListener();
         }
     }
@@ -1003,9 +1070,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new NoOpCreateUpdateListener();
         }
     }
@@ -1013,9 +1080,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new CreateUpdateListener();
         }
     }
@@ -1023,9 +1090,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class CreateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new CreateListener();
         }
     }
@@ -1033,9 +1100,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class RemoveListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class RemoveListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new RemoveListener();
         }
     }
@@ -1043,9 +1110,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class UpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class UpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new UpdateListener();
         }
     }
@@ -1053,9 +1120,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class ExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class ExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new ExpireListener();
         }
     }
@@ -1063,9 +1130,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Integer, Integer>> {
+    private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryEventSerializableFilter<Integer, Integer> create() {
+        @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
             return new TestFilter();
         }
     }
@@ -1073,10 +1140,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateListener implements CacheEntryCreatedListener<Integer, Integer> {
+    private static class CreateListener implements CacheEntryCreatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1084,10 +1151,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class UpdateListener implements CacheEntryUpdatedListener<Integer, Integer> {
+    private static class UpdateListener implements CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1095,10 +1162,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class RemoveListener implements CacheEntryRemovedListener<Integer, Integer> {
+    private static class RemoveListener implements CacheEntryRemovedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1106,10 +1173,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class ExpireListener implements CacheEntryExpiredListener<Integer, Integer> {
+    private static class ExpireListener implements CacheEntryExpiredListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1117,32 +1184,39 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+    private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> {
         /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
             assert evt != null;
             assert evt.getSource() != null : evt;
             assert evt.getEventType() != null : evt;
             assert evt.getKey() != null : evt;
 
-            return evt.getKey() % 2 == 0;
+            Integer key;
+
+            if (evt.getKey() instanceof ListenerTestKey)
+                key = ((ListenerTestKey)evt.getKey()).key;
+            else
+                key = (Integer)evt.getKey();
+
+            return key % 2 == 0;
         }
     }
 
     /**
      *
      */
-    private static class CreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>,
-        CacheEntryUpdatedListener<Integer, Integer> {
+    private static class CreateUpdateListener implements CacheEntryCreatedListener<Object, Object>,
+        CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1150,11 +1224,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>,
-        CacheEntryUpdatedListener<Integer, Integer> {
+    private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Object, Object>,
+        CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts) {
                 assertNotNull(evt);
                 assertNotNull(evt.getSource());
                 assertNotNull(evt.getEventType());
@@ -1163,8 +1237,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         }
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts) {
                 assertNotNull(evt);
                 assertNotNull(evt.getSource());
                 assertNotNull(evt.getEventType());
@@ -1177,16 +1251,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      *
      */
     private static class CreateUpdateRemoveExpireListener extends CreateUpdateListener
-        implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> {
+        implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
 
         /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1194,9 +1268,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+    private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Object, Object> {
         /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
             throw new RuntimeException("Test filter error.");
         }
     }
@@ -1205,24 +1279,24 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      *
      */
     private static class ExceptionListener extends CreateUpdateListener
-        implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> {
+        implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
             error();
         }
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
             error();
         }
 
         /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
             error();
         }
 
         /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
             error();
         }
 
@@ -1237,10 +1311,12 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    protected static class EntryToStringProcessor implements EntryProcessor<Integer, Integer, String> {
+    protected static class EntryToStringProcessor implements EntryProcessor<Object, Object, String> {
         /** {@inheritDoc} */
-        @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments)
-            throws EntryProcessorException {
+        @Override public String process(MutableEntry<Object, Object> e, Object... args) {
+            if (e.getValue() instanceof ListenerTestValue)
+                return String.valueOf(((ListenerTestValue)e.getValue()).val1);
+
             return String.valueOf(e.getValue());
         }
 
@@ -1253,19 +1329,19 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    protected static class EntrySetValueProcessor implements EntryProcessor<Integer, Integer, String> {
+    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, String> {
         /** */
-        private Integer val;
+        private Object val;
 
         /**
          * @param val Value to set.
          */
-        public EntrySetValueProcessor(Integer val) {
+        public EntrySetValueProcessor(Object val) {
             this.val = val;
         }
 
         /** {@inheritDoc} */
-        @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments)
+        @Override public String process(MutableEntry<Object, Object> e, Object... args)
             throws EntryProcessorException {
             e.setValue(val);
 
@@ -1307,4 +1383,88 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             // No-op.
         }
     }
+
+    /**
+     *
+     */
+    static class ListenerTestKey implements Serializable {
+        /** */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public ListenerTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ListenerTestKey that = (ListenerTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ListenerTestKey.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class ListenerTestValue implements Serializable {
+        /** */
+        private final Integer val1;
+
+        /** */
+        private final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public ListenerTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ListenerTestValue that = (ListenerTestValue) o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ListenerTestValue.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
new file mode 100644
index 0000000..69efb84
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ *
+ */
+public class IgniteCacheEntryListenerAtomicOffheapTieredTest extends IgniteCacheEntryListenerAtomicTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
new file mode 100644
index 0000000..23b1bc0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheEntryListenerAtomicOffheapValuesTest extends IgniteCacheEntryListenerAtomicTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_VALUES;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
new file mode 100644
index 0000000..d552195
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ *
+ */
+public class IgniteCacheEntryListenerTxOffheapTieredTest extends IgniteCacheEntryListenerTxTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
new file mode 100644
index 0000000..32555c8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ *
+ */
+public class IgniteCacheEntryListenerTxOffheapValuesTest extends IgniteCacheEntryListenerTxTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_VALUES;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
index a9e43d4..41725e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
@@ -48,6 +48,7 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst
         return null;
     }
 
+    /** {@inheritDoc} */
     @Override public void testEvents(){
         fail("https://issues.apache.org/jira/browse/IGNITE-1600");
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 1c65f9b..a42f056 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
@@ -97,6 +98,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -142,6 +144,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
         ccfg.setBackups(backups);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
         ccfg.setNearConfiguration(nearCacheConfiguration());
+        ccfg.setMemoryMode(memoryMode());
 
         cfg.setCacheConfiguration(ccfg);
 
@@ -151,6 +154,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     }
 
     /**
+     * @return Cache memory mode.
+     */
+    protected CacheMemoryMode memoryMode() {
+        return ONHEAP_TIERED;
+    }
+
+    /**
      * @return Near cache configuration.
      */
     protected NearCacheConfiguration nearCacheConfiguration() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
new file mode 100644
index 0000000..cc8590d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest
+    extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
new file mode 100644
index 0000000..cae06c3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverTxOffheapTieredTest extends CacheContinuousQueryFailoverTxSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
new file mode 100644
index 0000000..d9b2091
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private static final int KEYS = 10;
+
+    /** */
+    private static final int VALS = 10;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValues() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTiered() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackups() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValues() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTiered() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackups() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            IgniteCache<Object, Object> cache = ignite(NODES - 1).cache(ccfg.getName());
+
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            log.info("Random seed: " + seed);
+
+            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+            final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue =
+                new ArrayBlockingQueue<>(10_000);
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                    for (CacheEntryEvent<?, ?> evt : evts) {
+                        // System.out.println("Event: " + evt);
+
+                        evtsQueue.add(evt);
+                    }
+                }
+            });
+
+            QueryCursor<?> cur = cache.query(qry);
+
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+            try {
+                for (int i = 0; i < 1000; i++) {
+                    if (i % 100 == 0)
+                        log.info("Iteration: " + i);
+
+                    randomUpdate(rnd, evtsQueue, expData, cache);
+                }
+            }
+            finally {
+                cur.close();
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @param evtsQueue Events queue.
+     * @param expData Expected cache data.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+        ConcurrentMap<Object, Object> expData,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new QueryTestKey(rnd.nextInt(KEYS));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key);
+
+        int op = rnd.nextInt(11);
+
+        // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+        switch (op) {
+            case 0: {
+                cache.put(key, newVal);
+
+                waitEvent(evtsQueue, key, newVal, oldVal);
+
+                expData.put(key, newVal);
+
+                break;
+            }
+
+            case 1: {
+                cache.getAndPut(key, newVal);
+
+                waitEvent(evtsQueue, key, newVal, oldVal);
+
+                expData.put(key, newVal);
+
+                break;
+            }
+
+            case 2: {
+                cache.remove(key);
+
+                waitEvent(evtsQueue, key, null, oldVal);
+
+                expData.remove(key);
+
+                break;
+            }
+
+            case 3: {
+                cache.getAndRemove(key);
+
+                waitEvent(evtsQueue, key, null, oldVal);
+
+                expData.remove(key);
+
+                break;
+            }
+
+            case 4: {
+                cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                waitEvent(evtsQueue, key, newVal, oldVal);
+
+                expData.put(key, newVal);
+
+                break;
+            }
+
+            case 5: {
+                cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                waitEvent(evtsQueue, key, null, oldVal);
+
+                expData.remove(key);
+
+                break;
+            }
+
+            case 6: {
+                cache.putIfAbsent(key, newVal);
+
+                if (oldVal == null) {
+                    waitEvent(evtsQueue, key, newVal, null);
+
+                    expData.put(key, newVal);
+                }
+                else
+                    checkNoEvent(evtsQueue);
+
+                break;
+            }
+
+            case 7: {
+                cache.getAndPutIfAbsent(key, newVal);
+
+                if (oldVal == null) {
+                    waitEvent(evtsQueue, key, newVal, null);
+
+                    expData.put(key, newVal);
+                }
+                else
+                    checkNoEvent(evtsQueue);
+
+                break;
+            }
+
+            case 8: {
+                cache.replace(key, newVal);
+
+                if (oldVal != null) {
+                    waitEvent(evtsQueue, key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+                }
+                else
+                    checkNoEvent(evtsQueue);
+
+                break;
+            }
+
+            case 9: {
+                cache.getAndReplace(key, newVal);
+
+                if (oldVal != null) {
+                    waitEvent(evtsQueue, key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+                }
+                else
+                    checkNoEvent(evtsQueue);
+
+                break;
+            }
+
+            case 10: {
+                if (oldVal != null) {
+                    Object replaceVal = value(rnd);
+
+                    boolean success = replaceVal.equals(oldVal);
+
+                    if (success) {
+                        cache.replace(key, replaceVal, newVal);
+
+                        waitEvent(evtsQueue, key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else {
+                        cache.replace(key, replaceVal, newVal);
+
+                        checkNoEvent(evtsQueue);
+                    }
+                }
+                else {
+                    cache.replace(key, value(rnd), newVal);
+
+                    checkNoEvent(evtsQueue);
+                }
+
+                break;
+            }
+
+            default:
+                fail();
+        }
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        return new QueryTestValue(rnd.nextInt(VALS));
+    }
+
+    /**
+     * @param evtsQueue Event queue.
+     * @param key Key.
+     * @param val Value.
+     * @param oldVal Old value.
+     * @throws Exception If failed.
+     */
+    private void waitEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+        Object key, Object val, Object oldVal) throws Exception {
+        if (val == null && oldVal == null) {
+            checkNoEvent(evtsQueue);
+
+            return;
+        }
+
+        CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+        assertNotNull("Failed to wait for event [key=" + key +
+            ", val=" + val +
+            ", oldVal=" + oldVal + ']', evt);
+        assertEquals(key, evt.getKey());
+        assertEquals(val, evt.getValue());
+        assertEquals(oldVal, evt.getOldValue());
+    }
+
+    /**
+     * @param evtsQueue Event queue.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue) throws Exception {
+        CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+
+        assertNull(evt);
+    }
+
+    /**
+     *
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @param store If {@code true} configures dummy cache store.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        boolean store) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        if (store) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setReadThrough(true);
+            ccfg.setWriteThrough(true);
+        }
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     *
+     */
+    static class QueryTestKey implements Serializable {
+        /** */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class QueryTestValue implements Serializable {
+        /** */
+        private final Integer val1;
+
+        /** */
+        private final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestValue that = (QueryTestValue) o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+    /**
+     *
+     */
+    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+        /** */
+        private Object val;
+
+        /** */
+        private boolean retOld;
+
+        /**
+         * @param val Value to set.
+         * @param retOld Return old value flag.
+         */
+        public EntrySetValueProcessor(Object val, boolean retOld) {
+            this.val = val;
+            this.retOld = retOld;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+            Object old = retOld ? e.getValue() : null;
+
+            if (val != null)
+                e.setValue(val);
+            else
+                e.remove();
+
+            return old;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EntrySetValueProcessor.class, this);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 5abb98d..dbe282e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
@@ -73,9 +74,9 @@ import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
-
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -117,6 +118,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             cacheCfg.setReadThrough(true);
             cacheCfg.setWriteThrough(true);
             cacheCfg.setLoadPreviousValue(true);
+            cacheCfg.setMemoryMode(memoryMode());
 
             cfg.setCacheConfiguration(cacheCfg);
         }
@@ -135,6 +137,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
     }
 
     /**
+     * @return Cache memory mode.
+     */
+    protected CacheMemoryMode memoryMode() {
+        return ONHEAP_TIERED;
+    }
+
+    /**
      * @return Peer class loading enabled flag.
      */
     protected boolean peerClassLoadingEnabled() {
@@ -393,8 +402,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             }
         });
 
-        try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2);
-            QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) {
+        try (QueryCursor<Cache.Entry<Integer, Integer>> qryCur2 = cache1.query(qry2);
+             QueryCursor<Cache.Entry<Integer, Integer>> qryCur1 = cache.query(qry1)) {
             for (int i = 0; i < gridCount(); i++) {
                 IgniteCache<Object, Object> cache0 = grid(i).cache(null);
 
@@ -448,7 +457,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                     }
                 });
 
-                QueryCursor<Cache.Entry<Integer, Integer>> query = cache.query(qry);
+                QueryCursor<Cache.Entry<Integer, Integer>> qryCur = cache.query(qry);
 
                 for (int key = 0; key < keyCnt; key++)
                     cache.put(key, key);
@@ -461,7 +470,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                     }, 2000L);
                 }
                 finally {
-                    query.close();
+                    qryCur.close();
                 }
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
new file mode 100644
index 0000000..d6948e2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryAtomicOffheapTieredTest extends GridCacheContinuousQueryAtomicSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
new file mode 100644
index 0000000..4002435
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryAtomicOffheapValuesTest extends GridCacheContinuousQueryAtomicSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_VALUES;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
new file mode 100644
index 0000000..bcba7b6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryTxOffheapTieredTest extends GridCacheContinuousQueryTxSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}


Mime
View raw message