ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [22/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6
Date Wed, 10 Jun 2015 16:27:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index f466bf2..e6d71aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -229,14 +230,22 @@ public class IgniteTxHandler {
             return null;
         }
 
+        IgniteTxEntry firstEntry = null;
+
         try {
-            for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes()))
+            for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
                 e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+
+                if (firstEntry == null)
+                    firstEntry = e;
+            }
         }
         catch (IgniteCheckedException e) {
             return new GridFinishedFuture<>(e);
         }
 
+        assert firstEntry != null : req;
+
         GridDhtTxLocal tx;
 
         GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version());
@@ -253,36 +262,88 @@ public class IgniteTxHandler {
             }
         }
         else {
-            tx = new GridDhtTxLocal(
-                ctx,
-                nearNode.id(),
-                req.version(),
-                req.futureId(),
-                req.miniId(),
-                req.threadId(),
-                req.implicitSingle(),
-                req.implicitSingle(),
-                req.system(),
-                req.explicitLock(),
-                req.policy(),
-                req.concurrency(),
-                req.isolation(),
-                req.timeout(),
-                req.isInvalidate(),
-                false,
-                req.txSize(),
-                req.transactionNodes(),
-                req.subjectId(),
-                req.taskNameHash()
-            );
+            GridDhtPartitionTopology top = null;
 
-            tx = ctx.tm().onCreated(null, tx);
+            if (req.firstClientRequest()) {
+                assert req.concurrency().equals(OPTIMISTIC) : req;
+                assert CU.clientNode(nearNode) : nearNode;
 
-            if (tx != null)
-                tx.topologyVersion(req.topologyVersion());
-            else
-                U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
-                    req.version() + ", req=" + req + ']');
+                top = firstEntry.context().topology();
+
+                top.readLock();
+            }
+
+            try {
+                if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Client topology version mismatch, need remap transaction [" +
+                            "reqTopVer=" + req.topologyVersion() +
+                            ", locTopVer=" + top.topologyVersion() +
+                            ", req=" + req + ']');
+                    }
+
+                    GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+                        req.version(),
+                        req.futureId(),
+                        req.miniId(),
+                        req.version(),
+                        req.version(),
+                        null,
+                        null,
+                        null,
+                        top.topologyVersion());
+
+                    try {
+                        ctx.io().send(nearNode, res, req.policy());
+                    }
+                    catch (ClusterTopologyCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send client tx remap response, client node failed " +
+                                "[node=" + nearNode + ", req=" + req + ']');
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send client tx remap response " +
+                            "[node=" + nearNode + ", req=" + req + ']', e);
+                    }
+
+                    return new GridFinishedFuture<>(res);
+                }
+
+                tx = new GridDhtTxLocal(
+                    ctx,
+                    nearNode.id(),
+                    req.version(),
+                    req.futureId(),
+                    req.miniId(),
+                    req.threadId(),
+                    req.implicitSingle(),
+                    req.implicitSingle(),
+                    req.system(),
+                    req.explicitLock(),
+                    req.policy(),
+                    req.concurrency(),
+                    req.isolation(),
+                    req.timeout(),
+                    req.isInvalidate(),
+                    false,
+                    req.txSize(),
+                    req.transactionNodes(),
+                    req.subjectId(),
+                    req.taskNameHash()
+                );
+
+                tx = ctx.tm().onCreated(null, tx);
+
+                if (tx != null)
+                    tx.topologyVersion(req.topologyVersion());
+                else
+                    U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
+                        req.version() + ", req=" + req + ']');
+            }
+            finally {
+                if (top != null)
+                    top.readUnlock();
+            }
         }
 
         if (tx != null) {
@@ -343,6 +404,31 @@ public class IgniteTxHandler {
     }
 
     /**
+     * @param expVer Expected topology version.
+     * @param curVer Current topology version.
+     * @param req Request.
+     * @return {@code True} if cache affinity changed and request should be remapped.
+     */
+    private boolean needRemap(AffinityTopologyVersion expVer,
+        AffinityTopologyVersion curVer,
+        GridNearTxPrepareRequest req) {
+        if (expVer.equals(curVer))
+            return false;
+
+        for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
+            GridCacheContext ctx = e.context();
+
+            Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
+            Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
+
+            if (!cacheNodes0.equals(cacheNodes1))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param res Response.
      */
@@ -1024,8 +1110,10 @@ public class IgniteTxHandler {
                     return null;
                 }
             }
-            else
+            else {
+                tx.writeVersion(req.writeVersion());
                 tx.transactionNodes(req.transactionNodes());
+            }
 
             if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) {
                 int idx = 0;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d57786e..8b5eaec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -18,10 +18,10 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.dr.*;
@@ -40,6 +40,7 @@ import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
 import javax.cache.expiry.*;
 import javax.cache.processor.*;
 import java.io.*;
@@ -480,17 +481,26 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      */
     @SuppressWarnings({"CatchGenericClass"})
     protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
-        CacheStoreManager store = store();
+        if (!storeEnabled() || internal())
+            return;
+
+        Collection<CacheStoreManager> stores = stores();
+
+        if (stores == null || stores.isEmpty())
+            return;
+
+        assert isWriteToStoreFromDhtValid(stores) : "isWriteToStoreFromDht can't be different within one transaction";
 
-        if (store != null && store.isWriteThrough() && storeEnabled() &&
-            !internal() && (near() || store.isWriteToStoreFromDht())) {
+        boolean isWriteToStoreFromDht = F.first(stores).isWriteToStoreFromDht();
+
+        if (near() || isWriteToStoreFromDht) {
             try {
                 if (writeEntries != null) {
                     Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null;
                     List<Object> rmvCol = null;
                     CacheStoreManager writeStore = null;
 
-                    boolean skipNear = near() && store.isWriteToStoreFromDht();
+                    boolean skipNear = near() && isWriteToStoreFromDht;
 
                     for (IgniteTxEntry e : writeEntries) {
                         if ((skipNear && e.cached().isNear()) || e.skipStore())
@@ -524,11 +534,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             }
 
                             // Batch-process puts if cache ID has changed.
-                            if (writeStore != null && writeStore != cacheCtx.store() && putMap != null && !putMap.isEmpty()) {
-                                writeStore.putAll(this, putMap);
+                            if (writeStore != null && writeStore != cacheCtx.store()) {
+                                if (putMap != null && !putMap.isEmpty()) {
+                                    writeStore.putAll(this, putMap);
 
-                                // Reset.
-                                putMap.clear();
+                                    // Reset.
+                                    putMap.clear();
+                                }
 
                                 writeStore = null;
                             }
@@ -544,12 +556,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal));
                             }
 
-                            if (putMap == null)
-                                putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
+                            if (writeStore == null)
+                                writeStore = cacheCtx.store();
 
-                            putMap.put(CU.value(key, cacheCtx, false), F.t(CU.value(val, cacheCtx, false), ver));
+                            if (writeStore.isWriteThrough()) {
+                                if (putMap == null)
+                                    putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
 
-                            writeStore = cacheCtx.store();
+                                putMap.put(CU.value(key, cacheCtx, false), F.t(CU.value(val, cacheCtx, false), ver));
+                            }
                         }
                         else if (op == DELETE) {
                             // Batch-process all puts if needed.
@@ -564,11 +579,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 writeStore = null;
                             }
 
-                            if (writeStore != null && writeStore != cacheCtx.store() && rmvCol != null && !rmvCol.isEmpty()) {
-                                writeStore.removeAll(this, rmvCol);
+                            if (writeStore != null && writeStore != cacheCtx.store()) {
+                                if (rmvCol != null && !rmvCol.isEmpty()) {
+                                    writeStore.removeAll(this, rmvCol);
 
-                                // Reset.
-                                rmvCol.clear();
+                                    // Reset.
+                                    rmvCol.clear();
+                                }
 
                                 writeStore = null;
                             }
@@ -581,12 +598,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     continue;
                             }
 
-                            if (rmvCol == null)
-                                rmvCol = new ArrayList<>();
+                            if (writeStore == null)
+                                writeStore = cacheCtx.store();
 
-                            rmvCol.add(key.value(cacheCtx.cacheObjectContext(), false));
+                            if (writeStore.isWriteThrough()) {
+                                if (rmvCol == null)
+                                    rmvCol = new ArrayList<>();
 
-                            writeStore = cacheCtx.store();
+                                rmvCol.add(key.value(cacheCtx.cacheObjectContext(), false));
+                            }
                         }
                         else if (log.isDebugEnabled())
                             log.debug("Ignoring NOOP entry for batch store commit: " + e);
@@ -610,7 +630,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 }
 
                 // Commit while locks are held.
