ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-3038
Date Mon, 30 May 2016 07:08:49 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3038 1f98dba2d -> b4883e654


ignite-3038


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b4883e65
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b4883e65
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b4883e65

Branch: refs/heads/ignite-3038
Commit: b4883e65489305b7ca08921a4bc7ec20e382858c
Parents: 1f98dba
Author: sboikov <sboikov@gridgain.com>
Authored: Mon May 30 09:41:55 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon May 30 09:41:55 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/MarshallerContextImpl.java  |  4 +-
 .../binary/CacheObjectBinaryProcessorImpl.java  |  4 +-
 .../continuous/CacheContinuousQueryManager.java | 39 --------------------
 .../continuous/GridContinuousProcessor.java     | 38 ++++++++++++++-----
 .../service/GridServiceProcessor.java           |  4 +-
 5 files changed, 35 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b4883e65/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index cb5b528..68a9d0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -87,11 +87,11 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
         if (ctx.clientNode()) {
             lsnr = new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir);
 
-            CacheContinuousQueryManager.registerStaticInternalQuery(ctx,
+            ctx.continuous().registerStaticRoutine(
                 CU.MARSH_CACHE_NAME,
                 lsnr,
                 null,
-                false);
+                null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4883e65/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 1ef09bf..95e78ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -260,11 +260,11 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     /** {@inheritDoc} */
     @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException
{
         if (clientNode && !ctx.isDaemon()) {
-            CacheContinuousQueryManager.registerStaticInternalQuery(ctx,
+            ctx.continuous().registerStaticRoutine(
                 CU.UTILITY_CACHE_NAME,
                 new MetaDataEntryListener(),
                 new MetaDataEntryFilter(),
-                false);
+                null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4883e65/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index f9dcf08..c966527 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -492,45 +492,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
     }
 
     /**
-     * Registers routine info to be sent in discovery data during this node join
-     * (to be used for internal queries started from client nodes).
-     *
-     * @param ctx Context.
-     * @param cacheName Cache name.
-     * @param locLsnr Local listener.
-     * @param rmtFilter Remote filter.
-     * @param ignoreClsNotFound Ignore class not found error flag.
-     * @return Continuous routine ID.
-     * @throws IgniteCheckedException If failed.
-     */
-    public static UUID registerStaticInternalQuery(
-        GridKernalContext ctx,
-        String cacheName,
-        final CacheEntryUpdatedListener<?, ?> locLsnr,
-        final CacheEntryEventSerializableFilter rmtFilter,
-        final boolean ignoreClsNotFound) throws IgniteCheckedException {
-        String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName;
-
-        CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler(
-            cacheName,
-            TOPIC_CACHE.topic(topicPrefix, ctx.localNodeId(), 0),
-            locLsnr,
-            rmtFilter,
-            true,
-            false,
-            true,
-            ignoreClsNotFound);
-
-        hnd.internal(true);
-
-        return ctx.continuous().registerStaticRoutine(hnd,
-            ContinuousQuery.DFLT_PAGE_SIZE,
-            ContinuousQuery.DFLT_TIME_INTERVAL,
-            ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
-            null);
-    }
-
-    /**
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
      * @param loc Local flag.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4883e65/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 6217358..74cadd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -30,12 +30,15 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
@@ -82,6 +85,7 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK;
@@ -136,6 +140,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** */
     private boolean processorStopped;
 
+    /** Query sequence number for message topic. */
+    private final AtomicLong seq = new AtomicLong();
+
     /**
      * @param ctx Kernal context.
      */
@@ -568,22 +575,35 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * Registers routine info to be sent in discovery data during this node join
      * (to be used for internal queries started from client nodes).
      *
-     * @param hnd Handler.
-     * @param bufSize Buffer size.
-     * @param interval Time interval.
-     * @param autoUnsubscribe Automatic unsubscribe flag.
+     * @param cacheName Cache name.
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
      * @param prjPred Projection predicate.
      * @return Routine ID.
      * @throws IgniteCheckedException If failed.
      */
-    public UUID registerStaticRoutine(GridContinuousHandler hnd,
-        int bufSize,
-        long interval,
-        boolean autoUnsubscribe,
+    public UUID registerStaticRoutine(
+        String cacheName,
+        CacheEntryUpdatedListener<?, ?> locLsnr,
+        CacheEntryEventSerializableFilter rmtFilter,
         @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException
{
+        String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName;
+
+        CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler(
+            cacheName,
+            TOPIC_CACHE.topic(topicPrefix, ctx.localNodeId(), seq.incrementAndGet()),
+            locLsnr,
+            rmtFilter,
+            true,
+            false,
+            true,
+            false);
+
+        hnd.internal(true);
+
         final UUID routineId = UUID.randomUUID();
 
-        LocalRoutineInfo routineInfo = new LocalRoutineInfo(prjPred, hnd, bufSize, interval,
autoUnsubscribe);
+        LocalRoutineInfo routineInfo = new LocalRoutineInfo(prjPred, hnd, 1, 0, true);
 
         locInfos.put(routineId, routineInfo);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4883e65/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 7734d79..853e6bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -169,11 +169,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (ctx.clientNode()) {
             assert !ctx.isDaemon();
 
-            CacheContinuousQueryManager.registerStaticInternalQuery(ctx,
+            ctx.continuous().registerStaticRoutine(
                 CU.UTILITY_CACHE_NAME,
                 new ServiceEntriesListener(),
                 null,
-                true);
+                null);
         }
     }
 


Mime
View raw message