ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-3038
Date Thu, 26 May 2016 13:54:35 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3038 [created] f70286d47


ignite-3038


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f70286d4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f70286d4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f70286d4

Branch: refs/heads/ignite-3038
Commit: f70286d478d40bebb259d827e1117b290c25b53c
Parents: fc3b154
Author: sboikov <sboikov@gridgain.com>
Authored: Thu May 26 13:00:46 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu May 26 16:54:21 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/MarshallerContextImpl.java  |  54 ++-
 .../processors/cache/GridCacheAdapter.java      |  10 +-
 .../continuous/CacheContinuousQueryManager.java | 158 ++++++++
 .../continuous/GridContinuousProcessor.java     | 117 ++++--
 .../service/GridServiceProcessor.java           | 390 ++++++++++---------
 .../service/GridServiceClientNodeTest.java      |  64 ++-
 6 files changed, 563 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f70286d4/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 8f566a9..9a51ed8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -37,8 +37,10 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
 import org.apache.ignite.internal.util.GridStripedLock;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.PluginProvider;
 
@@ -64,6 +66,9 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
     /** Non-volatile on purpose. */
     private int failedCnt;
 
+    /** */
+    private ContinuousQueryListener lsnr;
+
     /**
      * @param plugins Plugins.
      * @throws IgniteCheckedException In case of error.
@@ -75,25 +80,58 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
     }
 
     /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        if (ctx.clientNode()) {
+            lsnr = new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir);
+
+            CacheContinuousQueryManager.registerStaticInternalQuery(ctx,
+                CU.MARSH_CACHE_NAME,
+                lsnr,
+                null,
+                false);
+        }
+    }
+
+    /**
      * @param ctx Kernal context.
      * @throws IgniteCheckedException In case of error.
      */
     public void onMarshallerCacheStarted(GridKernalContext ctx) throws IgniteCheckedException {
         assert ctx != null;
 
-        if (!ctx.isDaemon()) {
+        log = ctx.log(MarshallerContextImpl.class);
+
+        cache = ctx.cache().marshallerCache();
+
+        if (ctx.cache().marshallerCache().context().affinityNode()) {
             ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery(
-                new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir),
+                new ContinuousQueryListener(log, workDir),
                 null,
-                ctx.cache().marshallerCache().context().affinityNode(),
+                true,
                 true,
                 false
             );
         }
-
-        log = ctx.log(MarshallerContextImpl.class);
-
-        cache = ctx.cache().marshallerCache();
+        else {
+            if (lsnr != null) {
+                ctx.closure().runLocalSafe(new Runnable() {
+                    @SuppressWarnings("unchecked")
+                    @Override public void run() {
+                        try {
+                            Iterable entries = cache.context().continuousQueries().existingEntries(false, null);
+
+                            lsnr.onUpdated(entries);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to load marshaller cache entries: " + e, e);
+                        }
+                    }
+                });
+            }
+        }
 
         latch.countDown();
     }
@@ -222,6 +260,8 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
                 assert evt.getOldValue() == null || F.eq(evt.getOldValue(), evt.getValue()):
                     "Received cache entry update for system marshaller cache: " + evt;
 
+                log.info("Marshaller cache: " + evt);
+
                 if (evt.getOldValue() == null) {
                     String fileName = evt.getKey() + ".classname";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f70286d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 29ca32c..748f133 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4132,6 +4132,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Distributed ignite cache iterator.
      */
     public Iterator<Cache.Entry<K, V>> igniteIterator() throws IgniteCheckedException {
+        return igniteIterator(ctx.keepBinary());
+    }
+
+    /**
+     * @param keepBinary
+     * @return Distributed ignite cache iterator.
+     */
+    public Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary) throws IgniteCheckedException {
         GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -4139,7 +4147,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
             return localIteratorHonorExpirePolicy(opCtx);
 
-        final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, ctx.keepBinary())
+        final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, keepBinary)
             .keepAll(false)
             .executeScanQuery();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f70286d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b042249..7320d5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.Cache;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryCreatedListener;
