ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [06/26] ignite git commit: IGNITE-4699: Added custom executors for compute tasks. This closes #1718.
Date Mon, 24 Apr 2017 09:11:32 GMT
IGNITE-4699: Added custom executors for compute tasks. This closes #1718.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f871b0d7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f871b0d7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f871b0d7

Branch: refs/heads/ignite-1794
Commit: f871b0d77084f4ebf7993eccc9cf59767835a41d
Parents: 3eb52a8
Author: tledkov-gridgain <tledkov@gridgain.com>
Authored: Fri Apr 21 14:40:22 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Fri Apr 21 14:40:22 2017 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCompute.java   |  14 ++
 .../configuration/ExecutorConfiguration.java    | 115 +++++++++
 .../configuration/IgniteConfiguration.java      |  30 +++
 .../ignite/internal/ExecutorAwareMessage.java   |  31 +++
 .../ignite/internal/GridJobExecuteRequest.java  |  32 ++-
 .../ignite/internal/GridKernalContext.java      |   8 +
 .../ignite/internal/GridKernalContextImpl.java  |  12 +
 .../ignite/internal/GridTaskSessionImpl.java    |  15 +-
 .../ignite/internal/IgniteComputeImpl.java      |  71 ++++--
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../org/apache/ignite/internal/IgnitionEx.java  |  66 +++++
 .../managers/communication/GridIoManager.java   |  23 +-
 .../managers/communication/GridIoMessage.java   |  13 +
 .../closure/GridClosureProcessor.java           | 154 +++++++-----
 .../processors/job/GridJobProcessor.java        |  23 +-
 .../internal/processors/job/GridJobWorker.java  |  15 +-
 .../internal/processors/pool/PoolProcessor.java |  25 ++
 .../session/GridTaskSessionProcessor.java       |  10 +-
 .../processors/task/GridTaskProcessor.java      |  69 +++++-
 .../processors/task/GridTaskWorker.java         |   3 +-
 ...puteCustomExecutorConfigurationSelfTest.java |  85 +++++++
 .../IgniteComputeCustomExecutorSelfTest.java    | 245 +++++++++++++++++++
 .../junits/GridTestKernalContext.java           |   5 +-
 .../testsuites/IgniteComputeGridTestSuite.java  |   5 +
 24 files changed, 970 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
index ad675c0..f0e6039 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
@@ -24,6 +24,8 @@ import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskFuture;
 import org.apache.ignite.compute.ComputeTaskName;
 import org.apache.ignite.compute.ComputeTaskSpis;
+import org.apache.ignite.configuration.ExecutorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.lang.IgniteAsyncSupport;
 import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteCallable;
