ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptupit...@apache.org
Subject [02/23] ignite git commit: ignite-5145 Support multiple service deployment in API
Date Wed, 06 Sep 2017 14:28:18 GMT
ignite-5145 Support multiple service deployment in API


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b6da976
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b6da976
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b6da976

Branch: refs/heads/ignite-5896
Commit: 0b6da9766ea5d3a096bbbf63de5ebbbe3a883677
Parents: e791254
Author: Denis Mekhanikov <dmekhanikov@gmail.com>
Authored: Tue Sep 5 12:05:50 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Sep 5 12:05:50 2017 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteServices.java  | 112 ++-
 .../ignite/internal/IgniteServicesImpl.java     |  64 +-
 .../discovery/GridDiscoveryManager.java         |   5 -
 .../GridServiceDeploymentCompoundFuture.java    | 196 +++++
 .../service/GridServiceProcessor.java           | 555 +++++++++-----
 .../service/PreparedConfigurations.java         |  53 ++
 .../service/ServiceDeploymentException.java     |  78 ++
 .../util/future/GridCompoundFuture.java         |  15 +-
 ...ServiceDeploymentCompoundFutureSelfTest.java | 241 ++++++
 ...GridServiceProcessorBatchDeploySelfTest.java | 741 +++++++++++++++++++
 .../testsuites/IgniteKernalSelfTestSuite.java   |   4 +
 11 files changed, 1821 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
index 1c01598..271adbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
@@ -20,6 +20,7 @@ package org.apache.ignite;
 import java.util.Collection;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.service.ServiceDeploymentException;
 import org.apache.ignite.lang.IgniteAsyncSupport;
 import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteFuture;
@@ -156,10 +157,10 @@ public interface IgniteServices extends IgniteAsyncSupport {
      *
      * @param name Service name.
      * @param svc Service instance.
-     * @throws IgniteException If failed to deploy service.
+     * @throws ServiceDeploymentException If failed to deploy service.
      */
     @IgniteAsyncSupported
-    public void deployClusterSingleton(String name, Service svc) throws IgniteException;
+    public void deployClusterSingleton(String name, Service svc) throws ServiceDeploymentException;
 
     /**
      * Asynchronously deploys a cluster-wide singleton service. Ignite will guarantee that there is always
@@ -178,9 +179,8 @@ public interface IgniteServices extends IgniteAsyncSupport {
      * @param name Service name.
      * @param svc Service instance.
      * @return a Future representing pending completion of the operation.
-     * @throws IgniteException If failed to deploy service.
      */
-    public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) throws IgniteException;
+    public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc);
 
     /**
      * Deploys a per-node singleton service. Ignite will guarantee that there is always
@@ -194,10 +194,10 @@ public interface IgniteServices extends IgniteAsyncSupport {
      *
      * @param name Service name.
      * @param svc Service instance.
-     * @throws IgniteException If failed to deploy service.
+     * @throws ServiceDeploymentException If failed to deploy service.
      */
     @IgniteAsyncSupported
-    public void deployNodeSingleton(String name, Service svc) throws IgniteException;
+    public void deployNodeSingleton(String name, Service svc) throws ServiceDeploymentException;
 
     /**
      * Asynchronously deploys a per-node singleton service. Ignite will guarantee that there is always
@@ -212,9 +212,8 @@ public interface IgniteServices extends IgniteAsyncSupport {
      * @param name Service name.
      * @param svc Service instance.
      * @return a Future representing pending completion of the operation.
-     * @throws IgniteException If failed to deploy service.
      */
-    public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) throws IgniteException;
+    public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc);
 
     /**
      * Deploys one instance of this service on the primary node for a given affinity key.
@@ -245,11 +244,11 @@ public interface IgniteServices extends IgniteAsyncSupport {
      * @param cacheName Name of the cache on which affinity for key should be calculated, {@code null} for
      *      default cache.
      * @param affKey Affinity cache key.
-     * @throws IgniteException If failed to deploy service.
+     * @throws ServiceDeploymentException If failed to deploy service.
      */
     @IgniteAsyncSupported
     public void deployKeyAffinitySingleton(String name, Service svc, @Nullable String cacheName, Object affKey)
-        throws IgniteException;
+        throws ServiceDeploymentException;
 
     /**
      * Asynchronously deploys one instance of this service on the primary node for a given affinity key.
@@ -281,10 +280,9 @@ public interface IgniteServices extends IgniteAsyncSupport {
      *      default cache.
      * @param affKey Affinity cache key.
      * @return a Future representing pending completion of the operation.
-     * @throws IgniteException If failed to deploy service.
      */
     public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, Service svc, @Nullable String cacheName,
-        Object affKey) throws IgniteException;
+        Object affKey);
 
     /**
      * Deploys multiple instances of the service on the grid. Ignite will deploy a
@@ -314,10 +312,11 @@ public interface IgniteServices extends IgniteAsyncSupport {
      * @param svc Service instance.
      * @param totalCnt Maximum number of deployed services in the grid, {@code 0} for unlimited.
      * @param maxPerNodeCnt Maximum number of deployed services on each node, {@code 0} for unlimited.
-     * @throws IgniteException If failed to deploy service.
+     * @throws ServiceDeploymentException If failed to deploy service.
      */
     @IgniteAsyncSupported
-    public void deployMultiple(String name, Service svc, int totalCnt, int maxPerNodeCnt) throws IgniteException;
+    public void deployMultiple(String name, Service svc, int totalCnt, int maxPerNodeCnt)
+        throws ServiceDeploymentException;
 
     /**
      * Asynchronously deploys multiple instances of the service on the grid. Ignite will deploy a
@@ -348,10 +347,8 @@ public interface IgniteServices extends IgniteAsyncSupport {
      * @param totalCnt Maximum number of deployed services in the grid, {@code 0} for unlimited.
      * @param maxPerNodeCnt Maximum number of deployed services on each node, {@code 0} for unlimited.
      * @return a Future representing pending completion of the operation.
-     * @throws IgniteException If failed to deploy service.
      */
-    public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, int maxPerNodeCnt)
-        throws IgniteException;
+    public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, int maxPerNodeCnt);
 
     /**
      * Deploys multiple instances of the service on the grid according to provided
@@ -390,10 +387,10 @@ public interface IgniteServices extends IgniteAsyncSupport {
      * </pre>
      *
      * @param cfg Service configuration.
-     * @throws IgniteException If failed to deploy service.
+     * @throws ServiceDeploymentException If failed to deploy service.
      */
     @IgniteAsyncSupported