-                store.sessionEnd(this, true);
+                sessionEnd(stores, true);
             }
             catch (IgniteCheckedException ex) {
                 commitError(ex);
@@ -635,6 +655,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex);
             }
+            finally {
+                if (isRollbackOnly())
+                    sessionEnd(stores, false);
+            }
         }
     }
 
@@ -972,24 +996,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 cctx.tm().resetContext();
             }
         }
-        else {
-            CacheStoreManager store = store();
-
-            if (store != null && !internal()) {
-                try {
-                    store.sessionEnd(this, true);
-                }
-                catch (IgniteCheckedException e) {
-                    commitError(e);
-
-                    setRollbackOnly();
-
-                    cctx.tm().removeCommittedTx(this);
-
-                    throw e;
-                }
-            }
-        }
 
         // Do not unlock transaction entries if one-phase commit.
         if (!onePhaseCommit()) {
@@ -1078,11 +1084,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 cctx.tm().rollbackTx(this);
 
-                CacheStoreManager store = store();
+                if (!internal()) {
+                    Collection<CacheStoreManager> stores = stores();
+
+                    if (stores != null && !stores.isEmpty()) {
+                        assert isWriteToStoreFromDhtValid(stores) :
+                            "isWriteToStoreFromDht can't be different within one transaction";
 
-                if (store != null && (near() || store.isWriteToStoreFromDht())) {
-                    if (!internal())
-                        store.sessionEnd(this, false);
+                        boolean isWriteToStoreFromDht = F.first(stores).isWriteToStoreFromDht();
+
+                        if (stores != null && !stores.isEmpty() && (near() || isWriteToStoreFromDht))
+                            sessionEnd(stores, false);
+                    }
                 }
             }
             catch (Error | IgniteCheckedException | RuntimeException e) {
@@ -1094,6 +1107,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param stores Store managers.
+     * @param commit Commit flag.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void sessionEnd(Collection<CacheStoreManager> stores, boolean commit) throws IgniteCheckedException {
+        Iterator<CacheStoreManager> it = stores.iterator();
+
+        while (it.hasNext()) {
+            CacheStoreManager store = it.next();
+
+            store.sessionEnd(this, commit, !it.hasNext());
+        }
+    }
+
+    /**
      * Checks if there is a cached or swapped value for
      * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method.
      *
@@ -2507,6 +2535,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param cacheCtx Cache context.
+     * @throws IgniteCheckedException If updates are not allowed.
+     */
+    private void checkUpdatesAllowed(GridCacheContext cacheCtx) throws IgniteCheckedException {
+        if (!cacheCtx.updatesAllowed()) {
+            throw new IgniteTxRollbackCheckedException(new CacheException(
+                "Updates are not allowed for transactional cache: " + cacheCtx.name() + ". Configure " +
+                "persistence store on client or use remote closure execution to start transactions " +
+                "from server nodes."));
+        }
+    }
+
+    /**
      * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
      * maps must be non-null.
      *
@@ -2533,6 +2574,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     ) {
         assert filter == null || invokeMap == null;
 
+        try {
+            checkUpdatesAllowed(cacheCtx);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+
         cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT);
 
         if (retval)
@@ -2753,6 +2801,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         @Nullable GridCacheEntryEx cached,
         final boolean retval,
         @Nullable final CacheEntryPredicate[] filter) {
+        try {
+            checkUpdatesAllowed(cacheCtx);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+
         cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE);
 
         if (retval)
@@ -2983,7 +3038,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
         // Check if we can enlist new cache to transaction.
         if (!activeCacheIds.contains(cacheId)) {
-            if (!cctx.txCompatible(this, activeCacheIds, cacheCtx)) {
+            String err = cctx.verifyTxCompatibility(this, activeCacheIds, cacheCtx);
+
+            if (err != null) {
                 StringBuilder cacheNames = new StringBuilder();
 
                 int idx = 0;
@@ -2995,9 +3052,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         cacheNames.append(", ");
                 }
 
-                throw new IgniteCheckedException("Failed to enlist new cache to existing transaction " +
-                    "(cache configurations are not compatible) [" +
-                    "activeCaches=[" + cacheNames + "]" +
+                throw new IgniteCheckedException("Failed to enlist new cache to existing transaction (" +
+                    err +
+                    ") [activeCaches=[" + cacheNames + "]" +
                     ", cacheName=" + cacheCtx.name() +
                     ", cacheSystem=" + cacheCtx.systemTx() +
                     ", txSystem=" + system() + ']');
@@ -3284,6 +3341,23 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param stores Store managers.
+     * @return If {@code isWriteToStoreFromDht} value same for all stores.
+     */
+    private boolean isWriteToStoreFromDhtValid(Collection<CacheStoreManager> stores) {
+        if (stores != null && !stores.isEmpty()) {
+            boolean exp = F.first(stores).isWriteToStoreFromDht();
+
+            for (CacheStoreManager store : stores) {
+                if (store.isWriteToStoreFromDht() != exp)
+                    return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
      * Post-lock closure alias.
      *
      * @param <T> Return type.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4666cca..b6c77f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1221,9 +1221,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 collectPendingVersions(dhtTxLoc);
             }
 
-            // 3.1 Call dataStructures manager.
-            cctx.kernalContext().dataStructures().onTxCommitted(tx);
-
             // 4. Unlock write resources.
             unlockMultiple(tx, tx.writeEntries());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index f2b3ef4..dc0d1e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -30,8 +30,9 @@ import org.jetbrains.annotations.*;
 public interface IgniteCacheObjectProcessor extends GridProcessor {
     /**
      * @see GridComponent#onKernalStart()
+     * @throws IgniteCheckedException If failed.
      */
-    public void onUtilityCacheStarted();
+    public void onUtilityCacheStarted() throws IgniteCheckedException;
 
     /**
      * @param typeName Type name.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 9b89d5a..3e59b10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -212,7 +212,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public void onUtilityCacheStarted() {
+    @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
new file mode 100644
index 0000000..91768a6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public abstract class AbstractContinuousMessage implements DiscoveryCustomMessage {
+    /** Routine ID. */
+    protected final UUID routineId;
+
+    /** Custom message ID. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /**
+     * @param id Id.
+     */
+    protected AbstractContinuousMessage(UUID id) {
+        routineId = id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    public UUID routineId() {
+        return routineId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean incrementMinorTopologyVersion() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
index eb33613..1b79430 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
@@ -23,18 +23,6 @@ import org.jetbrains.annotations.*;
  * Continuous processor message types.
  */
 enum GridContinuousMessageType {
-    /** Consume start request. */
-    MSG_START_REQ,
-
-    /** Consume start acknowledgement. */
-    MSG_START_ACK,
-
-    /** Consume stop request. */
-    MSG_STOP_REQ,
-
-    /** Consume stop acknowledgement. */
-    MSG_STOP_ACK,
-
     /** Remote event notification. */
     MSG_EVT_NOTIFICATION,
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0d76ad4..38d970b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -58,50 +59,44 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** Local infos. */
     private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new ConcurrentHashMap8<>();
 
+    /** Local infos. */
+    private final ConcurrentMap<UUID, Map<UUID, LocalRoutineInfo>> clientInfos = new ConcurrentHashMap8<>();
+
     /** Remote infos. */
     private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new ConcurrentHashMap8<>();
 
     /** Start futures. */
     private final ConcurrentMap<UUID, StartFuture> startFuts = new ConcurrentHashMap8<>();
 
-    /** Start ack wait lists. */
-    private final ConcurrentMap<UUID, Collection<UUID>> waitForStartAck = new ConcurrentHashMap8<>();
-
     /** Stop futures. */
     private final ConcurrentMap<UUID, StopFuture> stopFuts = new ConcurrentHashMap8<>();
 
-    /** Stop ack wait lists. */
-    private final ConcurrentMap<UUID, Collection<UUID>> waitForStopAck = new ConcurrentHashMap8<>();
-
     /** Threads started by this processor. */
     private final Collection<IgniteThread> threads = new GridConcurrentHashSet<>();
 
-    /** Pending start requests. */
-    private final Map<UUID, Collection<GridContinuousMessage>> pending = new HashMap<>();
-
     /** */
     private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>();
 
     /** Stopped IDs. */
     private final Collection<UUID> stopped = new HashSet<>();
 
-    /** Lock for pending requests. */
-    private final Lock pendingLock = new ReentrantLock();
-
     /** Lock for stop process. */
     private final Lock stopLock = new ReentrantLock();
 
+    /** Marshaller. */
+    private Marshaller marsh;
+
     /** Delay in milliseconds between retries. */
     private long retryDelay = 1000;
 
     /** Number of retries using to send messages. */
     private int retryCnt = 3;
 
-    /** Acknowledgement timeout. */
-    private long ackTimeout;
+    /** */
+    private final ReentrantReadWriteLock processorStopLock = new ReentrantReadWriteLock();
 
-    /** Marshaller. */
-    private Marshaller marsh;
+    /** */
+    private boolean processorStopped;
 
     /**
      * @param ctx Kernal context.
@@ -117,15 +112,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         retryDelay = ctx.config().getNetworkSendRetryDelay();
         retryCnt = ctx.config().getNetworkSendRetryCount();
-        ackTimeout = ctx.config().getNetworkTimeout();
-
-        if (ackTimeout < retryDelay * retryCnt) {
-            U.warn(log, "Acknowledgement timeout for continuous operations is less than message send " +
-                "retry delay multiplied by retries count (will increase timeout value) [ackTimeout=" +
-                ackTimeout + ", retryDelay=" + retryDelay + ", retryCnt=" + retryCnt + ']');
-
-            ackTimeout = retryDelay * retryCnt;
-        }
 
         marsh = ctx.config().getMarshaller();
 
@@ -133,114 +119,112 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             @SuppressWarnings({"fallthrough", "TooBroadScope"})
             @Override public void onEvent(Event evt) {
                 assert evt instanceof DiscoveryEvent;
+                assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
 
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
-                Collection<GridContinuousMessage> reqs;
+                clientInfos.remove(nodeId);
 
-                pendingLock.lock();
+                // Unregister handlers created by left node.
+                for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+                    UUID routineId = e.getKey();
+                    RemoteRoutineInfo info = e.getValue();
 
-                try {
-                    // Remove pending requests to send to joined node
-                    // (if node is left or failed, they are dropped).
-                    reqs = pending.remove(nodeId);
-                }
-                finally {
-                    pendingLock.unlock();
+                    if (info.autoUnsubscribe && nodeId.equals(info.nodeId))
+                        unregisterRemote(routineId);
                 }
 
-                switch (evt.type()) {
-                    case EVT_NODE_JOINED:
-                        if (reqs != null) {
-                            UUID routineId = null;
+                for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
+                    SyncMessageAckFuture fut = e.getValue();
 
-                            // Send pending requests.
-                            try {
-                                for (GridContinuousMessage req : reqs) {
-                                    routineId = req.routineId();
+                    if (fut.nodeId().equals(nodeId)) {
+                        SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
 
-                                    sendWithRetries(nodeId, req, null);
-                                }
-                            }
-                            catch (ClusterTopologyCheckedException ignored) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to send pending start request to node (is node alive?): " +
-                                        nodeId);
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(log, "Failed to send pending start request to node: " + nodeId, e);
+                        if (fut0 != null) {
+                            ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+                                "Node left grid while sending message to: " + nodeId);
 
-                                completeStartFuture(routineId);
-                            }
+                            fut0.onDone(err);
                         }
+                    }
+                }
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-                        break;
+        ctx.event().addLocalEventListener(new GridLocalEventListener() {
+            @Override public void onEvent(Event evt) {
+                for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) {
+                    StartFuture fut = itr.next();
 
-                    case EVT_NODE_LEFT:
-                    case EVT_NODE_FAILED:
-                        // Do not wait for start acknowledgements from left node.
-                        for (Map.Entry<UUID, Collection<UUID>> e : waitForStartAck.entrySet()) {
-                            Collection<UUID> nodeIds = e.getValue();
+                    itr.remove();
 
-                            for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) {
-                                if (nodeId.equals(it.next())) {
-                                    it.remove();
+                    fut.onDone(new IgniteException("Topology segmented"));
+                }
 
-                                    break;
-                                }
-                            }
+                for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) {
+                    StopFuture fut = itr.next();
 
-                            if (nodeIds.isEmpty())
-                                completeStartFuture(e.getKey());
-                        }
+                    itr.remove();
 
-                        // Do not wait for stop acknowledgements from left node.
-                        for (Map.Entry<UUID, Collection<UUID>> e : waitForStopAck.entrySet()) {
-                            Collection<UUID> nodeIds = e.getValue();
+                    fut.onDone(new IgniteException("Topology segmented"));
+                }
+            }
+        }, EVT_NODE_SEGMENTED);
 
-                            for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) {
-                                if (nodeId.equals(it.next())) {
-                                    it.remove();
+        ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class,
+            new CustomEventListener<StartRoutineDiscoveryMessage>() {
+                @Override public void onCustomEvent(ClusterNode snd, StartRoutineDiscoveryMessage msg) {
+                    if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping())
+                        processStartRequest(snd, msg);
+                }
+            });
 
-                                    break;
-                                }
-                            }
+        ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class,
+            new CustomEventListener<StartRoutineAckDiscoveryMessage>() {
+                @Override public void onCustomEvent(ClusterNode snd, StartRoutineAckDiscoveryMessage msg) {
+                    StartFuture fut = startFuts.remove(msg.routineId());
 
-                            if (nodeIds.isEmpty())
-                                completeStopFuture(e.getKey());
-                        }
+                    if (fut != null) {
+                        if (msg.errs().isEmpty())
+                            fut.onRemoteRegistered();
+                        else {
+                            IgniteCheckedException firstEx = F.first(msg.errs().values());
 
-                        // Unregister handlers created by left node.
-                        for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
-                            UUID routineId = e.getKey();
-                            RemoteRoutineInfo info = e.getValue();
+                            fut.onDone(firstEx);
 
-                            if (info.autoUnsubscribe && nodeId.equals(info.nodeId))
-                                unregisterRemote(routineId);
+                            stopRoutine(msg.routineId());
                         }
+                    }
+                }
+            });
 
-                        for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
-                            SyncMessageAckFuture fut = e.getValue();
+        ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class,
+            new CustomEventListener<StopRoutineDiscoveryMessage>() {
+                @Override public void onCustomEvent(ClusterNode snd, StopRoutineDiscoveryMessage msg) {
+                    if (!snd.id().equals(ctx.localNodeId())) {
+                        UUID routineId = msg.routineId();
 
-                            if (fut.nodeId().equals(nodeId)) {
-                                SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
+                        unregisterRemote(routineId);
 
-                                if (fut0 != null) {
-                                    ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
-                                        "Node left grid while sending message to: " + nodeId);
+                        if (snd.isClient()) {
+                            Map<UUID, LocalRoutineInfo> infoMap = clientInfos.get(snd.id());
 
-                                    fut0.onDone(err);
-                                }
-                            }
+                            if (infoMap != null)
+                                infoMap.remove(msg.routineId());
                         }
+                    }
+                }
+            });
 
-                        break;
+        ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class,
+            new CustomEventListener<StopRoutineAckDiscoveryMessage>() {
+                @Override public void onCustomEvent(ClusterNode snd, StopRoutineAckDiscoveryMessage msg) {
+                    StopFuture fut = stopFuts.remove(msg.routineId());
 
-                    default:
-                        assert false : "Unexpected event received: " + evt.shortDisplay();
+                    if (fut != null)
+                        fut.onDone();
                 }
-            }
-        }, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+            });
 
         ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() {
             @Override public void onMessage(UUID nodeId, Object obj) {
@@ -258,26 +242,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 }
 
                 switch (msg.type()) {
-                    case MSG_START_REQ:
-                        processStartRequest(nodeId, msg);
-
-                        break;
-
-                    case MSG_START_ACK:
-                        processStartAck(nodeId, msg);
-
-                        break;
-
-                    case MSG_STOP_REQ:
-                        processStopRequest(nodeId, msg);
-
-                        break;
-
-                    case MSG_STOP_ACK:
-                        processStopAck(nodeId, msg);
-
-                        break;
-
                     case MSG_EVT_NOTIFICATION:
                         processNotification(nodeId, msg);
 
@@ -298,6 +262,41 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             log.debug("Continuous processor started.");
     }
 
+    /**
+     * @return {@code true} if lock successful, {@code false} if processor already stopped.
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    public boolean lockStopping() {
+        processorStopLock.readLock().lock();
+
+        if (processorStopped) {
+            processorStopLock.readLock().unlock();
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     *
+     */
+    public void unlockStopping() {
+        processorStopLock.readLock().unlock();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        processorStopLock.writeLock().lock();
+
+        try {
+            processorStopped = true;
+        }
+        finally {
+            processorStopLock.writeLock().unlock();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void stop(boolean cancel) throws IgniteCheckedException {
         if (ctx.config().isDaemon())
@@ -320,29 +319,19 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
         if (!nodeId.equals(ctx.localNodeId())) {
-            pendingLock.lock();
+            DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos);
 
-            try {
-                // Create empty pending set.
-                pending.put(nodeId, new HashSet<GridContinuousMessage>());
-
-                DiscoveryData data = new DiscoveryData(ctx.localNodeId());
-
-                // Collect listeners information (will be sent to
-                // joining node during discovery process).
-                for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
-                    UUID routineId = e.getKey();
-                    LocalRoutineInfo info = e.getValue();
-
-                    data.addItem(new DiscoveryDataItem(routineId, info.prjPred,
-                        info.hnd, info.bufSize, info.interval));
-                }
+            // Collect listeners information (will be sent to
+            // joining node during discovery process).
+            for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
+                UUID routineId = e.getKey();
+                LocalRoutineInfo info = e.getValue();
 
-                return data;
-            }
-            finally {
-                pendingLock.unlock();
+                data.addItem(new DiscoveryDataItem(routineId, info.prjPred,
+                    info.hnd, info.bufSize, info.interval));
             }
+
+            return data;
         }
         else
             return null;
@@ -366,6 +355,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                     }
                 }
             }
+
+            for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
+                Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey());
+
+                if (map == null) {
+                    map = new HashMap<>();
+
+                    clientInfos.put(entry.getKey(), map);
+                }
+
+                map.putAll(entry.getValue());
+            }
         }
     }
 
@@ -447,14 +448,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                     if (dep == null)
                         throw new IgniteDeploymentCheckedException("Failed to deploy projection predicate: " + prjPred);
 
-                    reqData.clsName = clsName;
-                    reqData.depInfo = new GridDeploymentInfoBean(dep);
+                    reqData.className(clsName);
+                    reqData.deploymentInfo(new GridDeploymentInfoBean(dep));
 
                     reqData.p2pMarshal(marsh);
                 }
 
                 // Handle peer deployment for other handler-specific objects.
-                reqData.hnd.p2pMarshal(ctx);
+                reqData.handler().p2pMarshal(ctx);
             }
         }
         catch (IgniteCheckedException e) {
@@ -486,103 +487,24 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             });
         }
 