@@ -46,6 +47,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
@@ -57,8 +59,10 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteClosure;
@@ -487,6 +491,33 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             false);
     }
 
+    public static UUID registerStaticInternalQuery(
+        GridKernalContext ctx,
+        String cacheName,
+        final CacheEntryUpdatedListener<?, ?> locLsnr,
+        final CacheEntryEventSerializableFilter rmtFilter,
+        final boolean ignoreClsNotFound) throws IgniteCheckedException {
+        String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName;
+
+        CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler(
+            cacheName,
+            TOPIC_CACHE.topic(topicPrefix, ctx.localNodeId(), 0),
+            locLsnr,
+            rmtFilter,
+            true,
+            false,
+            true,
+            ignoreClsNotFound);
+
+        hnd.internal(true);
+
+        return ctx.continuous().registerStaticRoutine(hnd,
+            ContinuousQuery.DFLT_PAGE_SIZE,
+            ContinuousQuery.DFLT_TIME_INTERVAL,
+            ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
+            null);
+    }
+
     /**
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
@@ -728,6 +759,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param keepBinary Keep binary flag.
+     * @param filter Filter.
+     * @return Iterable for events created for existing cache entries.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Iterable<CacheEntryEvent<?, ?>> existingEntries(final boolean keepBinary, final CacheEntryEventFilter filter)
+        throws IgniteCheckedException {
+        final Iterator<Cache.Entry<?, ?>> it = cctx.cache().igniteIterator(keepBinary);
+
+        final Cache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+        return new Iterable<CacheEntryEvent<?, ?>>() {
+            @Override public Iterator<CacheEntryEvent<?, ?>> iterator() {
+                return new Iterator<CacheEntryEvent<?, ?>>() {
+                    private CacheQueryEntryEvent<?, ?> next;
+
+                    {
+                        advance();
+                    }
+
+                    @Override public boolean hasNext() {
+                        return next != null;
+                    }
+
+                    @Override public CacheEntryEvent<?, ?> next() {
+                        if (!hasNext())
+                            throw new NoSuchElementException();
+
+                        CacheEntryEvent next0 = next;
+
+                        advance();
+
+                        return next0;
+                    }
+
+                    @Override public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    private void advance() {
+                        next = null;
+
+                        while (next == null) {
+                            if (!it.hasNext())
+                                break;
+
+                            Cache.Entry e = it.next();
+
+                            next = new CacheEntryEventImpl(
+                                cache,
+                                CREATED,
+                                e.getKey(),
+                                e.getValue());
+
+                            if (filter != null && !filter.evaluate(next))
+                                next = null;
+                        }
+                    }
+                };
+            }
+        };
+    }
+
+    /**
      * @param nodes Nodes.
      * @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE},
      *     otherwise {@code false}.
@@ -1129,4 +1224,67 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 lsnr.acknowledgeBackupOnTimeout(ctx);
         }
     }
+
+    /**
+     *
+     */
+    private static class CacheEntryEventImpl extends CacheQueryEntryEvent {
+        /** */
+        @GridToStringInclude
+        private Object key;
+
+        /** */
+        @GridToStringInclude
+        private Object val;
+
+        /**
+         * @param src Event source.
+         * @param evtType Event type.
+         * @param key Key.
+         * @param val Value.
+         */
+        public CacheEntryEventImpl(Cache src, EventType evtType, Object key, Object val) {
+            super(src, evtType);
+            this.key = key;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getPartitionUpdateCounter() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getOldValue() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isOldValueAvailable() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getKey() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getValue() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object unwrap(Class cls) {
+            if (cls.isAssignableFrom(getClass()))
+                return cls.cast(this);
+
+            throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheEntryEventImpl.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f70286d4/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fd798df..0ac04e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -310,6 +310,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
         });
 
+        ctx.marshallerContext().onContinuousProcessorStarted(ctx);
+
+        ctx.service().onContinuousProcessorStarted(ctx);
+
         if (log.isDebugEnabled())
             log.debug("Continuous processor started.");
     }
