ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [14/18] ignite git commit: IGNITE-2822 Continuous query local listener can be notified with empty list of events
Date Wed, 06 Apr 2016 08:00:53 GMT
IGNITE-2822
Continuous query local listener can be notified with empty list of events


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e8435e7c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e8435e7c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e8435e7c

Branch: refs/heads/ignite-2004
Commit: e8435e7c9368fa2b5be3b0f4449417272315d16e
Parents: f1af2c7
Author: vdpyatkov <vpyatkov@gridgain.com>
Authored: Tue Apr 5 18:03:55 2016 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Tue Apr 5 21:10:02 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java |   3 +-
 ...acheContinuousQueryExecuteInPrimaryTest.java | 306 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 3 files changed, 310 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e8435e7c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 6243af7..767697a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -382,7 +382,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                                         }
                                     );
 
-                                    locLsnr.onUpdated(evts);
+                                    if (!F.isEmpty(evts))
+                                        locLsnr.onUpdated(evts);
 
                                     if (!internal && !skipPrimaryCheck)
                                         sendBackupAcknowledge(ackBuf.onAcknowledged(entry),
routineId, ctx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8435e7c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java
new file mode 100644
index 0000000..1a52909
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import javax.cache.Cache;
+import javax.cache.configuration.FactoryBuilder;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.NotNull;
+
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Continuous queries execute in primary node tests.
+ */
+public class CacheContinuousQueryExecuteInPrimaryTest extends GridCommonAbstractTest
+    implements Serializable {
+
+    /** Latch timeout. */
+    protected static final long LATCH_TIMEOUT = 5000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(true);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    @NotNull
+    protected CacheConfiguration<Integer, String> cacheConfiguration(
+        CacheAtomicityMode cacheAtomicityMode,
+        CacheMode cacheMode) {
+        CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(cacheAtomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalCache() throws Exception {
+        CacheConfiguration<Integer, String> ccfg = cacheConfiguration(ATOMIC, LOCAL);
+
+        doTestWithoutEventsEntries(ccfg);
+        doTestWithEventsEntries(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedCache() throws Exception {
+        CacheConfiguration<Integer, String> ccfg = cacheConfiguration(ATOMIC, REPLICATED);
+
+        doTestWithoutEventsEntries(ccfg);
+        doTestWithEventsEntries(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionedCache() throws Exception {
+        CacheConfiguration<Integer, String> ccfg = cacheConfiguration(ATOMIC, PARTITIONED);
+
+        doTestWithoutEventsEntries(ccfg);
+        doTestWithEventsEntries(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransactionLocalCache() throws Exception {
+        CacheConfiguration<Integer, String> ccfg = cacheConfiguration(TRANSACTIONAL,
LOCAL);
+
+        doTestWithoutEventsEntries(ccfg);
+        doTestWithEventsEntries(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransactionReplicatedCache() throws Exception {
+        CacheConfiguration<Integer, String> ccfg = cacheConfiguration(TRANSACTIONAL,
REPLICATED);
+
+        doTestWithoutEventsEntries(ccfg);
+        doTestWithEventsEntries(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransactionPartitionedCache() throws Exception {
+        CacheConfiguration<Integer, String> ccfg = cacheConfiguration(TRANSACTIONAL,
PARTITIONED);
+
+        doTestWithoutEventsEntries(ccfg);
+        doTestWithEventsEntries(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestWithoutEventsEntries(CacheConfiguration<Integer, String> ccfg)
throws Exception {
+
+        try (IgniteCache<Integer, String> cache = grid(0).createCache(ccfg)) {
+
+            int ITERATION_CNT = 100;
+            final AtomicBoolean noOneListen = new AtomicBoolean(true);
+
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>()
{
+                    @Override public void onUpdated(
+                        Iterable<CacheEntryEvent<? extends Integer, ? extends String>>
iterable)
+                        throws CacheEntryListenerException {
+                        noOneListen.set(false);
+                    }
+                });
+
+                qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(
+                    new CacheEntryEventSerializableFilter<Integer, String>() {
+                        @Override public boolean evaluate(
+                            CacheEntryEvent<? extends Integer, ? extends String> cacheEntryEvent)
+                            throws CacheEntryListenerException {
+                            return false;
+                        }
+                    }));
+
+                executeQuery(cache, qry, ccfg.getAtomicityMode() == TRANSACTIONAL);
+            }
+
+            assertTrue(noOneListen.get());
+
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    private void executeQuery(IgniteCache<Integer, String> cache, ContinuousQuery<Integer,
String> qry,
+        boolean isTransactional) {
+        try (QueryCursor<Cache.Entry<Integer, String>> qryCursor = cache.query(qry))
{
+            Transaction tx = null;
+
+            if (isTransactional)
+                tx = cache.unwrap(Ignite.class).transactions().txStart();
+
+            try {
+                for (int key = 0; key < 8; key++)
+                    cache.put(key, Integer.toString(key));
+
+                Map<Integer, String> map = new HashMap<>(8);
+
+                for (int key = 8; key < 16; key++)
+                    map.put(key, Integer.toString(key));
+
+                cache.putAll(map);
+
+                if (isTransactional)
+                    tx.commit();
+
+            }
+            finally {
+                if (isTransactional)
+                    tx.close();
+            }
+
+            for (int key = 0; key < 8; key++) {
+                cache.invoke(key, new EntryProcessor<Integer, String, Object>() {
+                    @Override public Object process(MutableEntry<Integer, String> entry,
+                        Object... objects) throws EntryProcessorException {
+                        entry.setValue(Integer.toString(entry.getKey() + 1));
+                        return null;
+                    }
+
+                });
+            }
+
+            Map<Integer, EntryProcessor<Integer, String, Object>> invokeMap =
new HashMap<>(8);
+
+            for (int key = 8; key < 16; key++) {
+                invokeMap.put(key, new EntryProcessor<Integer, String, Object>() {
+                    @Override public Object process(MutableEntry<Integer, String> entry,
+                        Object... objects) throws EntryProcessorException {
+                        entry.setValue(Integer.toString(entry.getKey() - 1));
+
+                        return null;
+                    }
+                });
+            }
+
+            cache.invokeAll(invokeMap);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void doTestWithEventsEntries(CacheConfiguration<Integer, String> ccfg) throws
Exception {
+        try (IgniteCache<Integer, String> cache = grid(0).createCache(ccfg)) {
+
+            ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
+
+            final CountDownLatch latch = new CountDownLatch(16);
+            final AtomicInteger cnt = new AtomicInteger(0);
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends
Integer, ? extends String>> iterable)
+                    throws CacheEntryListenerException {
+                    for (CacheEntryEvent<? extends Integer, ? extends String> e : iterable)
{
+                        cnt.incrementAndGet();
+                        latch.countDown();
+                    }
+                }
+            });
+
+            qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(
+                new CacheEntryEventSerializableFilter<Integer, String>() {
+                    @Override public boolean evaluate(CacheEntryEvent<? extends Integer,
? extends String> e)
+                        throws CacheEntryListenerException {
+                        return e.getKey() % 2 == 0;
+                    }
+                }
+            ));
+
+            // Execute query.
+            executeQuery(cache, qry, ccfg.getAtomicityMode() == TRANSACTIONAL);
+
+            assertTrue(latch.await(LATCH_TIMEOUT, MILLISECONDS));
+            assertEquals(16, cnt.get());
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8435e7c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 73c856b..fbb3091 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryExecuteInPrimaryTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
@@ -91,6 +92,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
         suite.addTestSuite(CacheContinuousBatchAckTest.class);
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
+        suite.addTestSuite(CacheContinuousQueryExecuteInPrimaryTest.class);
 
         return suite;
     }


Mime
View raw message