-        Collection<? extends ClusterNode> nodes;
-        Collection<UUID> nodeIds;
-
-        pendingLock.lock();
-
-        try {
-            // Nodes that participate in routine (request will be sent to these nodes directly).
-            nodes = F.view(ctx.discovery().allNodes(), F.and(prjPred, F.remoteNodes(ctx.localNodeId())));
-
-            // Stop with exception if projection is empty.
-            if (nodes.isEmpty() && !locIncluded) {
-                return new GridFinishedFuture<>(
-                    new ClusterTopologyCheckedException("Failed to register remote continuous listener (projection is empty)."));
-            }
-
-            // IDs of nodes where request will be sent.
-            nodeIds = new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id()));
-
-            // If there are currently joining nodes, add request to their pending lists.
-            // Node IDs set is updated to make sure that we wait for acknowledgement from
-            // these nodes.
-            for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : pending.entrySet()) {
-                if (nodeIds.add(e.getKey()))
-                    e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false));
-            }
-
-            // Register routine locally.
-            locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval));
-        }
-        finally {
-            pendingLock.unlock();
-        }
+        // Register routine locally.
+        locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval));
 
         StartFuture fut = new StartFuture(ctx, routineId);
 
-        if (!nodeIds.isEmpty()) {
-            // Wait for acknowledgements.
-            waitForStartAck.put(routineId, nodeIds);
-
-            startFuts.put(routineId, fut);
-
-            // Register acknowledge timeout (timeout object will be removed when
-            // future is completed).
-            fut.addTimeoutObject(new GridTimeoutObjectAdapter(ackTimeout) {
-                @Override public void onTimeout() {
-                    // Stop waiting for acknowledgements.
-                    Collection<UUID> ids = waitForStartAck.remove(routineId);
+        startFuts.put(routineId, fut);
 
-                    if (ids != null) {
-                        StartFuture f = startFuts.remove(routineId);
-
-                        assert f != null;
-
-                        // If there are still nodes without acknowledgements,
-                        // Stop routine with exception. Continue and complete
-                        // future otherwise.
-                        if (!ids.isEmpty()) {
-                            f.onDone(new IgniteCheckedException("Failed to get start acknowledgement from nodes (timeout " +
-                                "expired): " + ids + ". Will unregister all continuous listeners."));
-
-                            stopRoutine(routineId);
-                        }
-                        else
-                            f.onRemoteRegistered();
-                    }
-                }
-            });
+        try {
+            ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData));
         }