@@ -393,16 +397,33 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
+        log.info("collectDiscoveryData [node=" + nodeId +
+            ", loc=" + ctx.localNodeId() +
+            ", locInfos=" + locInfos +
+            ", clientInfos=" + clientInfos +
+            ']');
+
         if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
             Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());
 
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
-                Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size());
+                Map<UUID, LocalRoutineInfo> cp = U.newHashMap(e.getValue().size());
 
                 for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
-                    copy.put(e0.getKey(), e0.getValue());
+                    cp.put(e0.getKey(), e0.getValue());
 
-                clientInfos0.put(e.getKey(), copy);
+                clientInfos0.put(e.getKey(), cp);
+            }
+
+            if (nodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) {
+                assert clientInfos0.isEmpty() : clientInfos0;
+
+                Map<UUID, LocalRoutineInfo> infos = new HashMap<>();
+
+                for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet())
+                    infos.put(e.getKey(), e.getValue());
+
+                clientInfos0.put(ctx.localNodeId(), infos);
             }
 
             DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0);
@@ -430,6 +451,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
         DiscoveryData data = (DiscoveryData)obj;
 
+        log.info("onDiscoveryDataReceived [joining=" + joiningNodeId +
+            ", rmtNodeId=" + rmtNodeId +
+            ", loc=" + ctx.localNodeId() +
+            ", data=" + data +
+            ']');
+
         if (!ctx.isDaemon() && data != null) {
             for (DiscoveryDataItem item : data.items) {
                 try {
@@ -541,6 +568,33 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param bufSize Buffer size.
      * @param interval Time interval.
      * @param autoUnsubscribe Automatic unsubscribe flag.
+     * @param prjPred Projection predicate.
+     * @return Routine ID.
+     * @throws IgniteCheckedException If failed.
+     */
+    public UUID registerStaticRoutine(GridContinuousHandler hnd,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe,
+        @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException {
+        final UUID routineId = UUID.randomUUID();
+
+        LocalRoutineInfo routineInfo = new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe);
+
+        locInfos.put(routineId, routineInfo);
+
+        registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true);
+
+        registerMessageListener(hnd);
+
+        return routineId;
+    }
+
+    /**
+     * @param hnd Handler.
+     * @param bufSize Buffer size.
+     * @param interval Time interval.
+     * @param autoUnsubscribe Automatic unsubscribe flag.
      * @param locOnly Local only flag.
      * @param prjPred Projection predicate.
      * @return Future.
@@ -610,29 +664,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Register per-routine notifications listener if ordered messaging is used.
-        if (hnd.orderedTopic() != null) {
-            ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object obj) {
-                    GridContinuousMessage msg = (GridContinuousMessage)obj;
-
-                    // Only notification can be ordered.
-                    assert msg.type() == MSG_EVT_NOTIFICATION;
-
-                    if (msg.data() == null && msg.dataBytes() != null) {
-                        try {
-                            msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to process message (ignoring): " + msg, e);
-
-                            return;
-                        }
-                    }
-
-                    processNotification(nodeId, msg);
-                }
-            });
-        }
+        registerMessageListener(hnd);
 
         StartFuture fut = new StartFuture(ctx, routineId);
 
@@ -664,6 +696,35 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param hnd Handler.
+     */
+    private void registerMessageListener(GridContinuousHandler hnd) {
+        if (hnd.orderedTopic() != null) {
+            ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object obj) {
+                    GridContinuousMessage msg = (GridContinuousMessage)obj;
+
+                    // Only notification can be ordered.
+                    assert msg.type() == MSG_EVT_NOTIFICATION;
+
+                    if (msg.data() == null && msg.dataBytes() != null) {
+                        try {
+                            msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to process message (ignoring): " + msg, e);
+
+                            return;
+                        }
+                    }
+
+                    processNotification(nodeId, msg);
+                }
+            });
+        }
+    }
+
+    /**
      * @param routineId Consume ID.
      * @return Future.
      */
