ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [38/50] ignite git commit: IGNITE-1536 - Removed duplicated continuous query notifications in REPLICATED cache
Date Thu, 24 Sep 2015 23:12:14 GMT
IGNITE-1536 - Removed duplicated continuous query notifications in REPLICATED cache


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7db44f11
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7db44f11
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7db44f11

Branch: refs/heads/ignite-257
Commit: 7db44f11f7925b5a29a0a3e017baa93b52fb2982
Parents: 70a8a92
Author: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Authored: Wed Sep 23 18:53:06 2015 -0700
Committer: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Committed: Wed Sep 23 18:53:06 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |   4 +-
 .../continuous/CacheContinuousQueryManager.java |  58 +++------
 .../continuous/GridContinuousProcessor.java     |   3 +-
 ...ontinuousQueryReplicatedOneNodeSelfTest.java | 120 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   4 +-
 5 files changed, 144 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index cc6c19a..ae96f23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -556,7 +556,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
                 qry.getPageSize(),
                 qry.getTimeInterval(),
                 qry.isAutoUnsubscribe(),
-                loc ? ctx.grid().cluster().forLocal() : null);
+                loc);
 
             final QueryCursor<Cache.Entry<K, V>> cur =
                 qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null;
@@ -1896,4 +1896,4 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
     @Override public String toString() {
         return S.toString(IgniteCacheProxy.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c719f1e..6a151a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -43,10 +43,9 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
-import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -55,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.LoggerResource;
 import org.jsr166.ConcurrentHashMap8;
@@ -271,7 +271,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
      * @param bufSize Buffer size.
      * @param timeInterval Time interval.
      * @param autoUnsubscribe Auto unsubscribe flag.
-     * @param grp Cluster group.
+     * @param loc Local flag.
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
@@ -280,7 +280,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
-        ClusterGroup grp) throws IgniteCheckedException
+        boolean loc) throws IgniteCheckedException
     {
         return executeQuery0(
             locLsnr,
@@ -293,7 +293,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
             true,
             false,
             true,
-            grp);
+            loc);
     }
 
     /**
@@ -321,7 +321,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
             true,
             false,
             true,
-            loc ? cctx.grid().cluster().forLocal() : null);
+            loc);
     }
 
     /**
@@ -383,7 +383,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
      * @param oldValRequired Old value required flag.
      * @param sync Synchronous flag.
      * @param ignoreExpired Ignore expired event flag.
-     * @param grp Cluster group.
+     * @param loc Local flag.
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
@@ -397,44 +397,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
-        ClusterGroup grp) throws IgniteCheckedException
+        boolean loc) throws IgniteCheckedException
     {
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
-        if (grp == null)
-            grp = cctx.kernalContext().grid().cluster();
-
-        Collection<ClusterNode> nodes = grp.nodes();
-
-        if (nodes.isEmpty())
-            throw new ClusterTopologyException("Failed to execute continuous query (empty
cluster group is " +
-                "provided).");
-
-        boolean skipPrimaryCheck = false;
-
-        switch (cctx.config().getCacheMode()) {
-            case LOCAL:
-                if (!nodes.contains(cctx.localNode()))
-                    throw new ClusterTopologyException("Continuous query for LOCAL cache
can be executed " +
-                        "only locally (provided projection contains remote nodes only).");
-                else if (nodes.size() > 1)
-                    U.warn(log, "Continuous query for LOCAL cache will be executed locally
(provided projection is " +
-                        "ignored).");
-
-                grp = grp.forNode(cctx.localNode());
-
-                break;
-
-            case REPLICATED:
-                if (nodes.size() == 1 && F.first(nodes).equals(cctx.localNode()))
-                    skipPrimaryCheck = cctx.affinityNode();
-
-                break;
-        }
-
         int taskNameHash = !internal && cctx.kernalContext().security().enabled()
?
             cctx.kernalContext().job().currentTaskNameHash() : 0;
 
+        boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED
&& cctx.affinityNode();
+
         GridContinuousHandler hnd = new CacheContinuousQueryHandler(
             cctx.name(),
             TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -448,12 +419,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
             taskNameHash,
             skipPrimaryCheck);
 
+        IgnitePredicate<ClusterNode> pred = null;
+
+        if (loc || cctx.config().getCacheMode() == CacheMode.LOCAL)
+            pred = F.nodeForNodeId(cctx.localNodeId());
+
         UUID id = cctx.kernalContext().continuous().startRoutine(
             hnd,
             bufSize,
             timeInterval,
             autoUnsubscribe,
-            grp.predicate()).get();
+            pred).get();
 
         if (notifyExisting) {
             final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
@@ -635,7 +611,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
                 cfg.isOldValueRequired(),
                 cfg.isSynchronous(),
                 false,
-                null);
+                false);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 18c1f36..e29bdd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -795,7 +795,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             try {
                 IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate();
 
-                ctx.resource().injectGeneric(prjPred);
+                if (prjPred != null)
+                    ctx.resource().injectGeneric(prjPred);
 
                 if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId())))
{
                     registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(),
data.interval(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java
new file mode 100644
index 0000000..8152b2a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for replicated cache with one node.
+ */
+public class GridCacheContinuousQueryReplicatedOneNodeSelfTest extends GridCommonAbstractTest
{
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(CacheMode.REPLICATED);
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocal() throws Exception {
+        doTest(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDistributed() throws Exception {
+        doTest(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTest(boolean loc) throws Exception {
+        try {
+            IgniteCache<String, Integer> cache = startGrid(0).cache(null);
+
+            ContinuousQuery<String, Integer> qry = new ContinuousQuery<>();
+
+            final AtomicInteger cnt = new AtomicInteger();
+            final CountDownLatch latch = new CountDownLatch(10);
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<String, Integer>() {
+                @Override
+                public void onUpdated(Iterable<CacheEntryEvent<? extends String, ?
extends Integer>> evts)
+                        throws CacheEntryListenerException {
+                    for (CacheEntryEvent<? extends String, ? extends Integer> evt :
evts) {
+                        cnt.incrementAndGet();
+                        latch.countDown();
+                    }
+                }
+            });
+
+            cache.query(qry.setLocal(loc));
+
+            startGrid(1);
+
+            awaitPartitionMapExchange();
+
+            for (int i = 0; i < 10; i++)
+                cache.put("key" + i, i);
+
+            assert latch.await(5000, TimeUnit.MILLISECONDS);
+
+            assertEquals(10, cnt.get());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 41670d1..fe54b63 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
@@ -158,6 +159,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryReplicatedOneNodeSelfTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);
@@ -187,4 +189,4 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
 
         return suite;
     }
-}
\ No newline at end of file
+}


Mime
View raw message