+        catch (IgniteException e) { // Marshaller exception may occurs if user pass unmarshallable filter.
+            startFuts.remove(routineId);
 
-        if (!nodes.isEmpty()) {
-            // Do not send projection predicate (nodes already filtered).
-            reqData.prjPred = null;
-            reqData.prjPredBytes = null;
-
-            // Send start requests.
-            try {
-                GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false);
+            locInfos.remove(routineId);
 
-                sendWithRetries(nodes, req, null);
-            }
-            catch (IgniteCheckedException e) {
-                startFuts.remove(routineId);
-                waitForStartAck.remove(routineId);
+            fut.onDone(e);
 
-                fut.onDone(e);
-
-                stopRoutine(routineId);
-
-                locIncluded = false;
-            }
-        }
-        else {
-            // There are no remote nodes, but we didn't throw topology exception.
-            assert locIncluded;
-
-            // Do not wait anything from remote nodes.
-            fut.onRemoteRegistered();
+            return fut;
         }
 
         // Register local handler if needed.
@@ -640,61 +562,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             // Unregister handler locally.
             unregisterHandler(routineId, routine.hnd, true);
 
-            pendingLock.lock();
-
-            try {
-                // Remove pending requests for this routine.
-                for (Collection<GridContinuousMessage> msgs : pending.values()) {
-                    Iterator<GridContinuousMessage> it = msgs.iterator();
-
-                    while (it.hasNext()) {
-                        if (it.next().routineId().equals(routineId))
-                            it.remove();
-                    }
-                }
-            }
-            finally {
-                pendingLock.unlock();
-            }
-
-            // Nodes where to send stop requests.
-            Collection<? extends ClusterNode> nodes = F.view(ctx.discovery().allNodes(),
-                F.and(routine.prjPred, F.remoteNodes(ctx.localNodeId())));
-
-            if (!nodes.isEmpty()) {
-                // Wait for acknowledgements.
-                waitForStopAck.put(routineId, new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id())));
-
-                // Register acknowledge timeout (timeout object will be removed when
-                // future is completed).
-                fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, routineId,
-                    new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false)));
-
-                // Send stop requests.
-                try {
-                    for (ClusterNode node : nodes) {
-                        try {
-                            sendWithRetries(node.id(),
-                                new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false),
-                                null);
-                        }
-                        catch (ClusterTopologyCheckedException ignored) {
-                            U.warn(log, "Failed to send stop request (node left topology): " + node.id());
-                        }
-                    }
-                }
-                catch (IgniteCheckedException e) {
-                    stopFuts.remove(routineId);
-                    waitForStopAck.remove(routineId);
-
-                    fut.onDone(e);
-                }
-            }
-            else {
-                stopFuts.remove(routineId);
+            ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
 
+            if (ctx.isStopping())
                 fut.onDone();
-            }
         }
 
         return fut;