-    public void deploy(ServiceConfiguration cfg) throws IgniteException;
+    public void deploy(ServiceConfiguration cfg) throws ServiceDeploymentException;
 
     /**
      * Asynchronously deploys multiple instances of the service on the grid according to provided
@@ -433,9 +430,51 @@ public interface IgniteServices extends IgniteAsyncSupport {
      *
      * @param cfg Service configuration.
      * @return a Future representing pending completion of the operation.
-     * @throws IgniteException If failed to deploy service.
      */
-    public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws IgniteException;
+    public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg);
+
+    /**
+     * Deploys multiple services described by provided configurations. Depending on specified parameters, multiple
+     * instances of the same service may be deployed (see {@link ServiceConfiguration}).
+     * Whenever topology changes, Ignite will automatically rebalance
+     * the deployed services within cluster to make sure that each node will end up with
+     * about equal number of deployed instances whenever possible.
+     * <p>
+     * If deployment fails, then {@link ServiceDeploymentException} containing a list of failed services will be
+     * thrown. It is guaranteed that all services that were provided to this method and are not present in the list of
+     * failed services are successfully deployed by the time the exception is thrown.
+     *
+     * @param cfgs {@link Collection} of service configurations to be deployed.
+     * @param allOrNone Specifies behavior in case when errors during deployment occur. If {@code true}, then two
+     * outcomes are possible: either all services will be deployed, or none of them. If {@code false}, then partial
+     * deployments are permitted.
+     * @throws ServiceDeploymentException If failed to deploy services.
+     * @see IgniteServices#deploy(ServiceConfiguration)
+     * @see IgniteServices#deployAllAsync(Collection, boolean)
+     */
+    public void deployAll(Collection<ServiceConfiguration> cfgs, boolean allOrNone) throws ServiceDeploymentException;
+
+    /**
+     * Asynchronously deploys multiple services described by provided configurations. Depending on specified parameters,
+     * multiple instances of the same service may be deployed (see {@link ServiceConfiguration}).
+     * Whenever topology changes, Ignite will automatically rebalance
+     * the deployed services within cluster to make sure that each node will end up with
+     * about equal number of deployed instances whenever possible.
+     * <p>
+     * If deployment fails, then {@link ServiceDeploymentException} containing a list of failed services will be
+     * thrown from {@link IgniteFuture#get get()} method of the returned future. It is guaranteed that all services
+     * that were provided to this method and are not present in the list of failed services are successfully deployed
+     * by the time the exception is thrown.
+     *
+     * @param cfgs {@link Collection} of service configurations to be deployed.
+     * @param allOrNone Specifies behavior in case when errors during deployment occur. If {@code true}, then two
+     * outcomes are possible: either all services will be deployed, or none of them. If {@code false}, then partial
+     * deployments are permitted.
+     * @return a Future representing pending completion of the operation.
+     * @see IgniteServices#deploy(ServiceConfiguration)
+     * @see IgniteServices#deployAll(Collection,boolean)
+     */
+    public IgniteFuture<Void> deployAllAsync(Collection<ServiceConfiguration> cfgs, boolean allOrNone);
 
     /**
      * Cancels service deployment. If a service with specified name was deployed on the grid,
@@ -468,9 +507,33 @@ public interface IgniteServices extends IgniteAsyncSupport {
      *
      * @param name Name of service to cancel.
      * @return a Future representing pending completion of the operation.
-     * @throws IgniteException If failed to cancel service.
      */
-    public IgniteFuture<Void> cancelAsync(String name) throws IgniteException;
+    public IgniteFuture<Void> cancelAsync(String name);
+
+    /**
+     * Cancels services with specified names.
+     * <p>
+     * Note that depending on user logic, it may still take extra time for a service to
+     * finish execution, even after it was cancelled.
+     * <p>
+     * Supports asynchronous execution (see {@link IgniteAsyncSupport}).
+     *
+     * @param names Names of services to cancel.
+     * @throws IgniteException If failed to cancel services.
+     */
+    @IgniteAsyncSupported
+    public void cancelAll(Collection<String> names) throws IgniteException;
+
+    /**
+     * Asynchronously cancels services with specified names.
+     * <p>
+     * Note that depending on user logic, it may still take extra time for a service to
+     * finish execution, even after it was cancelled.
+     *
+     * @param names Names of services to cancel.
+     * @return a Future representing pending completion of the operation.
+     */
+    public IgniteFuture<Void> cancelAllAsync(Collection<String> names);
 
     /**
      * Cancels all deployed services.
@@ -492,9 +555,8 @@ public interface IgniteServices extends IgniteAsyncSupport {
      * finish execution, even after it was cancelled.
      *
      * @return a Future representing pending completion of the operation.
-     * @throws IgniteException If failed to cancel services.
      */