@@ -751,4 +753,16 @@ public interface IgniteCompute extends IgniteAsyncSupport {
     /** {@inheritDoc} */
     @Deprecated
     @Override public IgniteCompute withAsync();
+
+    /**
+     * Gets instance of the compute API associated with custom executor. All tasks and closures submitted to returned
+     * instance will be processed by this executor on both remote and local nodes. If executor with the given name
+     * doesn't exist, task will be processed in default ("public") pool.
+     * <p>
+     * Executor should be defined in {@link IgniteConfiguration#setExecutorConfiguration(ExecutorConfiguration...)}.
+     *
+     * @param name Custom executor name.
+     * @return Instance of compute API associated with custom executor.
+     */
+    public IgniteCompute withExecutor(@NotNull String name);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java
new file mode 100644
index 0000000..8ff7932
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_PUBLIC_THREAD_CNT;
+
+/**
+ * Custom thread pool configuration for compute tasks. See {@link IgniteCompute#withExecutor(String)} for more information.
+ */
+public class ExecutorConfiguration {
+    /** Thread pool name. */
+    private String name;
+
+    /** Thread pool size. */
+    private int size = DFLT_PUBLIC_THREAD_CNT;
+
+    /**
+     * Default constructor.
+     */
+    public ExecutorConfiguration() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param name Thread pool name.
+     */
+    public ExecutorConfiguration(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Copying constructor.
+     *
+     * @param other Instance to copy.
+     */
+    public ExecutorConfiguration(ExecutorConfiguration other) {
+        assert other != null;
+
+        name = other.name;
+        size = other.size;
+    }
+
+    /**
+     * Get thread pool name.
+     * <p>
+     * See {@link #setName(String)} for more information.
+     *
+     * @return Executor name.
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Set thread pool name. Name cannot be {@code null} and should be unique with respect to other custom executors.
+     *
+     * @param name Executor name.
+     * @return {@code this} for chaining.
+     */
+    public ExecutorConfiguration setName(String name) {
+        this.name = name;
+
+        return this;
+    }
+
+    /**
+     * Get thread pool size.
+     * <p>
+     * See {@link #setSize(int)} for more information.
+     *
+     * @return Thread pool size.
+     */
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * Set thread pool size.
+     * <p>
+     * Defaults to {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT}.
+     *
+     * @param size Thread pool size.
+     * @return {@code this} for chaining.
+     */
+    public ExecutorConfiguration setSize(int size) {
+        this.size = size;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ExecutorConfiguration.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index fe08ddf..17927b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -28,6 +28,7 @@ import javax.cache.integration.CacheLoader;
 import javax.cache.processor.EntryProcessor;
 import javax.management.MBeanServer;
 import javax.net.ssl.SSLContext;
+import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.Ignition;
@@ -437,6 +438,9 @@ public class IgniteConfiguration {
     /** */
     private BinaryConfiguration binaryCfg;
 
+    /** Custom executor configurations. */
+    private ExecutorConfiguration[] execCfgs;
+
     /** */
     private boolean lateAffAssignment = DFLT_LATE_AFF_ASSIGNMENT;
 
@@ -494,6 +498,7 @@ public class IgniteConfiguration {
         dataStreamerPoolSize = cfg.getDataStreamerThreadPoolSize();
         deployMode = cfg.getDeploymentMode();
         discoStartupDelay = cfg.getDiscoveryStartupDelay();
+        execCfgs = cfg.getExecutorConfiguration();
         failureDetectionTimeout = cfg.getFailureDetectionTimeout();
         hadoopCfg = cfg.getHadoopConfiguration();
         igfsCfg = cfg.getFileSystemConfiguration();
@@ -2658,6 +2663,31 @@ public class IgniteConfiguration {
         return this;
     }
 
+    /**
+     * Gets custom executors for user compute tasks.
+     * <p>
+     * See {@link #setExecutorConfiguration(ExecutorConfiguration...)} for more information.
+     *
+     * @return Executor configurations.
+     */
+    public ExecutorConfiguration[] getExecutorConfiguration() {
+        return execCfgs;
+    }
+
+    /**
+     * Sets custom executors for user compute tasks.
+     * <p>
+     * See {@link IgniteCompute#withExecutor(String)} for more information.
+     *
+     * @param execCfgs Executor configurations.
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setExecutorConfiguration(ExecutorConfiguration... execCfgs) {
+        this.execCfgs = execCfgs;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java
new file mode 100644
index 0000000..a8a3b1a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Message with specified custom executor must be processed in the appropriate thread pool.
+ */
+public interface ExecutorAwareMessage extends Message {
+    /**
+     * @return Custom executor name, or {@code null} in case the custom executor is not provided.
+     */
+    @Nullable public String executorName();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
index a7e8309..fe2d6d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -41,7 +40,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Job execution request.
  */
-public class GridJobExecuteRequest implements Message {
+public class GridJobExecuteRequest implements ExecutorAwareMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -146,6 +145,9 @@ public class GridJobExecuteRequest implements Message {
     /** */
     private AffinityTopologyVersion topVer;
 
+    /** */
+    private String execName;
+
     /**
      * No-op constructor to support {@link Externalizable} interface.
      */
@@ -182,6 +184,7 @@ public class GridJobExecuteRequest implements Message {
      * @param cacheIds Caches' identifiers to reserve partition.
      * @param part Partition to lock.
      * @param topVer Affinity topology version of job mapping.
+     * @param execName The name of the custom named executor.
      */
     public GridJobExecuteRequest(
             IgniteUuid sesId,
@@ -211,7 +214,8 @@ public class GridJobExecuteRequest implements Message {
             UUID subjId,
             @Nullable int[] cacheIds,
             int part,
-            @Nullable AffinityTopologyVersion topVer) {
+            @Nullable AffinityTopologyVersion topVer,
+            @Nullable String execName) {
         this.top = top;
         assert sesId != null;
         assert jobId != null;
@@ -251,6 +255,7 @@ public class GridJobExecuteRequest implements Message {
         this.idsOfCaches = cacheIds;
         this.part = part;
         this.topVer = topVer;
+        this.execName = execName;
 
         this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi;
     }
@@ -454,6 +459,11 @@ public class GridJobExecuteRequest implements Message {
         return part;
     }
 
+    /** {@inheritDoc} */
+    @Override public String executorName() {
+        return execName;
+    }
+
     /**
      * @return Affinity version which was used to map job
      */
@@ -622,6 +632,12 @@ public class GridJobExecuteRequest implements Message {
 
                 writer.incrementState();
 
+            case 24:
+                if (!writer.writeString("executorName", execName))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -831,6 +847,14 @@ public class GridJobExecuteRequest implements Message {
 
                 reader.incrementState();
 
+            case 24:
+                execName = reader.readString("executorName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridJobExecuteRequest.class);
@@ -843,7 +867,7 @@ public class GridJobExecuteRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 24;
+        return 25;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 8462e5f..010bd21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -563,6 +563,14 @@ public interface GridKernalContext extends Iterable<GridComponent> {
      */
     public ExecutorService getQueryExecutorService();
 
+
+    /**
+     * Executor services that are in charge of processing user compute tasks.
+     *
+     * @return Map of custom thread pool executors.
+     */
+    @Nullable public Map<String, ? extends ExecutorService> customExecutors();
+
     /**
      * Executor service that is in charge of processing schema change messages.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 213cf86..bbc9846 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -344,6 +344,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    Map<String, ? extends ExecutorService> customExecSvcs;
+
+    /** */
+    @GridToStringExclude
     private Map<String, Object> attrs = new HashMap<>();
 
     /** */
@@ -401,6 +405,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
      * @param callbackExecSvc Callback executor service.
      * @param qryExecSvc Query executor service.
      * @param schemaExecSvc Schema executor service.
+     * @param customExecSvcs Custom named executors.
      * @param plugins Plugin providers.
      */
     @SuppressWarnings("TypeMayBeWeakened")
@@ -424,6 +429,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         IgniteStripedThreadPoolExecutor callbackExecSvc,
         ExecutorService qryExecSvc,
         ExecutorService schemaExecSvc,
+        @Nullable Map<String, ? extends ExecutorService> customExecSvcs,
         List<PluginProvider> plugins
     ) {
         assert grid != null;
@@ -448,6 +454,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         this.callbackExecSvc = callbackExecSvc;
         this.qryExecSvc = qryExecSvc;
         this.schemaExecSvc = schemaExecSvc;
+        this.customExecSvcs = customExecSvcs;
 
         marshCtx = new MarshallerContextImpl(plugins);
 
@@ -998,6 +1005,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    public Map<String, ? extends ExecutorService> customExecutors() {
+        return customExecSvcs;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteExceptionRegistry exceptionRegistry() {
         return IgniteExceptionRegistry.get();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
index dd1caa1..458ad36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
@@ -114,6 +114,9 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
     /** */
     private final IgniteFutureImpl mapFut;
 
+    /** */
+    private final String execName;
+
     /**
      * @param taskNodeId Task node ID.
      * @param taskName Task name.
@@ -129,6 +132,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
      * @param fullSup Session full support enabled flag.
      * @param internal Internal task flag.
      * @param subjId Subject ID.
+     * @param execName Custom executor name.
      */
     public GridTaskSessionImpl(
         UUID taskNodeId,
@@ -144,7 +148,8 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         GridKernalContext ctx,
         boolean fullSup,
         boolean internal,
-        UUID subjId) {
+        UUID subjId,
+        @Nullable String execName) {
         assert taskNodeId != null;
         assert taskName != null;
         assert sesId != null;
@@ -173,6 +178,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         this.fullSup = fullSup;
         this.internal = internal;
         this.subjId = subjId;
+        this.execName = execName;
 
         mapFut = new IgniteFutureImpl(new GridFutureAdapter());
     }
@@ -873,6 +879,13 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         return internal;
     }
 
+    /**
+     * @return Custom executor name.
+     */
+    @Nullable public String executorName() {
+        return execName;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridTaskSessionImpl.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index 7499a5d..7ddd4ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -73,6 +73,9 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
     /** */
     private UUID subjId;
 
+    /** Custom executor name. */
+    private String execName;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -103,6 +106,25 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         this.subjId = subjId;
     }
 
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     * @param prj Projection.
+     * @param subjId Subject ID.
+     * @param async Async support flag.
+     * @param execName Custom executor name.
+     */
+    private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, UUID subjId, boolean async,
+        String execName) {
+        super(async);
+
+        this.ctx = ctx;
+        this.prj = prj;
+        this.subjId = subjId;
+        this.execName = execName;
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteCompute createAsyncInstance() {
         return new IgniteComputeImpl(ctx, prj, subjId, true);
@@ -152,7 +174,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
                     + affKey + ']');
 
-            return ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, prj.nodes());
+            return ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, prj.nodes(), execName);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -205,7 +227,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
                     + affKey + ']');
 
-            return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes());
+            return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes(), execName);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -248,7 +270,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes());
+            return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes(), execName);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -298,7 +320,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
                     + affKey + ']');
 
-            return ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job, prj.nodes());
+            return ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job, prj.nodes(), execName);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -351,7 +373,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
                     + affKey + ']');
 
-            return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes());
+            return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes(), execName);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -394,7 +416,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes());
+            return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes(), execName);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -437,7 +459,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
             ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
             ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
 
-            return ctx.task().execute(taskName, arg);
+            return ctx.task().execute(taskName, arg, execName);
         }
         finally {
             unguard();
@@ -477,7 +499,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
             ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
             ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
 
-            return ctx.task().execute(taskCls, arg);
+            return ctx.task().execute(taskCls, arg, execName);
         }
         finally {
             unguard();
@@ -516,7 +538,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
             ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
             ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
 
-            return ctx.task().execute(task, arg);
+            return ctx.task().execute(task, arg, execName);
         }
         finally {
             unguard();
@@ -550,7 +572,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().runAsync(BROADCAST, job, prj.nodes());
+            return ctx.closure().runAsync(BROADCAST, job, prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -584,7 +606,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().callAsync(BROADCAST, Collections.singletonList(job), prj.nodes());
+            return ctx.closure().callAsync(BROADCAST, Collections.singletonList(job), prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -620,7 +642,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().broadcast(job, arg, prj.nodes());
+            return ctx.closure().broadcast(job, arg, prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -654,7 +676,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().runAsync(BALANCE, job, prj.nodes());
+            return ctx.closure().runAsync(BALANCE, job, prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -689,7 +711,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().runAsync(BALANCE, jobs, prj.nodes());
+            return ctx.closure().runAsync(BALANCE, jobs, prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -725,7 +747,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().callAsync(job, arg, prj.nodes());
+            return ctx.closure().callAsync(job, arg, prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -759,7 +781,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().callAsync(BALANCE, job, prj.nodes());
+            return ctx.closure().callAsync(BALANCE, job, prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -794,7 +816,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().callAsync(BALANCE, (Collection<? extends Callable<R>>)jobs, prj.nodes());
+            return ctx.closure().callAsync(BALANCE, (Collection<? extends Callable<R>>)jobs, prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -832,7 +854,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().callAsync(job, args, prj.nodes());
+            return ctx.closure().callAsync(job, args, prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -870,7 +892,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes());
+            return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -911,7 +933,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return ctx.closure().callAsync(job, args, rdc, prj.nodes());
+            return ctx.closure().callAsync(job, args, rdc, prj.nodes(), execName);
         }
         finally {
             unguard();
@@ -1040,11 +1062,13 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(prj);
+        out.writeObject(execName);
     }
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         prj = (ClusterGroupAdapter)in.readObject();
+        execName = (String)in.readObject();
     }
 
     /**
@@ -1054,7 +1078,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
      * @throws ObjectStreamException Thrown in case of unmarshalling error.
      */
     protected Object readResolve() throws ObjectStreamException {
-        return prj.compute();
+        return prj.compute().withExecutor(execName);
     }
 
     /** {@inheritDoc} */
@@ -1068,4 +1092,9 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
     @Override public <R> ComputeTaskFuture<R> future() {
         return (ComputeTaskFuture<R>)super.future();
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompute withExecutor(@NotNull String name) {
+        return new IgniteComputeImpl(ctx, prj, subjId, isAsync(), name);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 50f39fa..12a7af6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -699,6 +699,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
      * @param callbackExecSvc Callback executor service.
      * @param qryExecSvc Query executor service.
      * @param schemaExecSvc Schema executor service.
+     * @param customExecSvcs Custom named executors.
      * @param errHnd Error handler to use for notification about startup problems.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
@@ -720,6 +721,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         IgniteStripedThreadPoolExecutor callbackExecSvc,
         ExecutorService qryExecSvc,
         ExecutorService schemaExecSvc,
+        Map<String, ? extends ExecutorService> customExecSvcs,
         GridAbsClosure errHnd
     )
         throws IgniteCheckedException
@@ -835,6 +837,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 callbackExecSvc,
                 qryExecSvc,
                 schemaExecSvc,
+                customExecSvcs,
                 plugins
             );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 2eda01c..4b34891 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -33,6 +33,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -59,6 +60,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.configuration.ExecutorConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
@@ -1530,6 +1532,9 @@ public class IgnitionEx {
         /** Query executor service. */
         private ThreadPoolExecutor schemaExecSvc;
 
+        /** Custom named executor services. */
+        private Map<String, ThreadPoolExecutor> customExecSvcs;
+
         /** Grid state. */
         private volatile IgniteState state = STOPPED;
 
@@ -1858,6 +1863,24 @@ public class IgnitionEx {
 
             schemaExecSvc.allowCoreThreadTimeOut(true);
 
+            if (!F.isEmpty(cfg.getExecutorConfiguration())) {
+                validateCustomExecutorsConfiguration(cfg.getExecutorConfiguration());
+
+                customExecSvcs = new HashMap<>();
+
+                for(ExecutorConfiguration execCfg : cfg.getExecutorConfiguration()) {
+                    ThreadPoolExecutor exec = new IgniteThreadPoolExecutor(
+                        execCfg.getName(),
+                        cfg.getIgniteInstanceName(),
+                        execCfg.getSize(),
+                        execCfg.getSize(),
+                        DFLT_THREAD_KEEP_ALIVE_TIME,
+                        new LinkedBlockingQueue<Runnable>());
+
+                    customExecSvcs.put(execCfg.getName(), exec);
+                }
+            }
+
             // Register Ignite MBean for current grid instance.
             registerFactoryMbean(myCfg.getMBeanServer());
 
@@ -1886,6 +1909,7 @@ public class IgnitionEx {
                     callbackExecSvc,
                     qryExecSvc,
                     schemaExecSvc,
+                    customExecSvcs,
                     new CA() {
                         @Override public void apply() {
                             startLatch.countDown();
@@ -1962,6 +1986,30 @@ public class IgnitionEx {
         }
 
         /**
+         * @param cfgs Array of custom executor configurations.
+         * @throws IgniteCheckedException If configuration is wrong.
+         */
+        private static void validateCustomExecutorsConfiguration(ExecutorConfiguration[] cfgs)
+            throws IgniteCheckedException {
+            if (cfgs == null)
+                return;
+
+            Set<String> names = new HashSet<>(cfgs.length);
+
+            for (ExecutorConfiguration cfg : cfgs) {
+                if (F.isEmpty(cfg.getName()))
+                    throw new IgniteCheckedException("Custom executor name cannot be null or empty.");
+
+                if (!names.add(cfg.getName()))
+                    throw new IgniteCheckedException("Duplicate custom executor name: " + cfg.getName());
+
+                if (cfg.getSize() <= 0)
+                    throw new IgniteCheckedException("Custom executor size must be positive [name=" + cfg.getName() +
+                        ", size=" + cfg.getSize() + ']');
+            }
+        }
+
+        /**
          * @param cfg Ignite configuration copy to.
          * @return New ignite configuration.
          * @throws IgniteCheckedException If failed.
@@ -2116,6 +2164,17 @@ public class IgnitionEx {
 
             initializeDefaultCacheConfiguration(myCfg);
 
+            ExecutorConfiguration[] execCfgs = myCfg.getExecutorConfiguration();
+
+            if (execCfgs != null) {
+                ExecutorConfiguration[] clone = execCfgs.clone();
+
+                for (int i = 0; i < execCfgs.length; i++)
+                    clone[i] = new ExecutorConfiguration(execCfgs[i]);
+
+                myCfg.setExecutorConfiguration(clone);
+            }
+
             if (!myCfg.isClientMode() && myCfg.getMemoryConfiguration() == null) {
                 MemoryConfiguration memCfg = new MemoryConfiguration();
 
@@ -2522,6 +2581,13 @@ public class IgnitionEx {
             U.shutdownNow(getClass(), callbackExecSvc, log);
 
             callbackExecSvc = null;
+
+            if (!F.isEmpty(customExecSvcs)) {
+                for (ThreadPoolExecutor exec : customExecSvcs.values())
+                    U.shutdownNow(getClass(), exec, log);
+
+                customExecSvcs = null;
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 83fc3b5..c4f7519 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,6 +66,7 @@ import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
@@ -839,12 +841,27 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         }
 
         try {
+            String execName = msg.executorName();
+
+            if (execName != null) {
+                Executor exec = pools.customExecutor(execName);
+
+                if (exec != null) {
+                    exec.execute(c);
+
+                    return;
+                }
+                else {
+                    LT.warn(log, "Custom executor doesn't exist (message will be processed in default " +
+                        "thread pool): " + execName);
+                }
+            }
+
             pools.poolForPolicy(plc).execute(c);
         }
         catch (RejectedExecutionException e) {
-            U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " +
-                "on 'ExecutorService' provided by 'IgniteConfiguration.getPublicThreadPoolSize()'. " +
-                "Will attempt to process message in the listener thread instead.", e);
+            U.error(log, "Failed to process regular message due to execution rejection. Will attempt to process " +
+                "message in the listener thread instead.", e);
 
             c.run();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index 2ad4a0b..16eae26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.managers.communication;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+
+import org.apache.ignite.internal.ExecutorAwareMessage;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -26,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Wrapper for all grid messages.
@@ -334,6 +337,16 @@ public class GridIoMessage implements Message {
             return Integer.MIN_VALUE;
     }
 
+    /**
+     * @return Executor name (if available).
+     */
+    @Nullable public String executorName() {
+        if (msg instanceof ExecutorAwareMessage)
+            return ((ExecutorAwareMessage)msg).executorName();
+
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridIoMessage.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index f91ee34..1051807 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -146,11 +146,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param mode Distribution mode.
      * @param jobs Closures to execute.
      * @param nodes Grid nodes.
+     * @param execName Custom executor name.
      * @return Task execution future.
      */
     public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs,
-        @Nullable Collection<ClusterNode> nodes) {
-        return runAsync(mode, jobs, nodes, false);
+        @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
+        return runAsync(mode, jobs, nodes, false, execName);
     }
 
     /**
@@ -158,12 +159,14 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param jobs Closures to execute.
      * @param nodes Grid nodes.
      * @param sys If {@code true}, then system pool will be used.
+     * @param execName Custom executor name.
      * @return Task execution future.
      */
     public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
         Collection<? extends Runnable> jobs,
         @Nullable Collection<ClusterNode> nodes,
-        boolean sys)
+        boolean sys,
+        @Nullable String execName)
     {
         assert mode != null;
         assert !F.isEmpty(jobs) : jobs;
@@ -181,7 +184,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T1(mode, jobs), null, sys);
+            return ctx.task().execute(new T1(mode, jobs), null, sys, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -196,7 +199,19 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job,
         @Nullable Collection<ClusterNode> nodes) {
-        return runAsync(mode, job, nodes, false);
+        return runAsync(mode, job, nodes, null);
+    }
+
+    /**
+     * @param mode Distribution mode.
+     * @param job Closure to execute.
+     * @param nodes Grid nodes.
+     * @param execName Custom executor name.
+     * @return Task execution future.
+     */
+    public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job,
+        @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
+        return runAsync(mode, job, nodes, false, execName);
     }
 
     /**
@@ -204,12 +219,14 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param job Closure to execute.
      * @param nodes Grid nodes.
      * @param sys If {@code true}, then system pool will be used.
+     * @param execName Custom executor name.
      * @return Task execution future.
      */
     public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
         Runnable job,
         @Nullable Collection<ClusterNode> nodes,
-        boolean sys)
+        boolean sys,
+        @Nullable String execName)
     {
         assert mode != null;
         assert job != null;
@@ -222,7 +239,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T2(mode, job), null, sys);
+            return ctx.task().execute(new T2(mode, job), null, sys, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -341,6 +358,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param jobs Closures to execute.
      * @param rdc Reducer.
      * @param nodes Grid nodes.
+     * @param execName Custom executor name.
      * @param <R1> Type.
      * @param <R2> Type.
      * @return Reduced result.
@@ -348,7 +366,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     public <R1, R2> ComputeTaskInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode,
         Collection<? extends Callable<R1>> jobs,
         IgniteReducer<R1, R2> rdc,
-        @Nullable Collection<ClusterNode> nodes)
+        @Nullable Collection<ClusterNode> nodes,
+        @Nullable String execName)
     {
         assert mode != null;
         assert rdc != null;
@@ -362,7 +381,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T3<>(mode, jobs, rdc), null);
+            return ctx.task().execute(new T3<>(mode, jobs, rdc), null, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -380,7 +399,23 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         GridClosureCallMode mode,
         @Nullable Collection<? extends Callable<R>> jobs,
         @Nullable Collection<ClusterNode> nodes) {
-        return callAsync(mode, jobs, nodes, false);
+        return callAsync(mode, jobs, nodes, null);
+    }
+
+    /**
+     * @param mode Distribution mode.
+     * @param jobs Closures to execute.
+     * @param nodes Grid nodes.
+     * @param execName Custom executor name.
+     * @param <R> Type.
+     * @return Grid future for collection of closure results.
+     */
+    public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(
+        GridClosureCallMode mode,
+        @Nullable Collection<? extends Callable<R>> jobs,
+        @Nullable Collection<ClusterNode> nodes,
+        @Nullable String execName) {
+        return callAsync(mode, jobs, nodes, false, execName);
     }
 
     /**
@@ -388,13 +423,15 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param jobs Closures to execute.
      * @param nodes Grid nodes.
      * @param sys If {@code true}, then system pool will be used.
+     * @param execName Custom executor name.
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
     public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode,
         Collection<? extends Callable<R>> jobs,
         @Nullable Collection<ClusterNode> nodes,
-        boolean sys)
+        boolean sys,
+        @Nullable String execName)
     {
         assert mode != null;
         assert !F.isEmpty(jobs);
@@ -407,7 +444,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T6<>(mode, jobs), null, sys);
+            return ctx.task().execute(new T6<>(mode, jobs), null, sys, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -415,7 +452,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     *
      * @param mode Distribution mode.
      * @param job Closure to execute.
      * @param nodes Grid nodes.
@@ -424,7 +460,21 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
         @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) {
-        return callAsync(mode, job, nodes, false);
+        return callAsync(mode, job, nodes, null);
+    }
+
+    /**
+     * @param mode Distribution mode.
+     * @param job Closure to execute.
+     * @param nodes Grid nodes.
+     * @param execName Custom executor name.
+     * @param <R> Type.
+     * @return Grid future for closure result.
+     */
+    public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
+        @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes,
+        @Nullable String execName) {
+        return callAsync(mode, job, nodes, false, execName);
     }
 
     /**
@@ -432,13 +482,15 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param partId Partition.
      * @param job Closure to execute.
      * @param nodes Grid nodes.
+     * @param execName Custom executor name.
      * @return Grid future for collection of closure results.
      * @throws IgniteCheckedException If failed.
      */
     public <R> ComputeTaskInternalFuture<R> affinityCall(@NotNull Collection<String> cacheNames,
         int partId,
         Callable<R> job,
-        @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException {
+        @Nullable Collection<ClusterNode> nodes,
+        @Nullable String execName) throws IgniteCheckedException {
         assert partId >= 0 : partId;
 
         busyLock.readLock();
@@ -457,7 +509,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T5(node, job, cacheNames, partId, mapTopVer), null, false);
+            return ctx.task().execute(new T5(node, job, cacheNames, partId, mapTopVer), null,
+                false, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -469,13 +522,15 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param partId Partition.
      * @param job Job.
      * @param nodes Grid nodes.
+     * @param execName Custom executor name.
      * @return Job future.
      * @throws IgniteCheckedException If failed.
      */
     public ComputeTaskInternalFuture<?> affinityRun(@NotNull Collection<String> cacheNames,
         int partId,
         Runnable job,
-        @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException {
+        @Nullable Collection<ClusterNode> nodes,
+        @Nullable String execName) throws IgniteCheckedException {
         assert partId >= 0 : partId;
 
         busyLock.readLock();
@@ -494,7 +549,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T4(node, job, cacheNames, partId, mapTopVer), null, false);
+            return ctx.task().execute(new T4(node, job, cacheNames, partId, mapTopVer), null,
+                false, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -588,13 +644,15 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param job Closure to execute.
      * @param nodes Grid nodes.
      * @param sys If {@code true}, then system pool will be used.
+     * @param execName Custom executor name.
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
     public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
         Callable<R> job,
         @Nullable Collection<ClusterNode> nodes,
-        boolean sys)
+        boolean sys,
+        @Nullable String execName)
     {
         assert mode != null;
         assert job != null;
@@ -607,7 +665,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T7<>(mode, job), null, sys);
+            return ctx.task().execute(new T7<>(mode, job), null, sys, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -618,10 +676,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param job Job closure.
      * @param arg Optional job argument.
      * @param nodes Grid nodes.
+     * @param execName Custom executor name.
      * @return Grid future for execution result.
      */
     public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg,
-        @Nullable Collection<ClusterNode> nodes) {
+        @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
         busyLock.readLock();
 
         try {
@@ -630,7 +689,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T8(job, arg), null, false);
+            return ctx.task().execute(new T8(job, arg), null, false, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -641,33 +700,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param job Job closure.
      * @param arg Optional job argument.
      * @param nodes Grid nodes.
+     * @param execName Custom executor name.
      * @return Grid future for execution result.
      */
     public <T, R> IgniteInternalFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg,
-        @Nullable Collection<ClusterNode> nodes) {
-        busyLock.readLock();
-
-        try {
-            if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(U.emptyTopologyException());
-
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T11<>(job), arg, false);
-        }
-        finally {
-            busyLock.readUnlock();
-        }
-    }
-
-    /**
-     * @param job Job closure.
-     * @param arg Optional job argument.
-     * @param nodes Grid nodes.
-     * @return Grid future for execution result.
-     */
-    public <T, R> IgniteInternalFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg,
-        @Nullable Collection<ClusterNode> nodes) {
+        @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
         busyLock.readLock();
 
         try {
@@ -675,9 +712,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
                 return new GridFinishedFuture<>(U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
-            ctx.task().setThreadContext(TC_NO_FAILOVER, true);
 
-            return ctx.task().execute(new T11<>(job), arg, false);
+            return ctx.task().execute(new T11<>(job), arg, false, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -688,11 +724,13 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param job Job closure.
      * @param args Job arguments.
      * @param nodes Grid nodes.
+     * @param execName Custom executor name.
      * @return Grid future for execution result.
      */
     public <T, R> ComputeTaskInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job,
         @Nullable Collection<? extends T> args,
-        @Nullable Collection<ClusterNode> nodes)
+        @Nullable Collection<ClusterNode> nodes,
+        @Nullable String execName)
     {
         busyLock.readLock();
 
@@ -702,7 +740,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T9<>(job, args), null, false);
+            return ctx.task().execute(new T9<>(job, args), null, false, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -714,10 +752,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param args Job arguments.
      * @param rdc Reducer.
      * @param nodes Grid nodes.
+     * @param execName Custom executor name.
      * @return Grid future for execution result.
      */
     public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(IgniteClosure<T, R1> job,
-        Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) {
+        Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes,
+        @Nullable String execName) {
         busyLock.readLock();
 
         try {
@@ -726,7 +766,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T10<>(job, args, rdc), null, false);
+            return ctx.task().execute(new T10<>(job, args, rdc), null, false, execName);
         }
         finally {
             busyLock.readUnlock();
@@ -1122,7 +1162,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      * Task that is free of dragged in enclosing context for the method
-     * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, Collection)}.
+     * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, Collection, String)}.
      */
     private class T1 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection {
         /** */
@@ -1156,7 +1196,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      * Task that is free of dragged in enclosing context for the method
-     * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, Collection)}.
+     * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, Collection, String)}.
      */
     private class T2 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection {
         /** */
@@ -1187,7 +1227,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      * Task that is free of dragged in enclosing context for the method
-     * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, Collection, org.apache.ignite.lang.IgniteReducer, Collection)}
+     * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, Collection, org.apache.ignite.lang.IgniteReducer, Collection, String)}
      */
     private class T3<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> implements GridNoImplicitInjection {
         /** */
@@ -1378,7 +1418,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      * Task that is free of dragged in enclosing context for the method
-     * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, Collection)}
+     * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, Collection, String)}
      */
     private class T6<R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> implements GridNoImplicitInjection {
         /** */
@@ -1421,7 +1461,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      * Task that is free of dragged in enclosing context for the method
-     * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, Collection)}
+     * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, Collection, String)}
      */
     private class T7<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements GridNoImplicitInjection {
         /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 91ec8a9..369ca22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -73,6 +74,7 @@ import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -1058,7 +1060,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
                             sesAttrs,
                             req.isSessionFullSupport(),
                             req.isInternal(),
-                            req.getSubjectId());
+                            req.getSubjectId(),
+                            req.executorName());
 
                         taskSes.setCheckpointSpi(req.getCheckpointSpi());
                         taskSes.setClassLoader(dep.classLoader());
