ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-2587 Fixed continuous query notification in offheap mode
Date Tue, 09 Feb 2016 12:48:24 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2587 d26cf06ea -> e22baba45


ignite-2587 Fixed continuous query notification in offheap mode


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e22baba4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e22baba4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e22baba4

Branch: refs/heads/ignite-2587
Commit: e22baba45eb1d67fb2f3941b903a72e4094e311a
Parents: d26cf06
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Feb 9 15:47:10 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Feb 9 15:47:10 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |   2 +-
 .../continuous/GridContinuousProcessor.java     |  16 +-
 .../IgniteCacheEntryListenerAbstractTest.java   | 390 ++++++++++++-------
 .../junits/common/GridCommonAbstractTest.java   |   2 +-
 4 files changed, 255 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e22baba4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b6ba4ee..2e1b1f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1788,7 +1788,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
                     key,
                     val,
                     old,
-                    isInternal() || !context().userCache(),
+                    internal,
                     partition(),
                     true,
                     false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e22baba4/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 7c7e3e3..0218897 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -893,11 +894,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Load partition counters.
-        if (hnd.isQuery() && ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName())
!= null) {
-            Map<Integer, Long> cntrs = ctx.cache().internalCache(hnd.cacheName())
-                .context().topology().updateCounters();
+        if (hnd.isQuery()) {
+            GridCacheProcessor proc = ctx.cache();
 
-            req.addUpdateCounters(cntrs);
+            if (proc != null) {
+                GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+
+                if (cache != null && !cache.isLocal()) {
+                    Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
+
+                    req.addUpdateCounters(cntrs);
+                }
+            }
         }
 
         if (err != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e22baba4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 7a6cec3..7e021f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -82,7 +82,7 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
  */
 public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest
{
     /** */
-    private static volatile List<CacheEntryEvent<? extends Integer, ? extends Integer>>
evts;
+    private static volatile List<CacheEntryEvent<?, ?>> evts;
 
     /** */
     private static volatile CountDownLatch evtsLatch;
@@ -94,7 +94,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     private Integer lastKey = 0;
 
     /** */
-    private CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg;
+    private CacheEntryListenerConfiguration<Object, Object> lsnrCfg;
+
+    /** */
+    private boolean useObjects;
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
@@ -141,9 +144,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     public void testExceptionIgnored() throws Exception {
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return new ExceptionListener();
                 }
             },
@@ -152,7 +155,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             false
         );
 
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
         cache.registerCacheEntryListener(lsnrCfg);
 
@@ -170,13 +173,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         }
 
         lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return new CreateUpdateRemoveExpireListener();
                 }
             },
-            new Factory<CacheEntryEventSerializableFilter<? super Integer, ? super
Integer>>() {
-                @Override public CacheEntryEventSerializableFilter<? super Integer, ?
super Integer> create() {
+            new Factory<CacheEntryEventSerializableFilter<Object, Object>>()
{
+                @Override public CacheEntryEventSerializableFilter<Object, Object>
create() {
                     return new ExceptionFilter();
                 }
             },
@@ -204,9 +207,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     public void testNoOldValue() throws Exception {
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return new CreateUpdateRemoveExpireListener();
                 }
             },
@@ -215,7 +218,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             true
         );
 
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
         try {
             for (Integer key : keys()) {
@@ -234,21 +237,30 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      * @throws Exception If failed.
      */
+    public void testSynchronousEventsObjectKeyValue() throws Exception {
+        useObjects = true;
+
+        testSynchronousEvents();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSynchronousEvents() throws Exception {
-        final CacheEntryCreatedListener<Integer, Integer> lsnr = new CreateUpdateRemoveExpireListener()
{
-            @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+        final CacheEntryCreatedListener<Object, Object> lsnr = new CreateUpdateRemoveExpireListener()
{
+            @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts)
{
                 super.onRemoved(evts);
 
                 awaitLatch();
             }
 
-            @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+            @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts)
{
                 super.onCreated(evts);
 
                 awaitLatch();
             }
 
-            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
{
                 super.onUpdated(evts);
 
                 awaitLatch();
@@ -264,9 +276,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             }
         };
 
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return lsnr;
                 }
             },
@@ -275,7 +287,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             true
         );
 
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
         cache.registerCacheEntryListener(lsnrCfg);
 