@@ -721,13 +592,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         assert !nodeId.equals(ctx.localNodeId());
 
+        if (processorStopped)
+            return;
+
         RemoteRoutineInfo info = rmtInfos.get(routineId);
 
         if (info != null) {
             assert info.interval == 0 || !sync;
 
             if (sync) {
-                SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx, nodeId);
+                SyncMessageAckFuture fut = new SyncMessageAckFuture(nodeId);
 
                 IgniteUuid futId = IgniteUuid.randomUuid();
 
@@ -779,29 +653,26 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param nodeId Sender ID.
+     * @param node Sender.
      * @param req Start request.
      */
-    private void processStartRequest(UUID nodeId, GridContinuousMessage req) {
-        assert nodeId != null;
-        assert req != null;
-
+    private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) {
         UUID routineId = req.routineId();
-        StartRequestData data = req.data();
+        StartRequestData data = req.startRequestData();
 
-        GridContinuousHandler hnd = data.hnd;
+        GridContinuousHandler hnd = data.handler();
 
         IgniteCheckedException err = null;
 
         try {
             if (ctx.config().isPeerClassLoadingEnabled()) {
-                String clsName = data.clsName;
+                String clsName = data.className();
 
                 if (clsName != null) {
-                    GridDeploymentInfo depInfo = data.depInfo;
+                    GridDeploymentInfo depInfo = data.deploymentInfo();
 
                     GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
-                        depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
+                        depInfo.userVersion(), node.id(), depInfo.classLoaderId(), depInfo.participants(), null);
 
                     if (dep == null)
                         throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
@@ -809,127 +680,58 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                     data.p2pUnmarshal(marsh, dep.classLoader());
                 }
 
-                hnd.p2pUnmarshal(nodeId, ctx);
+                hnd.p2pUnmarshal(node.id(), ctx);
             }
         }
         catch (IgniteCheckedException e) {
             err = e;
 
-            U.error(log, "Failed to register handler [nodeId=" + nodeId + ", routineId=" + routineId + ']', e);
+            U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
+        }
+
+        if (node.isClient()) {
+            Map<UUID, LocalRoutineInfo> clientRouteMap = clientInfos.get(node.id());
+
+            if (clientRouteMap == null) {
+                clientRouteMap = new HashMap<>();
+
+                Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), clientRouteMap);
+
+                assert old == null;
+            }
+
+            clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), hnd, data.bufferSize(),
+                data.interval()));
         }
 
         boolean registered = false;
 
         if (err == null) {
             try {
-                IgnitePredicate<ClusterNode> prjPred = data.prjPred;
+                IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate();
+
+                ctx.resource().injectGeneric(prjPred);
 
                 if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) {
-                    registered = registerHandler(nodeId, routineId, hnd, data.bufSize, data.interval,
-                        data.autoUnsubscribe, false);
+                    registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(),
+                        data.autoUnsubscribe(), false);
                 }
             }
             catch (IgniteCheckedException e) {
                 err = e;
 
-                U.error(log, "Failed to register handler [nodeId=" + nodeId + ", routineId=" + routineId + ']', e);
+                U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
             }
         }
 