@@ -807,12 +868,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
         cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
 
+        log.info("onDisconnected [rmtInfos=" + rmtInfos + ", clientInfos=" + clientInfos + ']');
+
         for (UUID rmtId : rmtInfos.keySet())
             unregisterRemote(rmtId);
 
         rmtInfos.clear();
 
         clientInfos.clear();
+
+        log.info("after onDisconnected [rmtInfos=" + rmtInfos + ", locInfos=" + locInfos + ']');
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f70286d4/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 97d9988..15a0685 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -57,9 +57,12 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -144,12 +147,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** Topology listener. */
     private GridLocalEventListener topLsnr = new TopologyListener();
 
-    /** Deployment listener ID. */
-    private UUID cfgQryId;
-
-    /** Assignment listener ID. */
-    private UUID assignQryId;
-
     /**
      * @param ctx Kernal context.
      */
@@ -166,6 +163,22 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             new ServicesCompatibilityState(srvcCompatibilitySysProp != null ? srvcCompatibilitySysProp : false, false));
     }
 
+    /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        if (ctx.clientNode()) {
+            assert !ctx.isDaemon();
+
+            CacheContinuousQueryManager.registerStaticInternalQuery(ctx,
+                CU.UTILITY_CACHE_NAME,
+                new ServiceEntriesListener(),
+                null,
+                true);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         ctx.addNodeAttribute(ATTR_SERVICES_COMPATIBILITY_MODE, srvcCompatibilitySysProp);
@@ -199,11 +212,30 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
             boolean affNode = cache.context().affinityNode();
 
-            cfgQryId = cache.context().continuousQueries().executeInternalQuery(
-                new DeploymentListener(), null, affNode, true, !affNode);
+            // For clients continuous query is statically registered.
+            if (affNode) {
+                cache.context().continuousQueries().executeInternalQuery(new ServiceEntriesListener(),
+                    null,
+                    affNode,
+                    true,
+                    !affNode);
+            }
+            else {
+                ctx.closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            Iterable<CacheEntryEvent<?, ?>> entries =
+                                cache.context().continuousQueries().existingEntries(false, null);
+
+                            onSystemCacheUpdated(entries);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to load service entries: " + e, e);
+                        }
+                    }
+                });
+            }        assert !ctx.isDaemon();
 
