ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [2/4] ignite git commit: IGNITE-3004 WIP
Date Thu, 21 Apr 2016 14:42:18 GMT
IGNITE-3004 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0986fa98
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0986fa98
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0986fa98

Branch: refs/heads/ignite-3004
Commit: 0986fa988f4ce0155351a9fa577aca59893d3f9f
Parents: bfeb3b6
Author: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Authored: Sun Apr 17 18:19:14 2016 +0300
Committer: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Committed: Sun Apr 17 18:19:14 2016 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryVariationsTest.java     | 236 ++++++++++++++++---
 .../IgniteBinaryCacheQueryTestSuite4.java       |   4 +-
 2 files changed, 205 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0986fa98/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java
index 8aae50f..c85246b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -28,14 +32,22 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListener;
 import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryRemovedListener;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -55,6 +67,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static javax.cache.event.EventType.CREATED;
 import static javax.cache.event.EventType.REMOVED;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryVariationsTest.SerializableFilter.isAccepted;
 import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
@@ -75,51 +88,132 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
     /**
      * @throws Exception If failed.
      */
-    private void testRandomSingleOperation() throws Exception {
-        long seed = System.currentTimeMillis();
+    public void testRandomSingleOperationJCacheApi() throws Exception {
+        testRandomSingleOperation(true, false, false);
+    }
 
-        Random rnd = new Random(seed);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomSingleOperationJCacheApiWithFilter() throws Exception {
+        testRandomSingleOperation(true, false, true);
+    }
 
-        log.info("Random seed: " + seed);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomSingleOperationJCacheApiSync() throws Exception {
+        testRandomSingleOperation(true, true, false);
+    }
 
-        // Register listener on all nodes.
-        List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomSingleOperationJCacheApiSyncWithFilter() throws Exception {
+        testRandomSingleOperation(true, true, true);
+    }
 
-        Collection<QueryCursor<?>> curs = new ArrayList<>();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomSingleOperation() throws Exception {
+        testRandomSingleOperation(true, true, false);
+    }
 
-        for (int idx = 0; idx < G.allGrids().size(); idx++) {
-            final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomSingleOperationWithFilter() throws Exception {
+        testRandomSingleOperation(true, true, true);
+    }
 
-            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+    /**
+     * @param jcacheApi Use JCache API.
+     * @param syncNtf Use sync notification.
+     * @param withFilter Use filter.
+     * @throws Exception If failed.
+     */
+    private void testRandomSingleOperation(final boolean jcacheApi, final boolean syncNtf,
final boolean withFilter)
+        throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                long seed = System.currentTimeMillis();
 
-            qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
-                @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>>
evts)
-                    throws CacheEntryListenerException {
-                        for (CacheEntryEvent<?, ?> evt : evts)
-                            evtsQueue.add(evt);
-                }
-            });
+                Random rnd = new Random(seed);
 
-            curs.add(jcache(idx).query(qry));
+                log.info("Random seed: " + seed);
 
-            evtsQueues.add(evtsQueue);
-        }
+                // Register listener on all nodes.
+                List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues =
new ArrayList<>();
 
-        ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+                Collection<QueryCursor<?>> curs = new ArrayList<>();
 