@@ -1098,7 +1101,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
                         evtLsnr,
                         holdLsnr,
                         partsReservation,
-                        req.getTopVer());
+                        req.getTopVer(),
+                        req.executorName());
 
                     jobCtx.job(job);
 
@@ -1274,7 +1278,20 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     private boolean executeAsync(GridJobWorker jobWorker) {
         try {
-            ctx.getExecutorService().execute(jobWorker);
+            if (jobWorker.executorName() != null) {
+                Executor customExec = ctx.pools().customExecutor(jobWorker.executorName());
+
+                if (customExec != null)
+                    customExec.execute(jobWorker);
+                else {
+                    LT.warn(log, "Custom executor doesn't exist (local job will be processed in default " +
+                        "thread pool): " + jobWorker.executorName());
+
+                    ctx.getExecutorService().execute(jobWorker);
+                }
+            }
+            else
+                ctx.getExecutorService().execute(jobWorker);
 
             if (metricsUpdateFreq > -1L)
                 startedJobsCnt.increment();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 5c9b9e2..c9129c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -168,6 +168,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
     /** Request topology version. */
     private final AffinityTopologyVersion reqTopVer;
 
+    /** Custom executor name. */
+    private final String execName;
+
     /**
      * @param ctx Kernal context.
      * @param dep Grid deployment.
@@ -182,6 +185,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
      * @param holdLsnr Hold listener.
      * @param partsReservation Reserved partitions (must be released at the job finish).
      * @param reqTopVer Affinity topology version of the job request.
+     * @param execName Custom executor name.
      */
     GridJobWorker(
         GridKernalContext ctx,
@@ -196,7 +200,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         GridJobEventListener evtLsnr,
         GridJobHoldListener holdLsnr,
         GridReservable partsReservation,
-        AffinityTopologyVersion reqTopVer) {
+        AffinityTopologyVersion reqTopVer,
+        String execName) {
         super(ctx.igniteInstanceName(), "grid-job-worker", ctx.log(GridJobWorker.class));
 
         assert ctx != null;
@@ -219,6 +224,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         this.holdLsnr = holdLsnr;
         this.partsReservation = partsReservation;
         this.reqTopVer = reqTopVer;
+        this.execName = execName;
 
         if (job != null)
             this.job = job;
@@ -727,6 +733,13 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
     }
 
     /**
+     * @return Custom executor name.
+     */
+    public String executorName() {
+        return execName;
+    }
+
+    /**
      * @param evtType Event type.
      * @param msg Message.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 37bbb54..221c7bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.processors.pool;
 
 import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridKernalContext;
@@ -26,6 +28,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
 import org.apache.ignite.plugin.extensions.communication.IoPool;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Processor which abstracts out thread pool management.
@@ -34,6 +37,9 @@ public class PoolProcessor extends GridProcessorAdapter {
     /** Map of {@link IoPool}-s injected by Ignite plugins. */
     private final IoPool[] extPools = new IoPool[128];
 
+    /** Custom named pools. */
+    private final Map<String, ? extends ExecutorService> customExecs;
+
     /**
      * Constructor.
      *
@@ -72,6 +78,8 @@ public class PoolProcessor extends GridProcessorAdapter {
                 }
             }
         }
+
+        customExecs = ctx.customExecutors();
     }
 
     /** {@inheritDoc} */
@@ -165,4 +173,21 @@ public class PoolProcessor extends GridProcessorAdapter {
             }
         }
     }