-        try {
-            sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err, false), null);
-        }
-        catch (ClusterTopologyCheckedException ignored) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send start acknowledgement to node (is node alive?): " + nodeId);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send start acknowledgement to node: " + nodeId, e);
-        }
+        if (err != null)
+            req.addError(ctx.localNodeId(), err);
 
         if (registered)
             hnd.onListenerRegistered(routineId, ctx);
     }
 
     /**
-     * @param nodeId Sender ID.
-     * @param ack Start acknowledgement.
-     */
-    private void processStartAck(UUID nodeId, GridContinuousMessage ack) {
-        assert nodeId != null;
-        assert ack != null;
-
-        UUID routineId = ack.routineId();
-
-        final IgniteCheckedException err = ack.data();
-
-        if (err != null) {
-            if (waitForStartAck.remove(routineId) != null) {
-                final StartFuture fut = startFuts.remove(routineId);
-
-                if (fut != null) {
-                    fut.onDone(err);
-
-                    stopRoutine(routineId);
-                }
-            }
-        }
-
-        Collection<UUID> nodeIds = waitForStartAck.get(routineId);
-
-        if (nodeIds != null) {
-            nodeIds.remove(nodeId);
-
-            if (nodeIds.isEmpty())
-                completeStartFuture(routineId);
-        }
-    }
-
-    /**
-     * @param nodeId Sender ID.
-     * @param req Stop request.
-     */
-    private void processStopRequest(UUID nodeId, GridContinuousMessage req) {
-        assert nodeId != null;
-        assert req != null;
-
-        UUID routineId = req.routineId();
-
-        unregisterRemote(routineId);
-
-        try {
-            sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null, false), null);
-        }
-        catch (ClusterTopologyCheckedException ignored) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send stop acknowledgement to node (is node alive?): " + nodeId);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send stop acknowledgement to node: " + nodeId, e);
-        }
-    }
-
-    /**
-     * @param nodeId Sender ID.
-     * @param ack Stop acknowledgement.
-     */
-    private void processStopAck(UUID nodeId, GridContinuousMessage ack) {
-        assert nodeId != null;
-        assert ack != null;
-
-        UUID routineId = ack.routineId();
-
-        Collection<UUID> nodeIds = waitForStopAck.get(routineId);
-
-        if (nodeIds != null) {
-            nodeIds.remove(nodeId);
-
-            if (nodeIds.isEmpty())
-                completeStopFuture(routineId);
-        }
-    }
-
-    /**
      * @param msg Message.
      */
     private void processMessageAck(GridContinuousMessage msg) {
@@ -972,36 +774,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param routineId Consume ID.
-     */
-    private void completeStartFuture(UUID routineId) {
-        assert routineId != null;
-
-        if (waitForStartAck.remove(routineId) != null) {
-            StartFuture fut = startFuts.remove(routineId);
-
-            assert fut != null;
-
-            fut.onRemoteRegistered();
-        }
-    }
-
-    /**
-     * @param routineId Consume ID.
-     */
-    private void completeStopFuture(UUID routineId) {
-        assert routineId != null;
-
-        if (waitForStopAck.remove(routineId) != null) {
-            GridFutureAdapter <?> fut = stopFuts.remove(routineId);
-
-            assert fut != null;
-
-            fut.onDone();
-        }
-    }
-
-    /**
      * @param nodeId Node ID.
      * @param routineId Consume ID.
      * @param hnd Handler.
@@ -1231,7 +1003,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * Local routine info.
      */
     @SuppressWarnings("PackageVisibleInnerClass")
-    static class LocalRoutineInfo {
+    static class LocalRoutineInfo implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
         /** Projection predicate. */
         private final IgnitePredicate<ClusterNode> prjPred;
 
@@ -1430,133 +1205,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Start request data.
-     */
-    private static class StartRequestData implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Projection predicate. */
-        private IgnitePredicate<ClusterNode> prjPred;
-
-        /** Serialized projection predicate. */
-        private byte[] prjPredBytes;
-
-        /** Deployment class name. */
-        private String clsName;
-
-        /** Deployment info. */
-        private GridDeploymentInfo depInfo;
-
-        /** Handler. */
-        private GridContinuousHandler hnd;
-
-        /** Buffer size. */
-        private int bufSize;
-
-        /** Time interval. */
-        private long interval;
-
-        /** Automatic unsubscribe flag. */
-        private boolean autoUnsubscribe;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public StartRequestData() {
-            // No-op.
-        }
-
-        /**
-         * @param prjPred Serialized projection predicate.
-         * @param hnd Handler.
-         * @param bufSize Buffer size.
-         * @param interval Time interval.
-         * @param autoUnsubscribe Automatic unsubscribe flag.
-         */
-        StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd,
-            int bufSize, long interval, boolean autoUnsubscribe) {
-            assert hnd != null;
-            assert bufSize > 0;
-            assert interval >= 0;
-
-            this.prjPred = prjPred;
-            this.hnd = hnd;
-            this.bufSize = bufSize;
-            this.interval = interval;
-            this.autoUnsubscribe = autoUnsubscribe;
-        }
-
-        /**
-         * @param marsh Marshaller.
-         * @throws IgniteCheckedException In case of error.
-         */
-        void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
-            assert marsh != null;
-
-            prjPredBytes = marsh.marshal(prjPred);
-        }
-
-        /**
-         * @param marsh Marshaller.
-         * @param ldr Class loader.
-         * @throws IgniteCheckedException In case of error.
-         */
-        void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
-            assert marsh != null;
-
-            assert prjPred == null;
-            assert prjPredBytes != null;
-
-            prjPred = marsh.unmarshal(prjPredBytes, ldr);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            boolean b = prjPredBytes != null;
-
-            out.writeBoolean(b);
-
-            if (b) {
-                U.writeByteArray(out, prjPredBytes);
-                U.writeString(out, clsName);
-                out.writeObject(depInfo);
-            }
-            else
-                out.writeObject(prjPred);
-
-            out.writeObject(hnd);
-            out.writeInt(bufSize);
-            out.writeLong(interval);
-            out.writeBoolean(autoUnsubscribe);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            boolean b = in.readBoolean();
-
-            if (b) {
-                prjPredBytes = U.readByteArray(in);
-                clsName = U.readString(in);
-                depInfo = (GridDeploymentInfo)in.readObject();
-            }
-            else
-                prjPred = (IgnitePredicate<ClusterNode>)in.readObject();
-
-            hnd = (GridContinuousHandler)in.readObject();
-            bufSize = in.readInt();
-            interval = in.readLong();
-            autoUnsubscribe = in.readBoolean();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(StartRequestData.class, this);
-        }
-    }
-
-    /**
      * Discovery data.
      */
     private static class DiscoveryData implements Externalizable {
@@ -1570,6 +1218,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         @GridToStringInclude
         private Collection<DiscoveryDataItem> items;
 
+        private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos;
+
         /**
          * Required by {@link Externalizable}.
          */
@@ -1580,11 +1230,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         /**
          * @param nodeId Node ID.
          */
-        DiscoveryData(UUID nodeId) {
+        DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
             assert nodeId != null;
 
             this.nodeId = nodeId;
 
+            this.clientInfos = clientInfos;
+
             items = new ArrayList<>();
         }
 
@@ -1599,12 +1251,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         @Override public void writeExternal(ObjectOutput out) throws IOException {
             U.writeUuid(out, nodeId);
             U.writeCollection(out, items);
+            U.writeMap(out, clientInfos);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
             nodeId = U.readUuid(in);
             items = U.readCollection(in);
+            clientInfos = U.readMap(in);
         }
 
         /** {@inheritDoc} */
@@ -1716,13 +1370,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         private volatile GridTimeoutObject timeoutObj;
 
         /**
-         * Required by {@link Externalizable}.
-         */
-        public StartFuture() {
-            // No-op.
-        }
-
-        /**
          * @param ctx Kernal context.
          * @param routineId Consume ID.
          */
@@ -1833,10 +1480,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         private UUID nodeId;
 
         /**
-         * @param ctx Kernal context.
          * @param nodeId Master node ID.
          */
-        SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) {
+        SyncMessageAckFuture(UUID nodeId) {
             this.nodeId = nodeId;
         }
 
@@ -1852,76 +1498,4 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             return S.toString(SyncMessageAckFuture.class, this);
         }
     }