-            assignQryId = cache.context().continuousQueries().executeInternalQuery(
-                new AssignmentListener(), null, affNode, true, !affNode);
         }
         finally {
             if (ctx.deploy().enabled())
@@ -237,12 +269,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (!ctx.clientNode())
             ctx.event().removeLocalEventListener(topLsnr);
 
-        if (cfgQryId != null)
-            cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
-
-        if (assignQryId != null)
-            cache.context().continuousQueries().cancelInternalQuery(assignQryId);
-
         Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 
         synchronized (locSvcs) {
@@ -1284,148 +1310,170 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /**
      * Service deployment listener.
      */
-    private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> {
+    @SuppressWarnings("unchecked")
+    private class ServiceEntriesListener implements CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
         @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
             depExe.submit(new BusyRunnable() {
                 @Override public void run0() {
-                    boolean firstTime = true;
+                    onSystemCacheUpdated(deps);
+                }
+            });
+        }
+    }
 
-                    for (CacheEntryEvent<?, ?> e : deps) {
-                        if (!(e.getKey() instanceof GridServiceDeploymentKey))
-                            continue;
+    private void onSystemCacheUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
+        boolean firstTime = true;
 
-                        if (firstTime) {
-                            markCompatibilityStateAsUsed();
+        for (CacheEntryEvent<?, ?> e : deps) {
+            log.info("Service listener: " + e);
 
-                            firstTime = false;
-                        }
+            if (e.getKey() instanceof GridServiceDeploymentKey) {
+                if (firstTime) {
+                    markCompatibilityStateAsUsed();
 
-                        GridServiceDeployment dep;
+                    firstTime = false;
+                }
 
-                        try {
-                            dep = (GridServiceDeployment)e.getValue();
-                        }
-                        catch (IgniteException ex) {
-                            if (X.hasCause(ex, ClassNotFoundException.class))
-                                continue;
-                            else
-                                throw ex;
-                        }
+                processDeployment((CacheEntryEvent)e);
+            }
+            else if (e.getKey() instanceof GridServiceAssignmentsKey) {
+                if (firstTime) {
+                    markCompatibilityStateAsUsed();
 
-                        if (dep != null) {
-                            svcName.set(dep.configuration().getName());
+                    firstTime = false;
+                }
 
-                            // Ignore other utility cache events.
-                            long topVer = ctx.discovery().topologyVersion();
+                processAssignment((CacheEntryEvent)e);
+            }
+        }
+    }
 
-                            ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
+    /**
+     * @param e Entry.
+     */
+    private void processDeployment(CacheEntryEvent<GridServiceDeploymentKey, GridServiceDeployment> e) {
+        GridServiceDeployment dep;
 
-                            if (oldest.isLocal())
-                                onDeployment(dep, topVer);
-                        }
-                        // Handle undeployment.
-                        else {
-                            String name = ((GridServiceDeploymentKey)e.getKey()).name();
+        try {
+            dep = e.getValue();
+        }
+        catch (IgniteException ex) {
+            if (X.hasCause(ex, ClassNotFoundException.class))
+                return;
+            else
+                throw ex;
+        }
 
-                            svcName.set(name);
+        if (dep != null) {
+            svcName.set(dep.configuration().getName());
 
-                            Collection<ServiceContextImpl> ctxs;
+            // Ignore other utility cache events.
+            long topVer = ctx.discovery().topologyVersion();
 
-                            synchronized (locSvcs) {
-                                ctxs = locSvcs.remove(name);
-                            }
+            ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
 
-                            if (ctxs != null) {
-                                synchronized (ctxs) {
-                                    cancel(ctxs, ctxs.size());
-                                }
-                            }
+            if (oldest.isLocal())
+                onDeployment(dep, topVer);
+        }
+        // Handle undeployment.
+        else {
+            String name = e.getKey().name();
 
-                            // Finish deployment futures if undeployment happened.
-                            GridFutureAdapter<?> fut = depFuts.remove(name);
+            svcName.set(name);
 
-                            if (fut != null)
-                                fut.onDone();
+            Collection<ServiceContextImpl> ctxs;
 
-                            // Complete undeployment future.
-                            fut = undepFuts.remove(name);
+            synchronized (locSvcs) {
+                ctxs = locSvcs.remove(name);
+            }
 
-                            if (fut != null)
-                                fut.onDone();
+            if (ctxs != null) {
+                synchronized (ctxs) {
+                    cancel(ctxs, ctxs.size());
+                }
+            }
 
-                            GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
+            // Finish deployment futures if undeployment happened.
+            GridFutureAdapter<?> fut = depFuts.remove(name);
 
-                            // Remove assignment on primary node in case of undeploy.
-                            if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
-                                try {
-                                    cache.getAndRemove(key);
-                                }
-                                catch (IgniteCheckedException ex) {
-                                    U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
-                                }
-                            }
-                        }
-                    }
+            if (fut != null)
+                fut.onDone();
+
+            // Complete undeployment future.
+            fut = undepFuts.remove(name);
+
+            if (fut != null)
+                fut.onDone();
+
+            GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
+
+            // Remove assignment on primary node in case of undeploy.
+            if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
+                try {
+                    cache.getAndRemove(key);
                 }
-            });
+                catch (IgniteCheckedException ex) {
+                    U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
+                }
+            }
         }
+    }
 