+
+    /**
+     * Gets executor service for custom policy by executor name.
+     *
+     * @param name Executor name.
+     * @return Executor service, or {@code null} if no custom executor is registered under the given name.
+     */
+    @Nullable public Executor customExecutor(String name) {
+        assert name != null;
+
+        Executor exec = null;
+
+        if (customExecs != null)
+            exec = customExecs.get(name);
+
+        return exec;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
index 9af038a..f9937a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
@@ -76,6 +76,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
      * @param fullSup {@code True} to enable distributed session attributes and checkpoints.
      * @param internal {@code True} in case of internal task.
      * @param subjId Subject ID.
+     * @param execName Custom executor name.
      * @return New session if one did not exist, or existing one.
      */
     public GridTaskSessionImpl createTaskSession(
@@ -91,7 +92,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
         Map<Object, Object> attrs,
         boolean fullSup,
         boolean internal,
-        UUID subjId) {
+        UUID subjId,
+        @Nullable String execName) {
         if (!fullSup) {
             return new GridTaskSessionImpl(
                 taskNodeId,
@@ -107,7 +109,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
                 ctx,
                 false,
                 internal,
-                subjId);
+                subjId,
+                execName);
         }
 
         while (true) {
@@ -130,7 +133,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
                         ctx,
                         true,
                         internal,
-                        subjId));
+                        subjId,
+                        execName));
 
                 if (old != null)
                     ses = old;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d34f297..22d5716 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -359,6 +359,19 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      * @param <R> Task return value type.
      */
     public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) {
+        return execute(taskCls, arg, null);
+    }
+
+    /**
+     * @param taskCls Task class.
+     * @param arg Optional execution argument.
+     * @param execName Name of the custom executor.
+     * @return Task future.
+     * @param <T> Task argument type.
+     * @param <R> Task return value type.
+     */
+    public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg,
+        @Nullable String execName) {
         assert taskCls != null;
 
         lock.readLock();
@@ -367,7 +380,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
             if (stopping)
                 throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskCls);
 