-        try {
-            for (int i = 0; i < ITERATION_CNT; i++) {
-                if (i % 20 == 0)
-                    log.info("Iteration: " + i);
+                Collection<MutableCacheEntryListenerConfiguration> lsnrCfgs = new ArrayList<>();
+
+                for (int idx = 0; idx < G.allGrids().size(); idx++) {
+                    final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new
ArrayBlockingQueue<>(50_000);
+
+                    final CacheEntryUpdatedListener<Object, Object> lsnr =
+                        new LocalNonSerialiseListener() {
+                            @Override protected void onEvents(Iterable<CacheEntryEvent<?,
?>> evts) {
+                                for (CacheEntryEvent<?, ?> evt : evts)
+                                    evtsQueue.add(evt);
+                            }
+                        };
+
+                    if (jcacheApi) {
+                        MutableCacheEntryListenerConfiguration<Object, Object> lsnrCfg
=
+                            new MutableCacheEntryListenerConfiguration<>(
+                                new Factory<CacheEntryListener<? super Object, ? super
Object>>() {
+                                    @Override public CacheEntryListener<? super Object,
? super Object> create() {
+                                        return lsnr;
+                                    }
+                                },
+                                withFilter ? FactoryBuilder.factoryOf(SerializableFilter.class)
: null,
+                                true,
+                                syncNtf
+                            );
+
+                        jcache(idx).registerCacheEntryListener(lsnrCfg);
+
+                        lsnrCfgs.add(lsnrCfg);
+                    }
+                    else {
+                        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                        qry.setLocalListener(lsnr);
+
+                        qry.setRemoteFilterFactory(withFilter ?
+                            FactoryBuilder.factoryOf(SerializableFilter.class) : null);
 
-                for (int idx = 0; idx < G.allGrids().size(); idx++)
-                    randomUpdate(rnd, evtsQueues, expData, jcache(idx));
+                        curs.add(jcache(idx).query(qry));
+
+                        evtsQueues.add(evtsQueue);
+                    }
+                }
+
+                ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+                try {
+                    for (int i = 0; i < ITERATION_CNT; i++) {
+                        if (i % 20 == 0)
+                            log.info("Iteration: " + i);
+
+                        for (int idx = 0; idx < G.allGrids().size(); idx++)
+                            randomUpdate(rnd, evtsQueues, expData, jcache(idx));
+                    }
+                }
+                finally {
+                    for (QueryCursor<?> cur : curs)
+                        cur.close();
+
+                    for (int i = 0; i < G.allGrids().size(); i++) {
+                        for (MutableCacheEntryListenerConfiguration cfg : lsnrCfgs)
+                            jcache(i).deregisterCacheEntryListener(cfg);
+                    }
+                }
             }
-        }
-        finally {
-            for (QueryCursor<?> cur : curs)
-                cur.close();
-        }
+        });
     }
 
     /**
@@ -383,7 +477,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
         Object val,
         Object oldVal)
         throws Exception {
-        if (val == null && oldVal == null) {
+        if (val == null && oldVal == null || (val != null && !isAccepted(val)))
{
             checkNoEvent(evtsQueues);
 
             return;
@@ -677,4 +771,80 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati
             return S.toString(EntrySetValueProcessor.class, this);
         }
     }
+
+    /**
+     *
+     */
+    protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Object,
Object> {
+        /** */
+        public SerializableFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> event)
+            throws CacheEntryListenerException {
+            return isAccepted(event.getValue());
+        }
+
+        /**
+         * @return {@code True} if value is even.
+         */
+        public static boolean isAccepted(Object val) {
+            if (val != null)
+                assert val instanceof TestObject;
+
+            return val == null || ((TestObject)val).value() % 2 == 0;
+        }
+    }
+
+    /**
+     *
+     */
+    public abstract class LocalNonSerialiseListener implements
+        CacheEntryUpdatedListener<Object, Object>,
+        CacheEntryCreatedListener<Object, Object>,
+        CacheEntryExpiredListener<Object, Object>,
+        CacheEntryRemovedListener<Object, Object>,
+        Externalizable {
+        /** */
+        public LocalNonSerialiseListener() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts)
throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts)
throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts)
throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /**
+         * @param evts Events.
+         */
+        protected abstract void onEvents(Iterable<CacheEntryEvent<?, ?>> evts);
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+            throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0986fa98/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
index 32a693f..75ac68a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
@@ -30,9 +30,9 @@ public class IgniteBinaryCacheQueryTestSuite4 extends TestSuite {
      * @throws Exception In case of error.
      */
     public static TestSuite suite() throws Exception {
-        GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
+        //GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
 
-        TestSuite suite = IgniteCacheQuerySelfTestSuite4.suite();
+        TestSuite suite = IgniteCacheQuerySelfTestSuite5.suite();//IgniteCacheQuerySelfTestSuite4.suite();
 
         return suite;
     }


Mime
View raw message