-        /**
-         * Deployment callback.
-         *
-         * @param dep Service deployment.
-         * @param topVer Topology version.
-         */
-        private void onDeployment(final GridServiceDeployment dep, final long topVer) {
-            // Retry forever.
-            try {
-                long newTopVer = ctx.discovery().topologyVersion();
+    /**
+     * Deployment callback.
+     *
+     * @param dep Service deployment.
+     * @param topVer Topology version.
+     */
+    private void onDeployment(final GridServiceDeployment dep, final long topVer) {
+        // Retry forever.
+        try {
+            long newTopVer = ctx.discovery().topologyVersion();
 
-                // If topology version changed, reassignment will happen from topology event.
-                if (newTopVer == topVer)
-                    reassign(dep, topVer);
-            }
-            catch (IgniteCheckedException e) {
-                if (!(e instanceof ClusterTopologyCheckedException))
-                    log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
+            // If topology version changed, reassignment will happen from topology event.
+            if (newTopVer == topVer)
+                reassign(dep, topVer);
+        }
+        catch (IgniteCheckedException e) {
+            if (!(e instanceof ClusterTopologyCheckedException))
+                log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
 
-                long newTopVer = ctx.discovery().topologyVersion();
+            long newTopVer = ctx.discovery().topologyVersion();
 
-                if (newTopVer != topVer) {
-                    assert newTopVer > topVer;
+            if (newTopVer != topVer) {
+                assert newTopVer > topVer;
 
-                    // Reassignment will happen from topology event.
-                    return;
-                }
+                // Reassignment will happen from topology event.
+                return;
+            }
 
-                ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
-                    private IgniteUuid id = IgniteUuid.randomUuid();
+            ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+                private IgniteUuid id = IgniteUuid.randomUuid();
 
-                    private long start = System.currentTimeMillis();
+                private long start = System.currentTimeMillis();
 
-                    @Override public IgniteUuid timeoutId() {
-                        return id;
-                    }
+                @Override public IgniteUuid timeoutId() {
+                    return id;
+                }
 
-                    @Override public long endTime() {
-                        return start + RETRY_TIMEOUT;
-                    }
+                @Override public long endTime() {
+                    return start + RETRY_TIMEOUT;
+                }
 
-                    @Override public void onTimeout() {
-                        if (!busyLock.enterBusy())
-                            return;
+                @Override public void onTimeout() {
+                    if (!busyLock.enterBusy())
+                        return;
 
-                        try {
-                            // Try again.
-                            onDeployment(dep, topVer);
-                        }
-                        finally {
-                            busyLock.leaveBusy();
-                        }
+                    try {
+                        // Try again.
+                        onDeployment(dep, topVer);
                     }
-                });
-            }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            });
         }
     }
 
@@ -1585,79 +1633,59 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Assignment listener.
+     * @param e Entry.
      */