@@ -390,13 +402,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         final CyclicBarrier barrier = new CyclicBarrier(THREADS);
 
-        final IgniteCache<Integer, Integer> cache = jcache(0);
+        final IgniteCache<Object, Object> cache = jcache(0);
 
         GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
-                CacheEntryListenerConfiguration<Integer, Integer> cfg = new MutableCacheEntryListenerConfiguration<>(
-                    new Factory<CacheEntryListener<Integer, Integer>>() {
-                        @Override public CacheEntryListener<Integer, Integer> create()
{
+                CacheEntryListenerConfiguration<Object, Object> cfg = new MutableCacheEntryListenerConfiguration<>(
+                    new Factory<CacheEntryListener<Object, Object>>() {
+                        @Override public CacheEntryListener<Object, Object> create()
{
                             return new CreateUpdateRemoveExpireListener();
                         }
                     },
@@ -453,9 +465,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @param expEvts Expected events number.
      * @throws Exception If failed.
      */
-    private void syncEvent(Integer key, Integer val, IgniteCache<Integer, Integer>
cache, int expEvts)
+    private void syncEvent(
+        Integer key,
+        Integer val,
+        IgniteCache<Object, Object> cache,
+        int expEvts)
         throws Exception {
-        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends
Integer, ? extends Integer>>());
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
 
         evtsLatch = new CountDownLatch(expEvts);
 
@@ -478,9 +494,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         });
 
         if (val != null)
-            cache.put(key, val);
+            cache.put(key(key), value(val));
         else
-            cache.remove(key);
+            cache.remove(key(key));
 
         done.set(true);
 
@@ -492,15 +508,45 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     }
 
     /**
+     * @param key Integer key.
+     * @return Key instance.
+     */
+    private Object key(Integer key) {
+        assert key != null;
+
+        return useObjects ? new ListenerTestKey(key) : key;
+    }
+
+    /**
+     * @param val Integer value.
+     * @return Value instance.
+     */
+    private Object value(Integer val) {
+        if (val == null)
+            return null;
+
+        return useObjects ? new ListenerTestValue(val) : val;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEventsObjectKeyValue() throws Exception {
+        useObjects = true;
+
+        testEvents();
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testEvents() throws Exception {
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
-        Map<Integer, Integer> vals = new HashMap<>();
+        Map<Object, Object> vals = new HashMap<>();
 
         for (int i = 0; i < 100; i++)
-            vals.put(i + 2_000_000, i);
+            vals.put(key(i + 2_000_000), value(i));
 
         cache.putAll(vals); // Put some data in cache to make sure events are not generated
for existing entries.
 
@@ -530,7 +576,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             checkEvents(cache, new CreateUpdateRemoveExpireListenerFactory(), key, true,
true, true, true);
         }
 
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             new CreateUpdateRemoveExpireListenerFactory(),
             new TestFilterFactory(),
             true,
@@ -563,7 +609,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void checkListenerOnStart(Map<Integer, Integer> vals) throws Exception
{
+    private void checkListenerOnStart(Map<Object, Object> vals) throws Exception {
         lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             new CreateUpdateRemoveExpireListenerFactory(),
             null,
@@ -576,7 +622,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         try {
             awaitPartitionMapExchange();
 
-            IgniteCache<Integer, Integer> cache = grid.cache(null);
+            IgniteCache<Object, Object> cache = grid.cache(null);
 
             Integer key = Integer.MAX_VALUE;
 
@@ -600,7 +646,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         try {
             awaitPartitionMapExchange();
 
-            IgniteCache<Integer, Integer> cache = grid.cache(null);
+            IgniteCache<Object, Object> cache = grid.cache(null);
 
             log.info("Check filter for listener in configuration.");
 
@@ -625,14 +671,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     private void checkEvents(
-        final IgniteCache<Integer, Integer> cache,
-        final Factory<CacheEntryListener<Integer, Integer>> lsnrFactory,
+        final IgniteCache<Object, Object> cache,
+        final Factory<CacheEntryListener<Object, Object>> lsnrFactory,
         Integer key,
         boolean create,
         boolean update,
         boolean rmv,
         boolean expire) throws Exception {
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             lsnrFactory,
             null,
             true,
@@ -654,8 +700,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @param vals Values in cache.
      * @throws Exception If failed.
      */
-    private void checkFilter(final IgniteCache<Integer, Integer> cache, Map<Integer,
Integer> vals) throws Exception {
-        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends
Integer, ? extends Integer>>());
+    private void checkFilter(final IgniteCache<Object, Object> cache, Map<Object,
Object> vals) throws Exception {
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
 
         final int expEvts = (vals.size() / 2) * 4; // Remove, create, update and expire for
half of modified entries.
 
@@ -665,16 +711,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         cache.putAll(vals);
 
-        final Map<Integer, Integer> newVals = new HashMap<>();
+        final Map<Object, Object> newVals = new HashMap<>();
 
-        for (Integer key : vals.keySet())
-            newVals.put(key, -1);
+        for (Object key : vals.keySet())
+            newVals.put(key, value(-1));
 
         cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 500))).putAll(newVals);
 
-        boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                for (Integer key : newVals.keySet()) {
+                for (Object key : newVals.keySet()) {
                     if (primaryCache(key, cache.getName()).get(key) != null)
                         return false;
                 }
@@ -683,19 +729,24 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             }
         }, 5000);
 
-        assertTrue(wait);
-
         evtsLatch.await(5000, MILLISECONDS);
 
         assertEquals(expEvts, evts.size());
 
-        Set<Integer> rmvd = new HashSet<>();
-        Set<Integer> created = new HashSet<>();
-        Set<Integer> updated = new HashSet<>();
-        Set<Integer> expired = new HashSet<>();
+        Set<Object> rmvd = new HashSet<>();
+        Set<Object> created = new HashSet<>();
+        Set<Object> updated = new HashSet<>();
+        Set<Object> expired = new HashSet<>();
+
+        for (CacheEntryEvent<?, ?> evt : evts) {
+            Integer key;
+
+            if (useObjects)
+                key = ((ListenerTestKey)evt.getKey()).key;
+            else
+                key = (Integer)evt.getKey();
 
-        for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
-            assertTrue(evt.getKey() % 2 == 0);
+            assertTrue(key % 2 == 0);
 
             assertTrue(vals.keySet().contains(evt.getKey()));
 
@@ -721,7 +772,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                     break;
 
                 case UPDATED:
-                    assertEquals(-1, (int)evt.getValue());
+                    assertEquals(value(-1), evt.getValue());
 
                     assertEquals(vals.get(evt.getKey()), evt.getOldValue());
 
@@ -736,7 +787,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                 case EXPIRED:
                     assertNull(evt.getValue());
 
-                    assertEquals(-1, (int)evt.getOldValue());
+                    assertEquals(value(-1), evt.getOldValue());
 
                     assertTrue(rmvd.contains(evt.getKey()));
 
@@ -771,8 +822,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     private void checkEvents(
-        final IgniteCache<Integer, Integer> cache,
-        final CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg,
+        final IgniteCache<Object, Object> cache,
+        final CacheEntryListenerConfiguration<Object, Object> lsnrCfg,
         Integer key,
         boolean create,
         boolean update,
@@ -803,64 +854,64 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         if (expire)
             expEvts += 2;
 
-        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends
Integer, ? extends Integer>>());
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
 
         evtsLatch = new CountDownLatch(expEvts);
 
-        cache.put(key, 0);
+        cache.put(key(key), value(0));
 
         for (int i = 0; i < UPDATES; i++) {
             if (i % 2 == 0)
-                cache.put(key, i + 1);
+                cache.put(key(key), value(i + 1));
             else
-                cache.invoke(key, new EntrySetValueProcessor(i + 1));
+                cache.invoke(key(key), new EntrySetValueProcessor(value(i + 1)));
         }
 
         // Invoke processor does not update value, should not trigger event.
-        assertEquals(String.valueOf(UPDATES), cache.invoke(key, new EntryToStringProcessor()));
+        assertEquals(String.valueOf(UPDATES), cache.invoke(key(key), new EntryToStringProcessor()));
 
-        assertFalse(cache.putIfAbsent(key, -1));
+        assertFalse(cache.putIfAbsent(key(key), value(-1)));
 
-        assertFalse(cache.remove(key, -1));
+        assertFalse(cache.remove(key(key), value(-1)));
 
-        assertTrue(cache.remove(key));
+        assertTrue(cache.remove(key(key)));
 
-        IgniteCache<Integer, Integer> expirePlcCache =
+        IgniteCache<Object, Object> expirePlcCache =
             cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
 
-        expirePlcCache.put(key, 10);
+        expirePlcCache.put(key(key), value(10));
 
         U.sleep(700);
 
         if (!eagerTtl())
-            assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event
if eager ttl is disabled.
+            assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire
event if eager ttl is disabled.
 
-        IgniteCache<Integer, Integer> cache1 = cache;
+        IgniteCache<Object, Object> cache1 = cache;
 
         if (gridCount() > 1)
             cache1 = jcache(1); // Do updates from another node.
 
-        cache1.put(key, 1);
+        cache1.put(key(key), value(1));
 
-        cache1.put(key, 2);
+        cache1.put(key(key), value(2));
 
-        assertTrue(cache1.remove(key));
+        assertTrue(cache1.remove(key(key)));
 
-        IgniteCache<Integer, Integer> expirePlcCache1 =
+        IgniteCache<Object, Object> expirePlcCache1 =
             cache1.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
 
-        expirePlcCache1.put(key, 20);
+        expirePlcCache1.put(key(key), value(20));
 
         U.sleep(200);
 
         if (!eagerTtl())
-            assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event
if eager ttl is disabled.
+            assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire
event if eager ttl is disabled.
 
         evtsLatch.await(5000, MILLISECONDS);
 
         assertEquals(expEvts, evts.size());
 
-        Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter
= evts.iterator();
+        Iterator<CacheEntryEvent<?, ?>> iter = evts.iterator();
 
         if (create)
             checkEvent(iter, key, CREATED, 0, null);
@@ -900,11 +951,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         cache.deregisterCacheEntryListener(lsnrCfg);
 
-        cache.put(key, 1);
+        cache.put(key(key), value(1));
 
-        cache.put(key, 2);
+        cache.put(key(key), value(2));
 
-        assertTrue(cache.remove(key));
+        assertTrue(cache.remove(key(key)));
 
         U.sleep(500); // Sleep some time to ensure listener was really removed.
 
@@ -922,26 +973,26 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @param expVal Expected value.
      * @param expOld Expected old value.
      */
-    private void checkEvent(Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>>
iter,
+    private void checkEvent(Iterator<CacheEntryEvent<?, ?>> iter,
         Integer expKey,
         EventType expType,
         @Nullable Integer expVal,
         @Nullable Integer expOld) {
         assertTrue(iter.hasNext());
 
-        CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next();
+        CacheEntryEvent<?, ?> evt = iter.next();
 
         iter.remove();
 
         assertTrue(evt.getSource() instanceof IgniteCacheProxy);
 
-        assertEquals(expKey, evt.getKey());
+        assertEquals(key(expKey), evt.getKey());
 
         assertEquals(expType, evt.getEventType());
 
-        assertEquals(expVal, evt.getValue());
+        assertEquals(value(expVal), evt.getValue());
 
-        assertEquals(expOld, evt.getOldValue());
+        assertEquals(value(expOld), evt.getOldValue());
 
         if (expOld == null)
             assertFalse(evt.isOldValueAvailable());
@@ -991,7 +1042,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      * @param evt Event.
      */
-    private static void onEvent(CacheEntryEvent<? extends Integer, ? extends Integer>
evt) {
+    private static void onEvent(CacheEntryEvent<?, ?> evt) {
         // System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName()
+ ']');
 
         assertNotNull(evt);
@@ -1007,9 +1058,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Integer,
Integer>> {
+    private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Object,
Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new CreateUpdateRemoveExpireListener();
         }
     }
@@ -1017,9 +1068,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer,
Integer>> {
+    private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Object,
Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new NoOpCreateUpdateListener();
         }
     }
@@ -1027,9 +1078,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer,
Integer>> {
+    private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Object,
Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new CreateUpdateListener();
         }
     }
@@ -1037,9 +1088,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateListenerFactory implements Factory<CacheEntryListener<Integer,
Integer>> {
+    private static class CreateListenerFactory implements Factory<CacheEntryListener<Object,
Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new CreateListener();
         }
     }
@@ -1047,9 +1098,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class RemoveListenerFactory implements Factory<CacheEntryListener<Integer,
Integer>> {
+    private static class RemoveListenerFactory implements Factory<CacheEntryListener<Object,
Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new RemoveListener();
         }
     }
@@ -1057,9 +1108,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class UpdateListenerFactory implements Factory<CacheEntryListener<Integer,
Integer>> {
+    private static class UpdateListenerFactory implements Factory<CacheEntryListener<Object,
Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new UpdateListener();
         }
     }
@@ -1067,9 +1118,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class ExpireListenerFactory implements Factory<CacheEntryListener<Integer,
Integer>> {
+    private static class ExpireListenerFactory implements Factory<CacheEntryListener<Object,
Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new ExpireListener();
         }
     }
@@ -1077,9 +1128,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Integer,
Integer>> {
+    private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object,
Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryEventSerializableFilter<Integer, Integer> create()
{
+        @Override public CacheEntryEventSerializableFilter<Object, Object> create()
{
             return new TestFilter();
         }
     }
@@ -1087,10 +1138,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
     /**
      *
      */
-    private static class CreateListener implements CacheEntryCreatedListener<Integer,
Integer> {
+    private static class CreateListener implements CacheEntryCreatedListener<Object, Object>
{
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1098,10 +1149,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
     /**
      *
      */
-    private static class UpdateListener implements CacheEntryUpdatedListener<Integer,
Integer> {
+    private static class UpdateListener implements CacheEntryUpdatedListener<Object, Object>
{
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1109,10 +1160,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
     /**
      *
      */
-    private static class RemoveListener implements CacheEntryRemovedListener<Integer,
Integer> {
+    private static class RemoveListener implements CacheEntryRemovedListener<Object, Object>
{
         /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1120,10 +1171,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
     /**
      *
      */
-    private static class ExpireListener implements CacheEntryExpiredListener<Integer,
Integer> {
+    private static class ExpireListener implements CacheEntryExpiredListener<Object, Object>
{
         /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1131,32 +1182,39 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
     /**
      *
      */
-    private static class TestFilter implements CacheEntryEventSerializableFilter<Integer,
Integer> {
+    private static class TestFilter implements CacheEntryEventSerializableFilter<Object,
Object> {
         /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends
Integer> evt) {
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
             assert evt != null;
             assert evt.getSource() != null : evt;
             assert evt.getEventType() != null : evt;
             assert evt.getKey() != null : evt;
 
-            return evt.getKey() % 2 == 0;
+            Integer key;
+
+            if (evt.getKey() instanceof ListenerTestKey)
+                key = ((ListenerTestKey)evt.getKey()).key;
+            else
+                key = (Integer)evt.getKey();
+
+            return key % 2 == 0;
         }
     }
 
     /**
      *
      */
-    private static class CreateUpdateListener implements CacheEntryCreatedListener<Integer,
Integer>,
-        CacheEntryUpdatedListener<Integer, Integer> {
+    private static class CreateUpdateListener implements CacheEntryCreatedListener<Object,
Object>,
+        CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1164,11 +1222,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
     /**
      *
      */
-    private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Integer,
Integer>,
-        CacheEntryUpdatedListener<Integer, Integer> {
+    private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Object,
Object>,
+        CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
{
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts) {
                 assertNotNull(evt);
                 assertNotNull(evt.getSource());
                 assertNotNull(evt.getEventType());
@@ -1177,8 +1235,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         }
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
{
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts) {
                 assertNotNull(evt);
                 assertNotNull(evt.getSource());
                 assertNotNull(evt.getEventType());
@@ -1191,16 +1249,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
      *
      */
     private static class CreateUpdateRemoveExpireListener extends CreateUpdateListener
-        implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer,
Integer> {
+        implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object,
Object> {
         /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
 
         /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1208,9 +1266,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Integer,
Integer> {
+    private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Object,
Object> {
         /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends
Integer> evt) {
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
             throw new RuntimeException("Test filter error.");
         }
     }
@@ -1219,24 +1277,24 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
      *
      */
     private static class ExceptionListener extends CreateUpdateListener
-        implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer,
Integer> {
+        implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object,
Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts)
{
             error();
         }
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
{
             error();
         }
 
         /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts)
{
             error();
         }
 
         /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts)
{
             error();
         }
 
@@ -1251,10 +1309,12 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
     /**
      *
      */
-    protected static class EntryToStringProcessor implements EntryProcessor<Integer, Integer,
String> {
+    protected static class EntryToStringProcessor implements EntryProcessor<Object, Object,
String> {
         /** {@inheritDoc} */
-        @Override public String process(MutableEntry<Integer, Integer> e, Object...
arguments)
-            throws EntryProcessorException {
+        @Override public String process(MutableEntry<Object, Object> e, Object... args)
{
+            if (e.getValue() instanceof ListenerTestValue)
+                return String.valueOf(((ListenerTestValue)e.getValue()).val1);
+
             return String.valueOf(e.getValue());
         }
 
@@ -1267,19 +1327,19 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
     /**
      *
      */
-    protected static class EntrySetValueProcessor implements EntryProcessor<Integer, Integer,
String> {
+    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object,
String> {
         /** */
-        private Integer val;
+        private Object val;
 
         /**
          * @param val Value to set.
          */
-        public EntrySetValueProcessor(Integer val) {
+        public EntrySetValueProcessor(Object val) {
             this.val = val;
         }
 
         /** {@inheritDoc} */
-        @Override public String process(MutableEntry<Integer, Integer> e, Object...
arguments)
+        @Override public String process(MutableEntry<Object, Object> e, Object... args)
             throws EntryProcessorException {
             e.setValue(val);
 
@@ -1353,6 +1413,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
         @Override public int hashCode() {
             return key.hashCode();
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ListenerTestKey.class, this);
+        }
     }
 
     /**
@@ -1372,5 +1437,32 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends
IgniteCacheAb
             this.val1 = val;
             this.val2 = String.valueOf(val);
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ListenerTestValue that = (ListenerTestValue) o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ListenerTestValue.class, this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e22baba4/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 949290e..4fcc1ed 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -941,7 +941,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest
{
      * @param cacheName Cache name.
      * @return Near cache for key.
      */
-    protected IgniteCache<Integer, Integer> primaryCache(Integer key, String cacheName)
{
+    protected <K, V> IgniteCache<K, V> primaryCache(Object key, String cacheName)
{
         return primaryNode(key, cacheName).cache(cacheName);
     }
 


Mime
View raw message