-            return startTask(null, taskCls, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, false);
+            return startTask(null, taskCls, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg,
+                false, execName);
         }
         finally {
             lock.readUnlock();
@@ -382,7 +396,19 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      * @param <R> Task return value type.
      */
     public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg) {
-        return execute(task, arg, false);
+        return execute(task, arg, false, null);
+    }
+
+    /**
+     * @param task Actual task.
+     * @param arg Optional task argument.
+     * @param execName Name of the custom executor.
+     * @return Task future.
+     * @param <T> Task argument type.
+     * @param <R> Task return value type.
+     */
+    public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, String execName) {
+        return execute(task, arg, false, execName);
     }
 
     /**
@@ -394,13 +420,28 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      * @param <R> Task return value type.
      */
     public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys) {
+        return execute(task, arg, sys, null);
+    }
+
+    /**
+     * @param task Actual task.
+     * @param arg Optional task argument.
+     * @param sys If {@code true}, then system pool will be used.
+     * @param execName Name of the custom executor.
+     * @return Task future.
+     * @param <T> Task argument type.
+     * @param <R> Task return value type.
+     */
+    public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys,
+        @Nullable String execName) {
         lock.readLock();
 
         try {
             if (stopping)
                 throw new IllegalStateException("Failed to execute task due to grid shutdown: " + task);
 
-            return startTask(null, null, task, IgniteUuid.fromUuid(ctx.localNodeId()), arg, sys);
+            return startTask(null, null, task, IgniteUuid.fromUuid(ctx.localNodeId()), arg,
+                sys, execName);
         }
         finally {
             lock.readUnlock();
@@ -436,6 +477,18 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      * @param <R> Task return value type.
      */
     public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg) {
+        return execute(taskName, arg, null);
+    }
+
+    /**
+     * @param taskName Task name.
+     * @param arg Optional execution argument.
+     * @param execName Name of the custom executor.
+     * @return Task future.
+     * @param <T> Task argument type.
+     * @param <R> Task return value type.
+     */
+    public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg, @Nullable String execName) {
         assert taskName != null;
 
         lock.readLock();
@@ -444,7 +497,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
             if (stopping)
                 throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskName);
 