-    private class AssignmentListener implements CacheEntryUpdatedListener<Object, Object> {
-        /** {@inheritDoc} */
-        @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> assignCol) throws CacheEntryListenerException {
-            depExe.submit(new BusyRunnable() {
-                @Override public void run0() {
-                    boolean firstTime = true;
-
-                    for (CacheEntryEvent<?, ?> e : assignCol) {
-                        if (!(e.getKey() instanceof GridServiceAssignmentsKey))
-                            continue;
-
-                        if (firstTime) {
-                            markCompatibilityStateAsUsed();
+    private void processAssignment(CacheEntryEvent<GridServiceAssignmentsKey, GridServiceAssignments> e) {
+        GridServiceAssignments assigns;
 
-                            firstTime = false;
-                        }
-
-                        GridServiceAssignments assigns;
-
-                        try {
-                            assigns = (GridServiceAssignments)e.getValue();
-                        }
-                        catch (IgniteException ex) {
-                            if (X.hasCause(ex, ClassNotFoundException.class))
-                                continue;
-                            else
-                                throw ex;
-                        }
+        try {
+            assigns = e.getValue();
+        }
+        catch (IgniteException ex) {
+            if (X.hasCause(ex, ClassNotFoundException.class))
+                return;
+            else
+                throw ex;
+        }
 
-                        if (assigns != null) {
-                            svcName.set(assigns.name());
+        if (assigns != null) {
+            svcName.set(assigns.name());
 
-                            Throwable t = null;
+            Throwable t = null;
 
-                            try {
-                                redeploy(assigns);
-                            }
-                            catch (Error | RuntimeException th) {
-                                t = th;
-                            }
+            try {
+                redeploy(assigns);
+            }
+            catch (Error | RuntimeException th) {
+                t = th;
+            }
 
-                            GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
+            GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
 
-                            if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
-                                depFuts.remove(assigns.name(), fut);
+            if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
+                depFuts.remove(assigns.name(), fut);
 
-                                // Complete deployment futures once the assignments have been stored in cache.
-                                fut.onDone(null, t);
-                            }
-                        }
-                        // Handle undeployment.
-                        else {
-                            String name = ((GridServiceAssignmentsKey)e.getKey()).name();
+                // Complete deployment futures once the assignments have been stored in cache.
+                fut.onDone(null, t);
+            }
+        }
+        // Handle undeployment.
+        else {
+            String name = e.getKey().name();
 
-                            svcName.set(name);
+            svcName.set(name);
 
-                            Collection<ServiceContextImpl> ctxs;
+            Collection<ServiceContextImpl> ctxs;
 
-                            synchronized (locSvcs) {
-                                ctxs = locSvcs.remove(name);
-                            }
+            synchronized (locSvcs) {
+                ctxs = locSvcs.remove(name);
+            }
 
-                            if (ctxs != null) {
-                                synchronized (ctxs) {
-                                    cancel(ctxs, ctxs.size());
-                                }
-                            }
-                        }
-                    }
+            if (ctxs != null) {
+                synchronized (ctxs) {
+                    cancel(ctxs, ctxs.size());
                 }
-            });
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f70286d4/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
index c3b9cf4..6d869ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -34,47 +35,80 @@ public class GridServiceClientNodeTest extends GridCommonAbstractTest {
     protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final int NODE_CNT = 3;
+    private boolean client;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setMaxMissedClientHeartbeats(30);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(1000);
 
-        if (gridName.equals(getTestGridName(NODE_CNT - 1)))
-            cfg.setClientMode(true);
+        cfg.setClientMode(client);
 
         return cfg;
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
 
-        startGrids(NODE_CNT);
+        super.afterTest();
     }
 
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployFromClient() throws Exception {
+        startGrids(3);
+
+        client = true;
+
+        Ignite ignite = startGrid(3);
+
+        checkDeploy(ignite, "service1");
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testDeployFromClient() throws Exception {
-        Ignite ignite = ignite(NODE_CNT - 1);
+    public void testDeployAfterServerStop() throws Exception {
+        startGrid(0);
+
+        client = true;
 
-        assertTrue(ignite.configuration().isClientMode());
+        Ignite ignite = startGrid(1);
 
-        String svcName = "testService";
+        client = false;
+
+        startGrid(2);
+
+        U.sleep(1000);
+
+        stopGrid(0);
+
+        checkDeploy(ignite, "service1");
+
+        startGrid(3);
+
+        for (int i = 0; i < 10; i++)
+            checkDeploy(ignite, "service2-" + i);
+    }
+
+    /**
+     * @param client Client node.
+     * @param svcName Service name.
+     * @throws Exception If failed.
+     */
+    private void checkDeploy(Ignite client, String svcName) throws Exception {
+        assertTrue(client.configuration().isClientMode());
 
         CountDownLatch latch = new CountDownLatch(1);
 
         DummyService.exeLatch(svcName, latch);
 
-        ignite.services().deployClusterSingleton(svcName, new DummyService());
+        client.services().deployClusterSingleton(svcName, new DummyService());
 
         assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
     }


Mime
View raw message