ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-3004 Added async callback tests.
Date Thu, 21 Apr 2016 14:44:30 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3004 13db6642c -> fab4fdb3e


IGNITE-3004 Added async callback tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fab4fdb3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fab4fdb3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fab4fdb3

Branch: refs/heads/ignite-3004
Commit: fab4fdb3e539e7644833b2ff517b14fc206339e6
Parents: 13db664
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Thu Apr 21 17:44:17 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Thu Apr 21 17:44:17 2016 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryVariationsTest.java     | 406 +++++++++++--------
 .../IgniteConfigVariationsAbstractTest.java     |   2 +-
 .../IgniteContinuousQueryVarSuite.java          |  47 +++
 .../IgniteBinaryCacheQueryTestSuite4.java       |   4 +-
 .../IgniteCacheQuerySelfTestSuite5.java         |  41 --
 5 files changed, 283 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fab4fdb3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java
index f90a300..2cf3484 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java
@@ -23,9 +23,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -48,20 +46,23 @@ import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
-import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -78,7 +79,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
  */
 public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariationsAbstractTest {
     /** */
-    private static final int ITERATION_CNT = 50;
+    private static final int ITERATION_CNT = 20;
 
     /** */
     private static final int KEYS = 50;
@@ -89,52 +90,83 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
     /**
      * @throws Exception If failed.
      */
-    public void testRandomSingleOperationJCacheApi() throws Exception {
-        testRandomSingleOperation(true, false, false);
+    public void testRandomOperationJCacheApiKeepBinary() throws Exception {
+        testRandomOperation(true, false, false, false, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testRandomSingleOperationJCacheApiWithFilter() throws Exception {
-        testRandomSingleOperation(true, false, true);
+    public void testRandomOperationJCacheApiAsyncCallback() throws Exception {
+        testRandomOperation(true, false, false, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testRandomSingleOperationJCacheApiSync() throws Exception {
-        testRandomSingleOperation(true, true, false);
+    public void testRandomOperationJCacheApiWithFilter() throws Exception {
+        testRandomOperation(true, false, true, false, false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testRandomSingleOperationJCacheApiSyncWithFilter() throws Exception {
-        testRandomSingleOperation(true, true, true);
+    public void testRandomOperationJCacheApiWithFilterAsyncCallback() throws Exception {
+        testRandomOperation(true, false, true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testRandomSingleOperation() throws Exception {
-        testRandomSingleOperation(true, true, false);
+    public void testRandomOperationJCacheApiSyncWithFilter() throws Exception {
+        testRandomOperation(true, true, true, false, false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testRandomSingleOperationWithFilter() throws Exception {
-        testRandomSingleOperation(true, true, true);
+    public void testRandomOperation() throws Exception {
+        testRandomOperation(true, true, false, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomOperationWithAsyncCallback() throws Exception {
+        testRandomOperation(true, true, false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomOperationWithFilter() throws Exception {
+        testRandomOperation(true, true, true, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomOperationWithFilterWithKeepBinary() throws Exception {
+        testRandomOperation(true, true, true, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomOperationWithFilterAsyncCallback() throws Exception {
+        testRandomOperation(true, true, true, true, false);
     }
 
     /**
      * @param jcacheApi Use JCache API.
      * @param syncNtf Use sync notification.
      * @param withFilter Use filter.
+     * @param asyncCallback Filter is annotated IgniteAsyncCallback
+     * @param keepBinary Keep binary.
      * @throws Exception If failed.
      */
-    private void testRandomSingleOperation(final boolean jcacheApi, final boolean syncNtf, final boolean withFilter)
+    private void testRandomOperation(final boolean jcacheApi, final boolean syncNtf, final boolean withFilter,
+        final boolean asyncCallback, final boolean keepBinary)
         throws Exception {
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
@@ -154,13 +186,17 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                 for (int idx = 0; idx < G.allGrids().size(); idx++) {
                     final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
 
-                    final CacheEntryUpdatedListener<Object, Object> lsnr =
-                        new LocalNonSerialiseListener() {
-                            @Override protected void onEvents(Iterable<CacheEntryEvent<?, ?>> evts) {
-                                for (CacheEntryEvent<?, ?> evt : evts)
-                                    evtsQueue.add(evt);
-                            }
-                        };
+                    CI1<Iterable<CacheEntryEvent<?, ?>>> clsr = new CI1<Iterable<CacheEntryEvent<?, ?>>>() {
+                        @Override public void apply(Iterable<CacheEntryEvent<?, ?>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    };
+
+                    final CacheEntryUpdatedListener<Object, Object> lsnr = asyncCallback ?
+                        new AsyncLocalNonSerializableListener(clsr): new LocalNonSerializableListener(clsr);
+
+                    IgniteCache<Object, Object> jcache = keepBinary ? jcache(idx).withKeepBinary() : jcache(idx);
 
                     if (jcacheApi) {
                         MutableCacheEntryListenerConfiguration<Object, Object> lsnrCfg =
@@ -170,14 +206,20 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                                         return lsnr;
                                     }
                                 },
-                                withFilter ? FactoryBuilder.factoryOf(SerializableFilter.class) : null,
+                                withFilter ?
+                                    FactoryBuilder.factoryOf(
+                                        asyncCallback ? new AsyncSerializableFilter(keepBinary)
+                                            : new SerializableFilter(keepBinary))
+                                    : null,
                                 true,
                                 syncNtf
                             );
 
-                        jcache(idx).registerCacheEntryListener(lsnrCfg);
+                        jcache.registerCacheEntryListener(lsnrCfg);
 
                         lsnrCfgs.add(lsnrCfg);
+
+                        evtsQueues.add(evtsQueue);
                     }
                     else {
                         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
@@ -185,9 +227,12 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                         qry.setLocalListener(lsnr);
 
                         qry.setRemoteFilterFactory(withFilter ?
-                            FactoryBuilder.factoryOf(SerializableFilter.class) : null);
+                            FactoryBuilder.factoryOf(
+                                asyncCallback ? new AsyncSerializableFilter(keepBinary)
+                                    : new SerializableFilter(keepBinary))
+                            : null);
 
-                        curs.add(jcache(idx).query(qry));
+                        curs.add(jcache.query(qry));
 
                         evtsQueues.add(evtsQueue);
                     }
@@ -197,13 +242,22 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
 
                 try {
                     for (int i = 0; i < ITERATION_CNT; i++) {
-                        if (i % 20 == 0)
+                        if (i % 5 == 0)
                             log.info("Iteration: " + i);
 
-                        for (int idx = 0; idx < G.allGrids().size(); idx++)
-                            randomUpdate(rnd, evtsQueues, expData, jcache(idx));
+                        randomUpdate(rnd,
+                            evtsQueues,
+                            expData,
+                            keepBinary ? jcache().withKeepBinary() : jcache(),
+                            keepBinary,
+                            withFilter);
                     }
                 }
+                catch (Exception e) {
+                    log.error("Got unexpected error: ", e);
+
+                    throw e;
+                }
                 finally {
                     for (QueryCursor<?> cur : curs)
                         cur.close();
@@ -228,8 +282,10 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
         Random rnd,
         List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
         ConcurrentMap<Object, Object> expData,
-        IgniteCache<Object, Object> cache)
-        throws Exception {
+        IgniteCache<Object, Object> cache,
+        boolean keepBinary,
+        boolean withFilter
+    ) throws Exception {
         Object key = key(rnd.nextInt(KEYS));
         Object newVal = value(rnd.nextInt());
         Object oldVal = expData.get(key);
@@ -253,7 +309,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                     if (tx != null)
                         tx.commit();
 
-                    waitAndCheckEvent(evtsQueues, affinity(cache), key, newVal, oldVal);
+                    waitAndCheckEvent(evtsQueues, key, newVal, oldVal, keepBinary, withFilter);
 
                     expData.put(key, newVal);
 
@@ -266,7 +322,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                     if (tx != null)
                         tx.commit();
 
-                    waitAndCheckEvent(evtsQueues, affinity(cache), key, newVal, oldVal);
+                    waitAndCheckEvent(evtsQueues, key, newVal, oldVal, keepBinary, withFilter);
 
                     expData.put(key, newVal);
 
@@ -279,7 +335,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                     if (tx != null)
                         tx.commit();
 
-                    waitAndCheckEvent(evtsQueues, affinity(cache), key, null, oldVal);
+                    waitAndCheckEvent(evtsQueues, key, null, oldVal, keepBinary, withFilter);
 
                     expData.remove(key);
 
@@ -292,7 +348,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                     if (tx != null)
                         tx.commit();
 
-                    waitAndCheckEvent(evtsQueues, affinity(cache), key, null, oldVal);
+                    waitAndCheckEvent(evtsQueues, key, null, oldVal, keepBinary, withFilter);
 
                     expData.remove(key);
 
@@ -305,7 +361,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                     if (tx != null)
                         tx.commit();
 
-                    waitAndCheckEvent(evtsQueues, affinity(cache), key, newVal, oldVal);
+                    waitAndCheckEvent(evtsQueues, key, newVal, oldVal, keepBinary, withFilter);
 
                     expData.put(key, newVal);
 
@@ -318,7 +374,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                     if (tx != null)
                         tx.commit();
 
-                    waitAndCheckEvent(evtsQueues, affinity(cache), key, null, oldVal);
+                    waitAndCheckEvent(evtsQueues, key, null, oldVal, keepBinary, withFilter);
 
                     expData.remove(key);
 
@@ -332,7 +388,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                         tx.commit();
 
                     if (oldVal == null) {
-                        waitAndCheckEvent(evtsQueues, affinity(cache), key, newVal, null);
+                        waitAndCheckEvent(evtsQueues, key, newVal, null, keepBinary, withFilter);
 
                         expData.put(key, newVal);
                     }
@@ -349,7 +405,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                         tx.commit();
 
                     if (oldVal == null) {
-                        waitAndCheckEvent(evtsQueues, affinity(cache), key, newVal, null);
+                        waitAndCheckEvent(evtsQueues, key, newVal, null, keepBinary, withFilter);
 
                         expData.put(key, newVal);
                     }
@@ -366,7 +422,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                         tx.commit();
 
                     if (oldVal != null) {
-                        waitAndCheckEvent(evtsQueues, affinity(cache), key, newVal, oldVal);
+                        waitAndCheckEvent(evtsQueues, key, newVal, oldVal, keepBinary, withFilter);
 
                         expData.put(key, newVal);
                     }
@@ -383,7 +439,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                         tx.commit();
 
                     if (oldVal != null) {
-                        waitAndCheckEvent(evtsQueues, affinity(cache), key, newVal, oldVal);
+                        waitAndCheckEvent(evtsQueues, key, newVal, oldVal, keepBinary, withFilter);
 
                         expData.put(key, newVal);
                     }
@@ -405,7 +461,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                             if (tx != null)
                                 tx.commit();
 
-                            waitAndCheckEvent(evtsQueues, affinity(cache), key, newVal, oldVal);
+                            waitAndCheckEvent(evtsQueues, key, newVal, oldVal, keepBinary, withFilter);
 
                             expData.put(key, newVal);
                         }
@@ -470,19 +526,20 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
 
     /**
      * @param evtsQueues Event queue.
-     * @param aff Affinity function.
      * @param key Key.
      * @param val Value.
      * @param oldVal Old value.
+     * @param keepBinary Keep binary.
+     * @param withFilter With filter.
      * @throws Exception If failed.
      */
     private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
-        Affinity<Object> aff,
         Object key,
         Object val,
-        Object oldVal)
+        Object oldVal,
+        boolean keepBinary, boolean withFilter)
         throws Exception {
-        if (val == null && oldVal == null || (val != null && !isAccepted(val))) {
+        if (val == null && oldVal == null || (withFilter && val != null && !isAccepted(val, false))) {
             checkNoEvent(evtsQueues);
 
             return;
@@ -492,12 +549,37 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
             CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
 
             assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
-            assertEquals(key, evt.getKey());
-            assertEquals(val, evt.getValue());
-            assertEquals(oldVal, evt.getOldValue());
+
+            Object actKey = evt.getKey();
+            Object actVal = evt.getValue();
+            Object actOldVal = evt.getOldValue();
+
+            if (keepBinary) {
+                actKey = checkAndGetObject(actKey);
+                actVal = checkAndGetObject(actVal);
+                actOldVal = checkAndGetObject(actOldVal);
+            }
+
+            assertEquals(key, actKey);
+            assertEquals(val, actVal);
+            assertEquals(oldVal, actOldVal);
         }
     }
 
+    /**
+     * @param obj Binary object.
+     * @return Deserialize value.
+     */
+    private Object checkAndGetObject(@Nullable Object obj) {
+        if (obj != null) {
+            assert obj instanceof BinaryObject || obj instanceof ExternalizableObject : obj;
+
+            if (obj instanceof BinaryObject)
+                obj = ((BinaryObject)obj).deserialize();
+        }
+
+        return obj;
+    }
 
     /**
      * @param evtsQueues Event queue.
@@ -591,15 +673,15 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
                             }
                         }, 5_000);
 
-                        checkSingleEvent(evts.get(0), CREATED, value(1), null);
-                        checkSingleEvent(evts.get(1), REMOVED, null, value(1));
-                        checkSingleEvent(evts.get(2), CREATED, value(2), null);
-                        checkSingleEvent(evts.get(3), REMOVED, null, value(2));
-                        checkSingleEvent(evts.get(4), CREATED, value(3), null);
-                        checkSingleEvent(evts.get(5), EventType.UPDATED, value(4), value(3));
-                        checkSingleEvent(evts.get(6), REMOVED, null, value(4));
-                        checkSingleEvent(evts.get(7), CREATED, value(5), null);
-                        checkSingleEvent(evts.get(8), EventType.UPDATED, value(6), value(5));
+                        checkEvent(evts.get(0), CREATED, value(1), null);
+                        checkEvent(evts.get(1), REMOVED, null, value(1));
+                        checkEvent(evts.get(2), CREATED, value(2), null);
+                        checkEvent(evts.get(3), REMOVED, null, value(2));
+                        checkEvent(evts.get(4), CREATED, value(3), null);
+                        checkEvent(evts.get(5), EventType.UPDATED, value(4), value(3));
+                        checkEvent(evts.get(6), REMOVED, null, value(4));
+                        checkEvent(evts.get(7), CREATED, value(5), null);
+                        checkEvent(evts.get(8), EventType.UPDATED, value(6), value(5));
 
                         cache.remove(key);
                         cache.remove(key);
@@ -619,115 +701,13 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
      * @param val Value.
      * @param oldVal Old value.
      */
-    private void checkSingleEvent(CacheEntryEvent<?, ?> event, EventType type, Object val, Object oldVal) {
+    private void checkEvent(CacheEntryEvent<?, ?> event, EventType type, Object val, Object oldVal) {
         assertEquals(event.getEventType(), type);
         assertEquals(event.getValue(), val);
         assertEquals(event.getOldValue(), oldVal);
     }
 
     /**
-     * @throws Exception If failed.
-     */
-    public void testRemoveRemoveScenarioBatchOperation() throws Exception {
-        runInAllDataModes(new TestRunnable() {
-            @Override public void run() throws Exception {
-                IgniteCache<Object, Object> cache = jcache();
-
-                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
-                final List<CacheEntryEvent<?, ?>> evts = new CopyOnWriteArrayList<>();
-
-                qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
-                    @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events)
-                        throws CacheEntryListenerException {
-                        for (CacheEntryEvent<?, ?> e : events)
-                            evts.add(e);
-                    }
-                });
-
-                Map<Object, Object> map = new LinkedHashMap<>();
-
-                for (int i = 0; i < KEYS; i++)
-                    map.put(key(i), value(i));
-
-                try (QueryCursor qryCur = cache.query(qry)) {
-                    for (int i = 0; i < ITERATION_CNT / 2; i++) {
-                        log.info("Start iteration: " + i);
-                        // Not events.
-                        cache.removeAll(map.keySet());
-                        cache.invokeAll(map.keySet(), new EntrySetValueProcessor(null, false));
-                        cache.invokeAll(map.keySet(), new EntrySetValueProcessor(true));
-
-                        // Get events.
-                        cache.putAll(map);
-
-                        assert GridTestUtils.waitForCondition(new PA() {
-                            @Override public boolean apply() {
-                                return evts.size() == KEYS;
-                            }
-                        }, 5_000);
-
-                        checkEvents(evts, CREATED);
-
-                        evts.clear();
-
-                        // Not events.
-                        cache.invokeAll(map.keySet(), new EntrySetValueProcessor(true));
-
-                        U.sleep(100);
-
-                        assertEquals(0, evts.size());
-
-                        // Get events.
-                        cache.invokeAll(map.keySet(), new EntrySetValueProcessor(null, false));
-
-                        // Not events.
-                        cache.removeAll(map.keySet());
-                        cache.removeAll(map.keySet());
-
-                        assert GridTestUtils.waitForCondition(new PA() {
-                            @Override public boolean apply() {
-                                return evts.size() == KEYS;
-                            }
-                        }, 5_000);
-
-                        checkEvents(evts, REMOVED);
-
-                        evts.clear();
-
-                        log.info("Finish iteration: " + i);
-                    }
-                }
-            }
-        });
-    }
-
-    /**
-     * @param evts Events.
-     * @param evtType Event type.
-     */
-    private void checkEvents(List<CacheEntryEvent<?, ?>> evts, EventType evtType) {
-        for (int key = 0; key < KEYS; key++) {
-            Object keyVal = value(key);
-
-            for (CacheEntryEvent<?, ?> e : evts) {
-                if (e.getKey().equals(keyVal)) {
-                    checkSingleEvent(e,
-                        evtType,
-                        evtType == CREATED ? value(key) : null,
-                        evtType == REMOVED ? value(key) : null);
-
-                    keyVal = null;
-
-                    break;
-                }
-            }
-
-            assertNull("Event for key not found.", keyVal);
-        }
-    }
-
-    /**
      *
      */
     protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
@@ -780,40 +760,118 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
     /**
      *
      */
-    protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+    @IgniteAsyncCallback
+    public static class AsyncSerializableFilter extends SerializableFilter {
+        /**
+         *
+         */
+        public AsyncSerializableFilter() {
+            // No-op.
+        }
+
+        /**
+         * @param keepBinary Keep binary.
+         */
+        public AsyncSerializableFilter(boolean keepBinary) {
+            super(keepBinary);
+        }
+    }
+
+    /**
+     *
+     */
+    public static class SerializableFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+        /** */
+        private boolean keepBinary;
+
         /** */
         public SerializableFilter() {
             // No-op.
         }
 
+        /**
+         * @param keepBinary Keep binary.
+         */
+        public SerializableFilter(boolean keepBinary) {
+            this.keepBinary = keepBinary;
+        }
+
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<?, ?> event)
             throws CacheEntryListenerException {
-            return isAccepted(event.getValue());
+            return isAccepted(event.getValue(), keepBinary);
         }
 
         /**
+         * @param val Value.
+         * @param keepBinary Keep binary.
          * @return {@code True} if value is even.
          */
-        public static boolean isAccepted(Object val) {
-            if (val != null)
-                assert val instanceof TestObject;
+        public static boolean isAccepted(Object val, boolean keepBinary) {
+            if (val != null) {
+                int val0 = 0;
+
+                if (val instanceof TestObject) {
+                    assert !keepBinary || val instanceof ExternalizableObject : val;
+
+                    val0 = valueOf(val);
+                }
+                else if (val instanceof BinaryObject) {
+                    assert keepBinary : val;
 
-            return val == null || ((TestObject)val).value() % 2 == 0;
+                    val0 = ((BinaryObject)val).field("val");
+                }
+                else
+                    fail("Unexpected object: " + val);
+
+                return val0 % 2 == 0;
+            }
+
+            return true;
         }
     }
 
     /**
      *
      */
-    public abstract class LocalNonSerialiseListener implements
+    @IgniteAsyncCallback
+    public static class AsyncLocalNonSerializableListener extends LocalNonSerializableListener {
+        /**
+         * @param clsr Closure.
+         */
+        AsyncLocalNonSerializableListener(IgniteInClosure<Iterable<CacheEntryEvent<?, ?>>> clsr) {
+            super(clsr);
+        }
+
+        /**
+         *
+         */
+        public AsyncLocalNonSerializableListener() {
+            // No-op.
+        }
+    }
+
+    /**
+     *
+     */
+    public static class LocalNonSerializableListener implements
         CacheEntryUpdatedListener<Object, Object>,
         CacheEntryCreatedListener<Object, Object>,
         CacheEntryExpiredListener<Object, Object>,
         CacheEntryRemovedListener<Object, Object>,
         Externalizable {
         /** */
-        public LocalNonSerialiseListener() {
+        IgniteInClosure<Iterable<CacheEntryEvent<?, ?>>> clsr;
+
+        /**
+         * @param clsr Closure.
+         */
+        LocalNonSerializableListener(IgniteInClosure<Iterable<CacheEntryEvent<?, ?>>> clsr) {
+            this.clsr = clsr;
+        }
+
+        /** */
+        public LocalNonSerializableListener() {
             // No-op.
         }
 
@@ -840,7 +898,9 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
         /**
          * @param evts Events.
          */
-        protected abstract void onEvents(Iterable<CacheEntryEvent<?, ?>> evts);
+        private void onEvents(Iterable<CacheEntryEvent<?, ?>> evts) {
+            clsr.apply(evts);
+        }
 
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws IOException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/fab4fdb3/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
index b22f289..9427d54 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java
@@ -350,7 +350,7 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr
     /**
      *
      */
-    private static class ExternalizableObject extends TestObject implements Externalizable {
+    protected static class ExternalizableObject extends TestObject implements Externalizable {
         /**
          * Default constructor.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/fab4fdb3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteContinuousQueryVarSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteContinuousQueryVarSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteContinuousQueryVarSuite.java
new file mode 100644
index 0000000..fd7f72b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteContinuousQueryVarSuite.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryVariationsTest;
+import org.apache.ignite.testframework.configvariations.ConfigVariationsTestSuiteBuilder;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
+
+/**
+ * Test suite for cache queries.
+ */
+public class IgniteContinuousQueryVarSuite extends TestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception If failed.
+     */
+    public static TestSuite suite() throws Exception {
+        System.setProperty(IGNITE_DISCOVERY_HISTORY_SIZE, "100");
+
+        return new ConfigVariationsTestSuiteBuilder(
+            "Ignite Cache Queries Test Suite 5",
+            CacheContinuousQueryVariationsTest.class)
+            .withBasicCacheParams()
+            .gridsCount(3)
+            .backups(1)
+            .testedNodesCount(2)
+            .withClients()
+            .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fab4fdb3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
index 75ac68a..32a693f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
@@ -30,9 +30,9 @@ public class IgniteBinaryCacheQueryTestSuite4 extends TestSuite {
      * @throws Exception In case of error.
      */
     public static TestSuite suite() throws Exception {
-        //GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
+        GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
 
-        TestSuite suite = IgniteCacheQuerySelfTestSuite5.suite();//IgniteCacheQuerySelfTestSuite4.suite();
+        TestSuite suite = IgniteCacheQuerySelfTestSuite4.suite();
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fab4fdb3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite5.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite5.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite5.java
deleted file mode 100644
index 8418273..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite5.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.testsuites;
-
-import junit.framework.TestSuite;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryVariationsTest;
-import org.apache.ignite.testframework.configvariations.ConfigVariationsTestSuiteBuilder;
-
-/**
- * Test suite for cache queries.
- */
-public class IgniteCacheQuerySelfTestSuite5 extends TestSuite {
-    /**
-     * @return Test suite.
-     * @throws Exception If failed.
-     */
-    public static TestSuite suite() throws Exception {
-        return new ConfigVariationsTestSuiteBuilder(
-            "Ignite Cache Queries Test Suite 5",
-            CacheContinuousQueryVariationsTest.class)
-            .withBasicCacheParams()
-            .gridsCount(5).backups(1)
-            .testedNodesCount(3).withClients()
-            .build();
-    }
-}


Mime
View raw message