-
-    /**
-     * Timeout object for stop process.
-     */
-    private class StopTimeoutObject extends GridTimeoutObjectAdapter {
-        /** Timeout. */
-        private final long timeout;
-
-        /** Routine ID. */
-        private final UUID routineId;
-
-        /** Request. */
-        private final GridContinuousMessage req;
-
-        /**
-         * @param timeout Timeout.
-         * @param routineId Routine ID.
-         * @param req Request.
-         */
-        protected StopTimeoutObject(long timeout, UUID routineId, GridContinuousMessage req) {
-            super(timeout);
-
-            assert routineId != null;
-            assert req != null;
-
-            this.timeout = timeout;
-            this.routineId = routineId;
-            this.req = req;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
-            Collection<UUID> ids = waitForStopAck.remove(routineId);
-
-            if (ids != null) {
-                U.warn(log, "Failed to get stop acknowledgement from nodes (timeout expired): " + ids +
-                    ". Will retry.");
-
-                StopFuture f = stopFuts.get(routineId);
-
-                if (f != null) {
-                    if (!ids.isEmpty()) {
-                        waitForStopAck.put(routineId, ids);
-
-                        // Resend requests.
-                        for (UUID id : ids) {
-                            try {
-                                sendWithRetries(id, req, null);
-                            }
-                            catch (ClusterTopologyCheckedException ignored) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to resend stop request to node (is node alive?): " + id);
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(log, "Failed to resend stop request to node: " + id, e);
-
-                                ids.remove(id);
-
-                                if (ids.isEmpty())
-                                    f.onDone(e);
-                            }
-                        }
-
-                        // Reschedule timeout.
-                        ctx.timeout().addTimeoutObject(new StopTimeoutObject(timeout, routineId, req));
-                    }
-                    else if (stopFuts.remove(routineId) != null)
-                        f.onDone();
-                }
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
new file mode 100644
index 0000000..c721d44
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Start request data.
+ */
+class StartRequestData implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Projection predicate. */
+    private IgnitePredicate<ClusterNode> prjPred;
+
+    /** Serialized projection predicate. */
+    private byte[] prjPredBytes;
+
+    /** Deployment class name. */
+    private String clsName;
+
+    /** Deployment info. */
+    private GridDeploymentInfo depInfo;
+
+    /** Handler. */
+    private GridContinuousHandler hnd;
+
+    /** Buffer size. */
+    private int bufSize;
+
+    /** Time interval. */
+    private long interval;
+
+    /** Automatic unsubscribe flag. */
+    private boolean autoUnsubscribe;
+
+    /**
+     * Required by {@link java.io.Externalizable}.
+     */
+    public StartRequestData() {
+        // No-op.
+    }
+
+    /**
+     * @param prjPred Serialized projection predicate.
+     * @param hnd Handler.
+     * @param bufSize Buffer size.
+     * @param interval Time interval.
+     * @param autoUnsubscribe Automatic unsubscribe flag.
+     */
+    StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd,
+        int bufSize, long interval, boolean autoUnsubscribe) {
+        assert hnd != null;
+        assert bufSize > 0;
+        assert interval >= 0;
+
+        this.prjPred = prjPred;
+        this.hnd = hnd;
+        this.bufSize = bufSize;
+        this.interval = interval;
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
+
+    /**
+     * @param marsh Marshaller.
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     */
+    void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
+        assert marsh != null;
+
+        prjPredBytes = marsh.marshal(prjPred);
+    }
+
+    /**
+     * @param marsh Marshaller.
+     * @param ldr Class loader.
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     */
+    void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+        assert marsh != null;
+
+        assert prjPred == null;
+        assert prjPredBytes != null;
+
+        prjPred = marsh.unmarshal(prjPredBytes, ldr);
+    }
+
+    /**
+     * @return Projection predicate.
+     */
+    public IgnitePredicate<ClusterNode> projectionPredicate() {
+        return prjPred;
+    }
+
+    /**
+     * @param prjPred New projection predicate.
+     */
+    public void projectionPredicate(IgnitePredicate<ClusterNode> prjPred) {
+        this.prjPred = prjPred;
+    }
+
+    /**
+     * @return Serialized projection predicate.
+     */
+    public byte[] projectionPredicateBytes() {
+        return prjPredBytes;
+    }
+
+    /**
+     * @param prjPredBytes New serialized projection predicate.
+     */
+    public void projectionPredicateBytes(byte[] prjPredBytes) {
+        this.prjPredBytes = prjPredBytes;
+    }
+
+    /**
+     * @return Deployment class name.
+     */
+    public String className() {
+        return clsName;
+    }
+
+    /**
+     * @param clsName New deployment class name.
+     */
+    public void className(String clsName) {
+        this.clsName = clsName;
+    }
+
+    /**
+     * @return Deployment info.
+     */
+    public GridDeploymentInfo deploymentInfo() {
+        return depInfo;
+    }
+
+    /**
+     * @param depInfo New deployment info.
+     */
+    public void deploymentInfo(GridDeploymentInfo depInfo) {
+        this.depInfo = depInfo;
+    }
+
+    /**
+     * @return Handler.
+     */
+    public GridContinuousHandler handler() {
+        return hnd;
+    }
+
+    /**
+     * @param hnd New handler.
+     */
+    public void handler(GridContinuousHandler hnd) {
+        this.hnd = hnd;
+    }
+
+    /**
+     * @return Buffer size.
+     */
+    public int bufferSize() {
+        return bufSize;
+    }
+
+    /**
+     * @param bufSize New buffer size.
+     */
+    public void bufferSize(int bufSize) {
+        this.bufSize = bufSize;
+    }
+
+    /**
+     * @return Time interval.
+     */
+    public long interval() {
+        return interval;
+    }
+
+    /**
+     * @param interval New time interval.
+     */
+    public void interval(long interval) {
+        this.interval = interval;
+    }
+
+    /**
+     * @return Automatic unsubscribe flag.
+     */
+    public boolean autoUnsubscribe() {
+        return autoUnsubscribe;
+    }
+
+    /**
+     * @param autoUnsubscribe New automatic unsubscribe flag.
+     */
+    public void autoUnsubscribe(boolean autoUnsubscribe) {
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        boolean b = prjPredBytes != null;
+
+        out.writeBoolean(b);
+
+        if (b) {
+            U.writeByteArray(out, prjPredBytes);
+            U.writeString(out, clsName);
+            out.writeObject(depInfo);
+        }
+        else
+            out.writeObject(prjPred);
+
+        out.writeObject(hnd);
+        out.writeInt(bufSize);
+        out.writeLong(interval);
+        out.writeBoolean(autoUnsubscribe);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        boolean b = in.readBoolean();
+
+        if (b) {
+            prjPredBytes = U.readByteArray(in);
+            clsName = U.readString(in);
+            depInfo = (GridDeploymentInfo)in.readObject();
+        }
+        else
+            prjPred = (IgnitePredicate<ClusterNode>)in.readObject();
+
+        hnd = (GridContinuousHandler)in.readObject();
+        bufSize = in.readInt();
+        interval = in.readLong();
+        autoUnsubscribe = in.readBoolean();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StartRequestData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
new file mode 100644
index 0000000..5743dd4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final Map<UUID, IgniteCheckedException> errs;
+
+    /**
+     * @param routineId Routine id.
+     * @param errs Errs.
+     */
+    public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) {
+        super(routineId);
+
+        this.errs = new HashMap<>(errs);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /**
+     * @return Errs.
+     */
+    public Map<UUID, IgniteCheckedException> errs() {
+        return errs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StartRoutineAckDiscoveryMessage.class, this, "routineId", routineId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
new file mode 100644
index 0000000..cff6239
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final StartRequestData startReqData;
+
+    /** */
+    private final Map<UUID, IgniteCheckedException> errs = new HashMap<>();
+
+    /**
+     * @param routineId Routine id.
+     * @param startReqData Start request data.
+     */
+    public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) {
+        super(routineId);
+
+        this.startReqData = startReqData;
+    }
+
+    /**
+     * @return Start request data.
+     */
+    public StartRequestData startRequestData() {
+        return startReqData;
+    }
+
+    /**
+     * @param nodeId Node id.
+     * @param e Exception.
+     */
+    public void addError(UUID nodeId, IgniteCheckedException e) {
+        errs.put(nodeId, e);
+    }
+
+    /**
+     * @return Errs.
+     */
+    public Map<UUID, IgniteCheckedException> errs() {
+        return errs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoveryCustomMessage ackMessage() {
+        return new StartRoutineAckDiscoveryMessage(routineId, errs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StartRoutineDiscoveryMessage.class, this, "routineId", routineId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
new file mode 100644
index 0000000..256791a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param routineId Routine id.
+     */
+    public StopRoutineAckDiscoveryMessage(UUID routineId) {
+        super(routineId);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StopRoutineAckDiscoveryMessage.class, this, "routineId", routineId());
+    }
+}



Mime
View raw message