-            return startTask(taskName, null, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, false);
+            return startTask(taskName, null, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg,
+                false, execName);
         }
         finally {
             lock.readUnlock();
@@ -458,6 +512,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      * @param sesId Task session ID.
      * @param arg Optional task argument.
      * @param sys If {@code true}, then system pool will be used.
+     * @param execName Name of the custom executor.
      * @return Task future.
      */
     @SuppressWarnings("unchecked")
@@ -467,7 +522,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
         @Nullable ComputeTask<T, R> task,
         IgniteUuid sesId,
         @Nullable T arg,
-        boolean sys) {
+        boolean sys,
+        @Nullable String execName) {
         assert sesId != null;
 
         String taskClsName;
@@ -629,7 +685,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
             Collections.emptyMap(),
             fullSup,
             internal,
-            subjId);
+            subjId,
+            execName);
 
         ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index cb5aabe..62224f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1372,7 +1372,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         subjId,
                         affCacheIds,
                         affPartId,
-                        mapTopVer);
+                        mapTopVer,
+                        ses.executorName());
 
                     if (loc)
                         ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java
new file mode 100644
index 0000000..2277100
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.ExecutorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests custom executor configuration.
+ */
+public class IgniteComputeCustomExecutorConfigurationSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConfigurations() throws Exception {
+        try {
+            checkStartWithInvalidConfiguration(getConfiguration("node0")
+                .setExecutorConfiguration(new ExecutorConfiguration()));
+
+            checkStartWithInvalidConfiguration(getConfiguration("node0")
+                .setExecutorConfiguration(new ExecutorConfiguration("")));
+
+            checkStartWithInvalidConfiguration(getConfiguration("node0")
+                .setExecutorConfiguration(new ExecutorConfiguration("exec").setSize(-1)));
+
+            checkStartWithInvalidConfiguration(getConfiguration("node0")
+                .setExecutorConfiguration(new ExecutorConfiguration("exec").setSize(0)));
+        }
+        finally {
+            Ignition.stopAll(true);
+        }
+    }
+
+    /**
+     * @param cfg Ignite configuration.
+     * @throws Exception If failed.
+     */
+    private void checkStartWithInvalidConfiguration(IgniteConfiguration cfg) throws Exception {
+        try {
+            Ignition.start(cfg);
+
+            fail("Node start must fail.");
+        }
+        catch (IgniteException e) {
+            // No-op.
+        }
+    }
+}


Mime
View raw message