-    public IgniteFuture<Void> cancelAllAsync() throws IgniteException;
+    public IgniteFuture<Void> cancelAllAsync();
 
     /**
      * Gets metadata about all deployed services in the grid.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
index 607dccc..ad455d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
@@ -23,6 +23,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.ObjectStreamException;
 import java.util.Collection;
+import java.util.Collections;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteServices;
@@ -94,7 +95,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) throws IgniteException {
+    @Override public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) {
         A.notNull(name, "name");
         A.notNull(svc, "svc");
 
@@ -127,7 +128,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) throws IgniteException {
+    @Override public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) {
         A.notNull(name, "name");
         A.notNull(svc, "svc");
 
@@ -160,8 +161,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt,
-        int maxPerNodeCnt) throws IgniteException {
+    @Override public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, int maxPerNodeCnt) {
         A.notNull(name, "name");
         A.notNull(svc, "svc");
 
@@ -198,7 +198,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, Service svc,
-        @Nullable String cacheName, Object affKey) throws IgniteException {
+        @Nullable String cacheName, Object affKey) {
         A.notNull(name, "name");
         A.notNull(svc, "svc");
         A.notNull(affKey, "affKey");
@@ -218,10 +218,24 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
     @Override public void deploy(ServiceConfiguration cfg) {
         A.notNull(cfg, "cfg");
 
+        deployAll(Collections.singleton(cfg), false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) {
+        A.notNull(cfg, "cfg");
+
+        return deployAllAsync(Collections.singleton(cfg), false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deployAll(Collection<ServiceConfiguration> cfgs, boolean allOrNone) {
+        A.notNull(cfgs, "cfgs");
+
         guard();
 
         try {
-            saveOrGet(ctx.service().deploy(cfg));
+            saveOrGet(ctx.service().deployAll(cfgs, allOrNone));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -232,13 +246,14 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws IgniteException {
-        A.notNull(cfg, "cfg");
+    @Override public IgniteFuture<Void> deployAllAsync(Collection<ServiceConfiguration> cfgs,
+        boolean allOrNone) {
+        A.notNull(cfgs, "cfgs");
 
         guard();
 
         try {
-            return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deploy(cfg));
+            return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployAll(cfgs, allOrNone));
         }
         finally {
             unguard();
@@ -263,7 +278,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<Void> cancelAsync(String name) throws IgniteException {
+    @Override public IgniteFuture<Void> cancelAsync(String name) {
         A.notNull(name, "name");
 
         guard();
@@ -277,6 +292,33 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
+    @Override public void cancelAll(Collection<String> names) {
+        guard();
+
+        try {
+            saveOrGet(ctx.service().cancelAll(names));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> cancelAllAsync(Collection<String> names) {
+        guard();
+
+        try {
+            return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().cancelAll(names));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void cancelAll() {
         guard();
 
@@ -292,7 +334,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<Void> cancelAllAsync() throws IgniteException {
+    @Override public IgniteFuture<Void> cancelAllAsync() {
         guard();
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 74f7fa6..56af9bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -30,7 +30,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -43,9 +42,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.zip.CRC32;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
@@ -897,8 +894,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
         checkAttributes(discoCache().remoteNodes());
 
-        ctx.service().initCompatibilityMode(discoCache().remoteNodes());
-
         // Start discovery worker.
         new IgniteThread(discoWrk).start();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java
new file mode 100644
index 0000000..12b88e5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Service deployment compound future, {@code allOrNone} parameter specifies failing policy.
+ * <p>
+ * If {@code allOrNone} parameter is set to {@code false}, then this future waits for completion of all child futures.
+ * If any exceptions are thrown during deployment, then {@link IgniteCheckedException} with {@link
+ * ServiceDeploymentException} as a cause will be thrown from {@link IgniteInternalFuture#get get()} method after all
+ * futures complete or fail. Inner exception will contain configurations of failed services.
+ */
+public class GridServiceDeploymentCompoundFuture extends GridCompoundFuture<Object, Object> {
+    /** */
+    private final boolean allOrNone;
+
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Names of services written to cache during current deployment. */
+    private Collection<String> svcsToRollback;
+
+    /** */
+    private volatile ServiceDeploymentException err;
+
+    /**
+     * @param allOrNone Failing policy.
+     * @param ctx Kernal context.
+     */
+    GridServiceDeploymentCompoundFuture(boolean allOrNone, GridKernalContext ctx) {
+        this.allOrNone = allOrNone;
+        this.ctx = ctx;
+        this.log = ctx.log(getClass());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean processFailure(Throwable err, IgniteInternalFuture<Object> fut) {
+        assert fut instanceof GridServiceDeploymentFuture : fut;
+
+        GridServiceDeploymentFuture depFut = (GridServiceDeploymentFuture)fut;
+
+        if (allOrNone) {
+            if (initialized()) {
+                onDone(new IgniteCheckedException(
+                    new ServiceDeploymentException("Failed to deploy provided services.", err, getConfigurations())));
+            }
+            else {
+                synchronized (this) {
+                    if (this.err == null) {
+                        this.err = new ServiceDeploymentException("Failed to deploy provided services.", err,
+                            new ArrayList<ServiceConfiguration>());
+                    }
+                    else
+                        this.err.addSuppressed(err);
+                }
+            }
+        }
+        else {
+            synchronized (this) {
+                if (this.err == null)
+                    this.err = new ServiceDeploymentException("Failed to deploy some services.",
+                        new ArrayList<ServiceConfiguration>());
+
+                this.err.getFailedConfigurations().add(depFut.configuration());
+                this.err.addSuppressed(err);
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Marks this future as initialized. Will complete with error if failures before initialization occurred and
+     * all-or-none policy is followed.
+     */
+    public void serviceDeploymentMarkInitialized() {
+        if (allOrNone && this.err != null) {
+            this.err.getFailedConfigurations().addAll(getConfigurations());
+
+            onDone(new IgniteCheckedException(this.err));
+        }
+        else
+            super.markInitialized();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean onDone(@Nullable final Object res, @Nullable Throwable err, final boolean cancel) {
+        final Throwable resErr;
+
+        if (err == null && this.err != null)
+            resErr = new IgniteCheckedException(this.err);
+        else
+            resErr = err;
+
+        if (allOrNone && this.err != null && svcsToRollback != null) {
+            U.warn(log, "Failed to deploy provided services. The following services will be cancelled:" + svcsToRollback);
+
+            IgniteInternalFuture<?> fut = ctx.service().cancelAll(svcsToRollback);
+
+            /*
+            Cannot call fut.get() here: we may already be running in the system pool,
+            and fut must also be completed from the system pool.
+             */
+            fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+                @Override public void apply(IgniteInternalFuture fut) {
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to cancel deployed services.", e);
+                    }
+                    finally {
+                        svcsToRollback = null;
+                    }
+
+                    GridServiceDeploymentCompoundFuture.super.onDone(res, resErr, cancel);
+                }
+            });
+
+            return false;
+        }
+
+        return super.onDone(res, resErr, cancel);
+    }
+
+    /**
+     * @param fut Child future.
+     * @param own If {@code true}, then corresponding service will be cancelled on failure.
+     */
+    public void add(GridServiceDeploymentFuture fut, boolean own) {
+        super.add(fut);
+
+        if (own) {
+            if (svcsToRollback == null)
+                svcsToRollback = new ArrayList<>();
+
+            svcsToRollback.add(fut.configuration().getName());
+        }
+    }
+
+    /**
+     * @return Collection of names of services that were written to cache during current deployment.
+     */
+    public Collection<String> servicesToRollback() {
+        if (svcsToRollback != null)
+            return svcsToRollback;
+        else
+            return Collections.emptyList();
+    }
+
+    /**
+     * @return Collection of configurations, stored in child futures.
+     */
+    private Collection<ServiceConfiguration> getConfigurations() {
+        Collection<IgniteInternalFuture<Object>> futs = futures();
+
+        List<ServiceConfiguration> cfgs = new ArrayList<>(futs.size());
+
+        for (IgniteInternalFuture<Object> fut : futs)
+            cfgs.add(((GridServiceDeploymentFuture)fut).configuration());
+
+        return cfgs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 1d8720c..3adad23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.processors.service;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -32,7 +34,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
@@ -90,6 +91,7 @@ import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.security.SecurityException;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.JobContextResource;
@@ -98,6 +100,7 @@ import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.services.ServiceDescriptor;
 import org.apache.ignite.thread.IgniteThreadFactory;
+import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -108,6 +111,7 @@ import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SERVICES_COMPATIBILITY_MODE;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
@@ -129,9 +133,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT
     };
 
-    /** */
-    private final AtomicReference<ServicesCompatibilityState> compatibilityState;
-
     /** Local service instances. */
     private final Map<String, Collection<ServiceContextImpl>> locSvcs = new HashMap<>();
 
@@ -173,9 +174,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         String servicesCompatibilityMode = getString(IGNITE_SERVICES_COMPATIBILITY_MODE);
 
         srvcCompatibilitySysProp = servicesCompatibilityMode == null ? null : Boolean.valueOf(servicesCompatibilityMode);
-
-        compatibilityState = new AtomicReference<>(
-            new ServicesCompatibilityState(srvcCompatibilitySysProp != null ? srvcCompatibilitySysProp : false, false));
     }
 
     /**
@@ -265,19 +263,13 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         ServiceConfiguration[] cfgs = ctx.config().getServiceConfiguration();
 
         if (cfgs != null) {
-            Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
-
             for (ServiceConfiguration c : cfgs) {
                 // Deploy only on server nodes by default.
                 if (c.getNodeFilter() == null)
                     c.setNodeFilter(ctx.cluster().get().forServers().predicate());
-
-                futs.add(deploy(c));
             }
 
-            // Await for services to deploy.
-            for (IgniteInternalFuture<?> f : futs)
-                f.get();
+            deployAll(Arrays.asList(cfgs), true).get();
         }
 
         if (log.isDebugEnabled())
@@ -501,183 +493,288 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     }
 
     /**
-     * @param cfg Service configuration.
-     * @return Future for deployment.
+     * @param cfgs Service configurations.
+     * @param allOrNone Failure processing policy.
+     * @return Configurations to deploy.
      */
-    public IgniteInternalFuture<?> deploy(ServiceConfiguration cfg) {
-        A.notNull(cfg, "cfg");
-
-        ServicesCompatibilityState state = markCompatibilityStateAsUsed();
+    private PreparedConfigurations prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs, boolean allOrNone) {
+        List<ServiceConfiguration> cfgsCp = new ArrayList<>(cfgs.size());
 
-        validate(cfg);
+        Marshaller marsh = ctx.config().getMarshaller();
 
-        ctx.security().authorize(cfg.getName(), SecurityPermission.SERVICE_DEPLOY, null);
+        List<GridServiceDeploymentFuture> failedFuts = null;
 
-        if (!state.srvcCompatibility) {
-            Marshaller marsh = ctx.config().getMarshaller();
-
-            LazyServiceConfiguration cfg0;
+        for (ServiceConfiguration cfg : cfgs) {
+            Exception err = null;
 
             try {
-                byte[] srvcBytes = U.marshal(marsh, cfg.getService());
-
-                cfg0 = new LazyServiceConfiguration(cfg, srvcBytes);
+                validate(cfg);
             }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to marshal service with configured marshaller [srvc=" + cfg.getService()
-                    + ", marsh=" + marsh + "]", e);
+            catch (Exception e) {
+                U.error(log, "Failed to validate service configuration [name=" + cfg.getName() +
+                    ", srvc=" + cfg.getService() + ']', e);
 
-                return new GridFinishedFuture<>(e);
+                err = e;
             }
 
-            cfg = cfg0;
-        }
+            if (err == null) {
+                try {
+                    ctx.security().authorize(cfg.getName(), SecurityPermission.SERVICE_DEPLOY, null);
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to authorize service creation [name=" + cfg.getName() +
+                        ", srvc=" + cfg.getService() + ']', e);
 
-        GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg);
+                    err = e;
+                }
+            }
 
-        GridServiceDeploymentFuture old = depFuts.putIfAbsent(cfg.getName(), fut);
+            if (err == null) {
+                try {
+                    byte[] srvcBytes = U.marshal(marsh, cfg.getService());
 
-        if (old != null) {
-            if (!old.configuration().equalsIgnoreNodeFilter(cfg)) {
-                fut.onDone(new IgniteCheckedException("Failed to deploy service (service already exists with " +
-                    "different configuration) [deployed=" + old.configuration() + ", new=" + cfg + ']'));
+                    cfgsCp.add(new LazyServiceConfiguration(cfg, srvcBytes));
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to marshal service with configured marshaller [name=" + cfg.getName() +
+                        ", srvc=" + cfg.getService() + ", marsh=" + marsh + "]", e);
 
-                return fut;
+                    err = e;
+                }
             }
 
-            return old;
-        }
+            if (err != null) {
+                if (allOrNone) {
+                    return new PreparedConfigurations(null,
+                        null,
+                        new IgniteCheckedException(
+                            new ServiceDeploymentException("None of the provided services were deplyed.", err, cfgs)));
+                }
+                else {
+                    if (failedFuts == null)
+                        failedFuts = new ArrayList<>();
 
-        if (ctx.clientDisconnected()) {
-            fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
-                "Failed to deploy service, client node disconnected."));
+                    GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg);
+
+                    fut.onDone(err);
 
-            depFuts.remove(cfg.getName(), fut);
+                    failedFuts.add(fut);
+                }
+            }
         }
 
-        while (true) {
-            try {
-                GridServiceDeploymentKey key = new GridServiceDeploymentKey(cfg.getName());
+        return new PreparedConfigurations(cfgsCp, failedFuts, null);
+    }
 
-                if (ctx.deploy().enabled())
-                    ctx.cache().context().deploy().ignoreOwnership(true);
+    /**
+     * @param cfgs Service configurations.
+     * @param allOrNone Failure processing policy.
+     * @return Future for deployment.
+     */
+    public IgniteInternalFuture<?> deployAll(Collection<ServiceConfiguration> cfgs, boolean allOrNone) {
+        assert cfgs != null;
 
-                try {
-                    GridServiceDeployment dep = (GridServiceDeployment)cache.getAndPutIfAbsent(key,
-                        new GridServiceDeployment(ctx.localNodeId(), cfg));
+        PreparedConfigurations srvCfg = prepareServiceConfigurations(cfgs, allOrNone);
 
-                    if (dep != null) {
-                        if (!dep.configuration().equalsIgnoreNodeFilter(cfg)) {
-                            // Remove future from local map.
-                            depFuts.remove(cfg.getName(), fut);
+        if (srvCfg.err != null)
+            return new GridFinishedFuture<>(srvCfg.err);
 
-                            fut.onDone(new IgniteCheckedException("Failed to deploy service (service already exists with " +
-                                "different configuration) [deployed=" + dep.configuration() + ", new=" + cfg + ']'));
-                        }
-                        else {
-                            Iterator<Cache.Entry<Object, Object>> it = serviceEntries(
-                                ServiceAssignmentsPredicate.INSTANCE);
+        List<ServiceConfiguration> cfgsCp = srvCfg.cfgs;
 
-                            while (it.hasNext()) {
-                                Cache.Entry<Object, Object> e = it.next();
+        List<GridServiceDeploymentFuture> failedFuts = srvCfg.failedFuts;
 
-                                GridServiceAssignments assigns = (GridServiceAssignments)e.getValue();
+        Collections.sort(cfgsCp, new Comparator<ServiceConfiguration>() {
+            @Override public int compare(ServiceConfiguration cfg1, ServiceConfiguration cfg2) {
+                return cfg1.getName().compareTo(cfg2.getName());
+            }
+        });
 
-                                if (assigns.name().equals(cfg.getName())) {
-                                    // Remove future from local map.
-                                    depFuts.remove(cfg.getName(), fut);
+        GridServiceDeploymentCompoundFuture res;
 
-                                    fut.onDone();
+        while (true) {
+            res = new GridServiceDeploymentCompoundFuture(allOrNone, ctx);
 
-                                    break;
-                                }
+            if (ctx.deploy().enabled())
+                ctx.cache().context().deploy().ignoreOwnership(true);
+
+            try {
+                if (cfgsCp.size() == 1)
+                    writeServiceToCache(res, cfgsCp.get(0));
+                else if (cfgsCp.size() > 1) {
+                    try (Transaction tx = cache.txStart(PESSIMISTIC, READ_COMMITTED)) {
+                        for (ServiceConfiguration cfg : cfgsCp) {
+                            try {
+                                writeServiceToCache(res, cfg);
                             }
+                            catch (IgniteCheckedException e) {
+                                if (X.hasCause(e, ClusterTopologyCheckedException.class))
+                                    throw e; // Retry.
+
+                                if (allOrNone) {
+                                    for (String name : res.servicesToRollback())
+                                        depFuts.remove(name).onDone(e);
+
+                                    res.onDone(new IgniteCheckedException(new ServiceDeploymentException(
+                                        "Failed to deploy provided services.", e, cfgs)));
 
-                            if (!dep.configuration().equalsIgnoreNodeFilter(cfg))
-                                U.warn(log, "Service already deployed with different configuration (will ignore) " +
-                                    "[deployed=" + dep.configuration() + ", new=" + cfg + ']');
+                                    return res;
+                                }
+                            }
                         }
+
+                        tx.commit();
                     }
                 }
-                finally {
-                    if (ctx.deploy().enabled())
-                        ctx.cache().context().deploy().ignoreOwnership(false);
-                }
 
-                return fut;
-            }
-            catch (ClusterTopologyCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Topology changed while deploying service (will retry): " + e.getMessage());
+                break;
             }
-            catch (IgniteCheckedException e) {
-                if (e.hasCause(ClusterTopologyCheckedException.class)) {
+            catch (IgniteException | IgniteCheckedException e) {
+                for (String name : res.servicesToRollback())
+                    depFuts.remove(name).onDone(e);
+
+                if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
                     if (log.isDebugEnabled())
-                        log.debug("Topology changed while deploying service (will retry): " + e.getMessage());
+                        log.debug("Topology changed while deploying services (will retry): " + e.getMessage());
+                }
+                else {
+                    res.onDone(new IgniteCheckedException(
+                        new ServiceDeploymentException("Failed to deploy provided services.", e, cfgs)));
 
-                    continue;
+                    return res;
                 }
+            }
+            finally {
+                if (ctx.deploy().enabled())
+                    ctx.cache().context().deploy().ignoreOwnership(false);
+            }
+        }
 
-                U.error(log, "Failed to deploy service: " + cfg.getName(), e);
+        if (ctx.clientDisconnected()) {
+            IgniteClientDisconnectedCheckedException err =
+                new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                "Failed to deploy services, client node disconnected: " + cfgs);
+
+            for (String name : res.servicesToRollback()) {
+                GridServiceDeploymentFuture fut = depFuts.remove(name);
 
-                return new GridFinishedFuture<>(e);
+                if (fut != null)
+                    fut.onDone(err);
             }
+
+            return new GridFinishedFuture<>(err);
         }
+
+        if (failedFuts != null) {
+            for (GridServiceDeploymentFuture fut : failedFuts)
+                res.add(fut, false);
+        }
+
+        res.serviceDeploymentMarkInitialized();
+
+        return res;
     }
 
     /**
-     * @return Compatibility state.
+     * @param res Resulting compound future.
+     * @param cfg Service configuration.
+     * @throws IgniteCheckedException If operation failed.
      */
-    private ServicesCompatibilityState markCompatibilityStateAsUsed() {
-        while (true) {
-            ServicesCompatibilityState state = compatibilityState.get();
+    private void writeServiceToCache(GridServiceDeploymentCompoundFuture res, ServiceConfiguration cfg)
+        throws IgniteCheckedException {
+        String name = cfg.getName();
 
-            if (state.used)
-                return state;
+        GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg);
 
-            ServicesCompatibilityState newState = new ServicesCompatibilityState(state.srvcCompatibility, true);
+        GridServiceDeploymentFuture old = depFuts.putIfAbsent(name, fut);
 
-            if (compatibilityState.compareAndSet(state, newState))
-                return newState;
-        }
-    }
+        try {
+            if (old != null) {
+                if (!old.configuration().equalsIgnoreNodeFilter(cfg))
+                    throw new IgniteCheckedException("Failed to deploy service (service already exists with different " +
+                        "configuration) [deployed=" + old.configuration() + ", new=" + cfg + ']');
+                else {
+                    res.add(old, false);
 
-    /**
-     * @param name Service name.
-     * @return Future.
-     */
-    public IgniteInternalFuture<?> cancel(String name) {
-        ctx.security().authorize(name, SecurityPermission.SERVICE_CANCEL, null);
+                    return;
+                }
+            }
 
-        while (true) {
-            try {
-                GridFutureAdapter<?> fut = new GridFutureAdapter<>();
+            GridServiceDeploymentKey key = new GridServiceDeploymentKey(name);
+
+            GridServiceDeployment dep = (GridServiceDeployment)cache.getAndPutIfAbsent(key,
+                new GridServiceDeployment(ctx.localNodeId(), cfg));
 
-                GridFutureAdapter<?> old;
+            if (dep != null) {
+                if (!dep.configuration().equalsIgnoreNodeFilter(cfg)) {
+                    String err = "Failed to deploy service (service already exists with different " +
+                        "configuration) [deployed=" + dep.configuration() + ", new=" + cfg + ']';
 
-                if ((old = undepFuts.putIfAbsent(name, fut)) != null)
-                    fut = old;
+                    U.error(log, err);
+
+                    throw new IgniteCheckedException(err);
+                }
                 else {
-                    GridServiceDeploymentKey key = new GridServiceDeploymentKey(name);
+                    res.add(fut, false);
+
+                    Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceAssignmentsPredicate.INSTANCE);
+
+                    while (it.hasNext()) {
+                        Cache.Entry<Object, Object> e = it.next();
 
-                    if (cache.getAndRemove(key) == null) {
-                        // Remove future from local map if service was not deployed.
-                        undepFuts.remove(name);
+                        GridServiceAssignments assigns = (GridServiceAssignments)e.getValue();
 
-                        fut.onDone();
+                        if (assigns.name().equals(name)) {
+                            fut.onDone();
+
+                            depFuts.remove(name, fut);
+
+                            break;
+                        }
                     }
                 }
-
-                return fut;
             }
-            catch (ClusterTopologyCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Topology changed while deploying service (will retry): " + e.getMessage());
+            else
+                res.add(fut, true);
+        }
+        catch (IgniteCheckedException e) {
+            fut.onDone(e);
+
+            res.add(fut, false);
+
+            depFuts.remove(name, fut);
+
+            throw e;
+        }
+    }
+
+    /**
+     * @param cfg Service configuration.
+     * @return Future for deployment.
+     */
+    public IgniteInternalFuture<?> deploy(ServiceConfiguration cfg) {
+        A.notNull(cfg, "cfg");
+
+        return deployAll(Collections.singleton(cfg), false);
+    }
+
+    /**
+     * @param name Service name.
+     * @return Future.
+     */
+    public IgniteInternalFuture<?> cancel(String name) {
+        while (true) {
+            try {
+                return removeServiceFromCache(name).fut;
             }
-            catch (IgniteCheckedException e) {
-                log.error("Failed to undeploy service: " + name, e);
+            catch (IgniteException | IgniteCheckedException e) {
+                if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Topology changed while cancelling service (will retry): " + e.getMessage());
+                } else {
+                    U.error(log, "Failed to undeploy service: " + name, e);
 
-                return new GridFinishedFuture<>(e);
+                    return new GridFinishedFuture<>(e);
+                }
             }
         }
     }
@@ -689,18 +786,73 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     public IgniteInternalFuture<?> cancelAll() {
         Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
 
-        GridCompoundFuture res = null;
+        List<String> svcNames = new ArrayList<>();
 
         while (it.hasNext()) {
-            Cache.Entry<Object, Object> e = it.next();
+            GridServiceDeployment dep = (GridServiceDeployment)it.next().getValue();
 
-            GridServiceDeployment dep = (GridServiceDeployment)e.getValue();
+            svcNames.add(dep.configuration().getName());
+        }
+
+        return cancelAll(svcNames);
+    }
+
+    /**
+     * @param svcNames Names of services to cancel.
+     * @return Future.
+     */
+    @SuppressWarnings("unchecked")
+    public IgniteInternalFuture<?> cancelAll(Collection<String> svcNames) {
+        List<String> svcNamesCp = new ArrayList<>(svcNames);
 
-            if (res == null)
-                res = new GridCompoundFuture<>();
+        Collections.sort(svcNamesCp);
+
+        GridCompoundFuture res;
+
+        while (true) {
+            res = null;
+
+            List<String> toRollback = new ArrayList<>();
+
+            try (Transaction tx = cache.txStart(PESSIMISTIC, READ_COMMITTED)) {
+                for (String name : svcNames) {
+                    if (res == null)
+                        res = new GridCompoundFuture<>();
+
+                    try {
+                        CancelResult cr = removeServiceFromCache(name);
 
-            // Cancel each service separately.
-            res.add(cancel(dep.configuration().getName()));
+                        if (cr.rollback)
+                            toRollback.add(name);
+
+                        res.add(cr.fut);
+                    }
+                    catch (IgniteException | IgniteCheckedException e) {
+                        if (X.hasCause(e, ClusterTopologyCheckedException.class))
+                            throw e; // Retry.
+                        else {
+                            U.error(log, "Failed to undeploy service: " + name, e);
+
+                            res.add(new GridFinishedFuture<>(e));
+                        }
+                    }
+                }
+
+                tx.commit();
+
+                break;
+            }
+            catch (IgniteException | IgniteCheckedException e) {
+                for (String name : toRollback)
+                    undepFuts.remove(name).onDone(e);
+
+                if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Topology changed while cancelling service (will retry): " + e.getMessage());
+                }
+                else
+                    return new GridFinishedFuture<>(e);
+            }
         }
 
         if (res != null) {
@@ -713,6 +865,50 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     }
 
     /**
+     * @param name Name of service to remove from internal cache.
+     * @return Cancellation future and a flag whether it should be completed and removed on error.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    private CancelResult removeServiceFromCache(String name) throws IgniteCheckedException {
+        try {
+            ctx.security().authorize(name, SecurityPermission.SERVICE_CANCEL, null);
+        }
+        catch (SecurityException e) {
+            return new CancelResult(new GridFinishedFuture<>(e), false);
+        }
+
+        GridFutureAdapter<?> fut = new GridFutureAdapter<>();
+
+        GridFutureAdapter<?> old = undepFuts.putIfAbsent(name, fut);
+
+        if (old != null)
+            return new CancelResult(old, false);
+        else {
+            GridServiceDeploymentKey key = new GridServiceDeploymentKey(name);
+
+            try {
+                if (cache.getAndRemove(key) == null) {
+                    // Remove future from local map if service was not deployed.
+                    undepFuts.remove(name, fut);
+
+                    fut.onDone();
+
+                    return new CancelResult(fut, false);
+                }
+                else
+                    return new CancelResult(fut, true);
+            }
+            catch (IgniteCheckedException e) {
+                undepFuts.remove(name, fut);
+
+                fut.onDone(e);
+
+                throw e;
+            }
+        }
+    }
+
+    /**
      * @param name Service name.
      * @param timeout If greater than 0 limits task execution time. Cannot be negative.
      * @return Service topology.
@@ -1323,23 +1519,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     }
 
     /**
-     * @param nodes Remote nodes.
-     */
-    public void initCompatibilityMode(Collection<ClusterNode> nodes) {
-        boolean mode = false;
-
-        if (srvcCompatibilitySysProp != null)
-            mode = srvcCompatibilitySysProp;
-
-        while (true) {
-            ServicesCompatibilityState state = compatibilityState.get();
-
-            if (compatibilityState.compareAndSet(state, new ServicesCompatibilityState(mode, state.used)))
-                return;
-        }
-    }
-
-    /**
      * Called right after utility cache is started and ready for the usage.
      */
     public void onUtilityCacheStarted() {
@@ -1365,7 +1544,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
             GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock;
 
-            if (busyLock ==  null || !busyLock.enterBusy())
+            if (busyLock == null || !busyLock.enterBusy())
                 return;
 
             try {
@@ -1385,27 +1564,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
      * @param evts Update events.
      */
     private void onSystemCacheUpdated(final Iterable<CacheEntryEvent<?, ?>> evts) {
-        boolean firstTime = true;
-
         for (CacheEntryEvent<?, ?> e : evts) {
-            if (e.getKey() instanceof GridServiceDeploymentKey) {
-                if (firstTime) {
-                    markCompatibilityStateAsUsed();
-
-                    firstTime = false;
-                }
-
+            if (e.getKey() instanceof GridServiceDeploymentKey)
                 processDeployment((CacheEntryEvent)e);
-            }
-            else if (e.getKey() instanceof GridServiceAssignmentsKey) {
-                if (firstTime) {
-                    markCompatibilityStateAsUsed();
-
-                    firstTime = false;
-                }
-
+            else if (e.getKey() instanceof GridServiceAssignmentsKey)
                 processAssignment((CacheEntryEvent)e);
-            }
         }
     }
 
@@ -1578,8 +1741,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                                 Iterator<Cache.Entry<Object, Object>> it = serviceEntries(
                                     ServiceDeploymentPredicate.INSTANCE);
 
-                                boolean firstTime = true;
-
                                 while (it.hasNext()) {
                                     // If topology changed again, let next event handle it.
                                     AffinityTopologyVersion currTopVer0 = currTopVer;
@@ -1596,12 +1757,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
 
                                     Cache.Entry<Object, Object> e = it.next();
 
-                                    if (firstTime) {
-                                        markCompatibilityStateAsUsed();
-
-                                        firstTime = false;
-                                    }
-
                                     GridServiceDeployment dep = (GridServiceDeployment)e.getValue();
 
                                     try {
@@ -1789,6 +1944,26 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     /**
      *
      */
+    private static class CancelResult {
+        /** */
+        IgniteInternalFuture<?> fut;
+
+        /** */
+        boolean rollback;
+
+        /**
+         * @param fut Future.
+         * @param rollback {@code True} if service was cancelled during current call.
+         */
+        CancelResult(IgniteInternalFuture<?> fut, boolean rollback) {
+            this.fut = fut;
+            this.rollback = rollback;
+        }
+    }
+
+    /**
+     *
+     */
     private abstract class DepRunnable implements Runnable {
         /** {@inheritDoc} */
         @Override public void run() {
@@ -1935,24 +2110,4 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             return serviceTopology(cache, svcName);
         }
     }
-
-    /**
-     *
-     */
-    private static class ServicesCompatibilityState {
-        /** */
-        private final boolean srvcCompatibility;
-
-        /** */
-        private final boolean used;
-
-        /**
-         * @param srvcCompatibility Services compatibility mode ({@code true} if compatible with old nodes).
-         * @param used Services has been used.
-         */
-        ServicesCompatibilityState(boolean srvcCompatibility, boolean used) {
-            this.srvcCompatibility = srvcCompatibility;
-            this.used = used;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/processors/service/PreparedConfigurations.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/PreparedConfigurations.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/PreparedConfigurations.java
new file mode 100644
index 0000000..a581e15
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/PreparedConfigurations.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.services.ServiceConfiguration;
+
+/**
+ * Result of services validation before deployment.
+ */
+class PreparedConfigurations {
+    /** */
+    final List<ServiceConfiguration> cfgs;
+
+    /** */
+    final List<GridServiceDeploymentFuture> failedFuts;
+
+    /** */
+    final Exception err;
+
+    /**
+     * @param cfgs Configurations to deploy.
+     * @param failedFuts Finished futures for failed configurations.
+     * @param err Error if need to stop deploy.
+     */
+    PreparedConfigurations(List<ServiceConfiguration> cfgs, List<GridServiceDeploymentFuture> failedFuts,
+        Exception err) {
+        this.cfgs = cfgs;
+        this.failedFuts = failedFuts;
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(PreparedConfigurations.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentException.java
new file mode 100644
index 0000000..32fbf6f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Exception indicating service deployment failure.
+ */
+public class ServiceDeploymentException extends IgniteException {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Configurations of services that failed to deploy, stored as passed to the constructor (not copied). */
+    private final Collection<ServiceConfiguration> cfgs;
+
+    /**
+     * Creates service deployment exception with error message.
+     *
+     * @param msg Error message.
+     * @param cfgs Configurations of services that failed to deploy.
+     */
+    public ServiceDeploymentException(String msg, Collection<ServiceConfiguration> cfgs) {
+        super(msg);
+
+        this.cfgs = cfgs;
+    }
+
+    /**
+     * Creates service deployment exception with {@link Throwable} as a cause.
+     *
+     * @param cause Cause.
+     * @param cfgs Configurations of services that failed to deploy.
+     */
+    public ServiceDeploymentException(Throwable cause, Collection<ServiceConfiguration> cfgs) {
+        super(cause);
+
+        this.cfgs = cfgs;
+    }
+
+    /**
+     * Creates service deployment exception with error message and {@link Throwable} as a cause.
+     *
+     * @param msg Error message.
+     * @param cause Cause.
+     * @param cfgs Configurations of services that failed to deploy.
+     */
+    public ServiceDeploymentException(String msg, @Nullable Throwable cause, Collection<ServiceConfiguration> cfgs) {
+        super(msg, cause);
+
+        this.cfgs = cfgs;
+    }
+
+    /**
+     * @return Configurations of services that failed to deploy (the collection given to the constructor, not a copy).
+     */
+    public Collection<ServiceConfiguration> getFailedConfigurations() {
+        return cfgs;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 80cf67b..a724060 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -112,11 +112,11 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
         }
         catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException |
             ClusterTopologyCheckedException e) {
-            if (!ignoreFailure(e))
+            if (!processFailure(e, fut))
                 onDone(e);
         }
         catch (IgniteCheckedException e) {
-            if (!ignoreFailure(e)) {
+            if (!processFailure(e, fut)) {
                 if (e instanceof NodeStoppingException)
                     logDebug(logger(), "Failed to execute compound future reducer, node stopped.");
                 else
@@ -183,6 +183,17 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     }
 
     /**
+     * Processes an error thrown by one of the inner futures; by default delegates to {@code ignoreFailure(err)}.
+     *
+     * @param err Thrown exception.
+     * @param fut Failed future (not used by this implementation).
+     * @return {@code True} if this error should be ignored.
+     */
+    protected boolean processFailure(Throwable err, IgniteInternalFuture<T> fut) {
+        return ignoreFailure(err);
+    }
+
+    /**
      * Checks if there are pending futures. This is not the same as
      * {@link #isDone()} because child classes may override {@link #onDone(Object, Throwable)}
      * call and delay completion.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFutureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFutureSelfTest.java
new file mode 100644
index 0000000..51c3407
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFutureSelfTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/** Tests how {@link GridServiceDeploymentCompoundFuture} completes when inner deployment futures fail. */
+public class GridServiceDeploymentCompoundFutureSelfTest extends GridCommonAbstractTest {
+    /** Kernal context of the grid started in {@link #beforeTestsStarted()}. */
+    private static GridKernalContext ctx;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        IgniteKernal kernal = (IgniteKernal)startGrid(0);
+
+        ctx = kernal.context();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWaitForCompletionOnFailingFuturePartial() throws Exception {
+        GridServiceDeploymentCompoundFuture compFut = new GridServiceDeploymentCompoundFuture(false, ctx);
+
+        int failingFutsNum = 2;
+
+        int completingFutsNum = 5;
+
+        Collection<GridServiceDeploymentFuture> failingFuts = new ArrayList<>(failingFutsNum);
+
+        for (int i = 0; i < failingFutsNum; i++) {
+            ServiceConfiguration failingCfg = config("Failed-" + i);
+
+            GridServiceDeploymentFuture failingFut = new GridServiceDeploymentFuture(failingCfg);
+
+            failingFuts.add(failingFut);
+
+            compFut.add(failingFut);
+        }
+
+        List<GridFutureAdapter<Object>> futs = new ArrayList<>(completingFutsNum);
+
+        for (int i = 0; i < completingFutsNum; i++) {
+            GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(config(String.valueOf(i)));
+
+            futs.add(fut);
+
+            compFut.add(fut);
+        }
+
+        compFut.serviceDeploymentMarkInitialized();
+
+        List<Exception> causes = new ArrayList<>();
+
+        for (GridServiceDeploymentFuture fut : failingFuts) {
+            Exception cause = new Exception("Test error");
+
+            causes.add(cause);
+
+            fut.onDone(cause);
+        }
+
+        try {
+            compFut.get(100); // Expected to time out: the non-failing futures are still pending.
+
+            fail("Should never reach here.");
+        }
+        catch (IgniteFutureTimeoutCheckedException e) {
+            log.info("Expected exception: " + e.getMessage());
+        }
+
+        for (GridFutureAdapter<Object> fut : futs)
+            fut.onDone();
+
+        try {
+            compFut.get();
+
+            fail("Should never reach here.");
+        }
+        catch (IgniteCheckedException ce) {
+            log.info("Expected exception: " + ce.getMessage());
+
+            IgniteException e = U.convertException(ce);
+
+            assertTrue(e instanceof ServiceDeploymentException);
+
+            Throwable[] supErrs = e.getSuppressed();
+
+            assertEquals(failingFutsNum, supErrs.length);
+
+            for (int i = 0; i < failingFutsNum; i++)
+                assertEquals(causes.get(i), supErrs[i].getCause());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailAllAfterInitialized() throws Exception {
+        GridServiceDeploymentCompoundFuture compFut = new GridServiceDeploymentCompoundFuture(true, ctx);
+
+        ServiceConfiguration failingCfg = config("Failed");
+
+        GridServiceDeploymentFuture failingFut = new GridServiceDeploymentFuture(failingCfg);
+
+        compFut.add(failingFut);
+
+        int futsNum = 5;
+
+        List<ServiceConfiguration> cfgs = new ArrayList<>(futsNum + 1);
+
+        cfgs.add(failingCfg);
+
+        for (int i = 0; i < futsNum; i++) {
+            ServiceConfiguration cfg = config(String.valueOf(i));
+
+            cfgs.add(cfg);
+
+            compFut.add(new GridServiceDeploymentFuture(cfg));
+        }
+
+        compFut.serviceDeploymentMarkInitialized();
+
+        Exception expCause = new Exception("Test error");
+
+        failingFut.onDone(expCause);
+
+        assertFailAll(compFut, cfgs, expCause);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailAllBeforeInitialized() throws Exception {
+        GridServiceDeploymentCompoundFuture compFut = new GridServiceDeploymentCompoundFuture(true, ctx);
+
+        ServiceConfiguration failingCfg = config("Failed");
+
+        GridServiceDeploymentFuture failingFut = new GridServiceDeploymentFuture(failingCfg);
+
+        Exception expCause = new Exception("Test error");
+
+        failingFut.onDone(expCause);
+
+        compFut.add(failingFut);
+
+        assertFalse(compFut.isDone()); // The compound future has not been marked initialized yet.
+
+        int futsNum = 5;
+
+        List<ServiceConfiguration> cfgs = new ArrayList<>(futsNum + 1);
+
+        cfgs.add(failingCfg);
+
+        for (int i = 0; i < futsNum; i++) {
+            ServiceConfiguration cfg = config(String.valueOf(i));
+
+            cfgs.add(cfg);
+
+            compFut.add(new GridServiceDeploymentFuture(cfg));
+        }
+
+        compFut.serviceDeploymentMarkInitialized();
+
+        assertFailAll(compFut, cfgs, expCause);
+    }
+
+    /**
+     * Try waiting for the future completion and check that a proper exception is thrown.
+     *
+     * @param fut Future.
+     * @param expCfgs Configurations expected to be reported as failed.
+     * @param expCause Expected root cause of the failure.
+     */
+    private void assertFailAll(GridServiceDeploymentCompoundFuture fut, Collection<ServiceConfiguration> expCfgs,
+        Exception expCause) {
+        try {
+            fut.get();
+
+            fail("Should never reach here.");
+        }
+        catch (IgniteCheckedException ce) {
+            log.info("Expected exception: " + ce.getMessage());
+
+            IgniteException e = U.convertException(ce);
+
+            assertTrue(e instanceof ServiceDeploymentException);
+
+            assertEqualsCollections(expCfgs, ((ServiceDeploymentException)e).getFailedConfigurations());
+
+            Throwable actCause = e.getCause();
+
+            assertTrue(actCause instanceof IgniteCheckedException);
+
+            assertEquals(expCause, actCause.getCause());
+        }
+    }
+
+    /**
+     * @param name Name.
+     * @return Dummy configuration with a specified name.
+     */
+    private ServiceConfiguration config(String name) {
+        ServiceConfiguration cfg = new ServiceConfiguration();
+
+        cfg.setName(name);
+
+        return cfg;
+    }
+}


Mime
View raw message