ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-3875: Introduced separate thread pool for data streamer. This closes #1173. This closes #1383.
Date Wed, 28 Dec 2016 09:58:26 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2.0 7e73d0223 -> 7d82d6a06


IGNITE-3875: Introduced separate thread pool for data streamer. This closes #1173. This closes #1383.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d82d6a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d82d6a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d82d6a0

Branch: refs/heads/ignite-2.0
Commit: 7d82d6a06b5e9f1f8cd2909b865e37d46b8da03f
Parents: 7e73d02
Author: devozerov <vozerov@gridgain.com>
Authored: Wed Dec 28 12:58:11 2016 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Wed Dec 28 12:58:11 2016 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  31 ++++++
 .../ignite/internal/GridKernalContext.java      |   7 ++
 .../ignite/internal/GridKernalContextImpl.java  |  12 +++
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../org/apache/ignite/internal/IgnitionEx.java  |  19 ++++
 .../managers/communication/GridIoManager.java   |   2 +
 .../managers/communication/GridIoPolicy.java    |   3 +
 .../closure/GridClosureProcessor.java           |   2 +-
 .../datastreamer/DataStreamProcessor.java       |  60 ++++++++---
 .../datastreamer/DataStreamerImpl.java          |  37 ++-----
 .../internal/processors/pool/PoolProcessor.java |   5 +
 .../DataStreamProcessorSelfTest.java            | 104 +++++++++++++++++++
 .../junits/GridTestKernalContext.java           |  12 +--
 13 files changed, 249 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index dcd8a80..e0ff9b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -148,6 +148,9 @@ public class IgniteConfiguration {
     /** Default core size of public thread pool. */
     public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT);
 
+    /** Default size of data streamer thread pool. */
+    public static final int DFLT_DATA_STREAMER_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT;
+
     /** Default keep alive time for public thread pool. */
     @Deprecated
     public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
@@ -251,6 +254,9 @@ public class IgniteConfiguration {
     /** IGFS pool size. */
     private int igfsPoolSize = AVAILABLE_PROC_CNT;
 
+    /** Data stream pool size. */
+    private int dataStreamerPoolSize = DFLT_DATA_STREAMER_POOL_SIZE;
+
     /** Utility cache pool size. */
     private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
 
@@ -514,6 +520,7 @@ public class IgniteConfiguration {
         clockSyncFreq = cfg.getClockSyncFrequency();
         clockSyncSamples = cfg.getClockSyncSamples();
         consistentId = cfg.getConsistentId();
+        dataStreamerPoolSize = cfg.getDataStreamerThreadPoolSize();
         deployMode = cfg.getDeploymentMode();
         discoStartupDelay = cfg.getDiscoveryStartupDelay();
         failureDetectionTimeout = cfg.getFailureDetectionTimeout();
@@ -837,6 +844,17 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Size of thread pool that is in charge of processing data stream messages.
+     * <p>
+     * If not provided, executor service will have size {@link #DFLT_DATA_STREAMER_POOL_SIZE}.
+     *
+     * @return Thread pool size to be used for data stream messages.
+     */
+    public int getDataStreamerThreadPoolSize() {
+        return dataStreamerPoolSize;
+    }
+
+    /**
      * Default size of thread pool that is in charge of processing utility cache messages.
      * <p>
      * If not provided, executor service will have size {@link #DFLT_SYSTEM_CORE_THREAD_CNT}.
@@ -960,6 +978,19 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Set thread pool size that will be used to process data stream messages.
+     *
+     * @param poolSize Executor service to use for data stream messages.
+     * @see IgniteConfiguration#getDataStreamerThreadPoolSize()
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setDataStreamerThreadPoolSize(int poolSize) {
+        dataStreamerPoolSize = poolSize;
+
+        return this;
+    }
+
+    /**
      * Sets default thread pool size that will be used to process utility cache messages.
      *
      * @param poolSize Default executor service size to use for utility cache messages.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 927944f..9157fed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -542,6 +542,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     public ExecutorService getIgfsExecutorService();
 
     /**
+     * Executor service that is in charge of processing data stream messages.
+     *
+     * @return Thread pool implementation to be used for data stream messages.
+     */
+    public ExecutorService getDataStreamerExecutorService();
+
+    /**
      * Should return an instance of fully configured thread pool to be used for
      * processing of client messages (REST requests).
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index a2ad1b2..8fc5b36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -317,6 +317,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    private ExecutorService dataStreamExecSvc;
+
+    /** */
+    @GridToStringExclude
     protected ExecutorService restExecSvc;
 
     /** */
@@ -390,6 +394,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
      * @param p2pExecSvc P2P executor service.
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
+     * @param dataStreamExecSvc data stream executor service.
      * @param restExecSvc REST executor service.
      * @param affExecSvc Affinity executor service.
      * @param idxExecSvc Indexing executor service.
@@ -410,6 +415,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
+        ExecutorService dataStreamExecSvc,
         ExecutorService restExecSvc,
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
@@ -431,6 +437,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         this.p2pExecSvc = p2pExecSvc;
         this.mgmtExecSvc = mgmtExecSvc;
         this.igfsExecSvc = igfsExecSvc;
+        this.dataStreamExecSvc = dataStreamExecSvc;
         this.restExecSvc = restExecSvc;
         this.affExecSvc = affExecSvc;
         this.idxExecSvc = idxExecSvc;
@@ -977,6 +984,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public ExecutorService getDataStreamerExecutorService() {
+        return dataStreamExecSvc;
+    }
+
+    /** {@inheritDoc} */
     @Override public ExecutorService getRestExecutorService() {
         return restExecSvc;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4972d1f..99c3dab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -668,6 +668,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
      * @param p2pExecSvc P2P executor service.
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
+     * @param dataStreamExecSvc data stream executor service.
      * @param restExecSvc Reset executor service.
      * @param affExecSvc Affinity executor service.
      * @param idxExecSvc Indexing executor service.
@@ -685,6 +686,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
+        ExecutorService dataStreamExecSvc,
         ExecutorService restExecSvc,
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
@@ -794,6 +796,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 p2pExecSvc,
                 mgmtExecSvc,
                 igfsExecSvc,
+                dataStreamExecSvc,
                 restExecSvc,
                 affExecSvc,
                 idxExecSvc,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index f32a753..9fe6fd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1472,6 +1472,9 @@ public class IgnitionEx {
         /** IGFS executor service. */
         private ThreadPoolExecutor igfsExecSvc;
 
+        /** Data streamer executor service. */
+        private ThreadPoolExecutor dataStreamerExecSvc;
+
         /** REST requests executor service. */
         private ThreadPoolExecutor restExecSvc;
 
@@ -1702,6 +1705,17 @@ public class IgnitionEx {
 
             p2pExecSvc.allowCoreThreadTimeOut(true);
 
+            // Note that we do not pre-start threads here as this pool may not be needed.
+            dataStreamerExecSvc = new IgniteThreadPoolExecutor(
+                "data-streamer",
+                cfg.getGridName(),
+                cfg.getDataStreamerThreadPoolSize(),
+                cfg.getDataStreamerThreadPoolSize(),
+                DFLT_THREAD_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>());
+
+            dataStreamerExecSvc.allowCoreThreadTimeOut(true);
+
             // Note that we do not pre-start threads here as igfs pool may not be needed.
             validateThreadPoolSize(cfg.getIgfsThreadPoolSize(), "IGFS");
 
@@ -1806,6 +1820,7 @@ public class IgnitionEx {
                     p2pExecSvc,
                     mgmtExecSvc,
                     igfsExecSvc,
+                    dataStreamerExecSvc,
                     restExecSvc,
                     affExecSvc,
                     idxExecSvc,
@@ -2445,6 +2460,10 @@ public class IgnitionEx {
 
             p2pExecSvc = null;
 
+            U.shutdownNow(getClass(), dataStreamerExecSvc, log);
+
+            dataStreamerExecSvc = null;
+
             U.shutdownNow(getClass(), igfsExecSvc, log);
 
             igfsExecSvc = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 7ef7bc0..de34adb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -90,6 +90,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
 import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
@@ -686,6 +687,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 case MARSH_CACHE_POOL:
                 case IDX_POOL:
                 case IGFS_POOL:
+                case DATA_STREAMER_POOL:
                 {
                     if (msg.isOrdered())
                         processOrderedMessage(nodeId, msg, plc, msgC);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 70a7354..18235d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -49,6 +49,9 @@ public class GridIoPolicy {
     /** Pool for handling distributed index range requests. */
     public static final byte IDX_POOL = 8;
 
+    /** Data streamer execution pool. */
+    public static final byte DATA_STREAMER_POOL = 9;
+
     /**
      * Defines the range of reserved pools that are not available for plugins.
      * @param key The key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index a07dbf8..5ba21d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -988,7 +988,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param plc Policy to choose executor pool.
      * @return Future.
      */
-    private <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) {
+    public  <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) {
         try {
             return callLocal(c, plc);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index fee4dd6..5ebfd47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -17,11 +17,8 @@
 
 package org.apache.ignite.internal.processors.datastreamer;
 
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.DelayQueue;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -40,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.marshaller.Marshaller;
@@ -47,11 +45,15 @@ import org.apache.ignite.stream.StreamReceiver;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.DelayQueue;
+
 import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL;
 
 /**
- *
+ * Data stream processor.
  */
 public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
     /** Loaders map (access is not supposed to be highly concurrent). */
@@ -224,13 +226,15 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
                 IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);
 
                 if (fut != null && !fut.isDone()) {
+                    final byte plc = threadIoPolicy();
+
                     fut.listen(new CI1<IgniteInternalFuture<?>>() {
                         @Override public void apply(IgniteInternalFuture<?> t) {
                             ctx.closure().runLocalSafe(new Runnable() {
                                 @Override public void run() {
                                     processRequest(nodeId, req);
                                 }
-                            }, false);
+                            }, plc);
                         }
                     });
 
@@ -416,12 +420,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
 
         try {
-            Byte plc = GridIoManager.currentPolicy();
-
-            if (plc == null)
-                plc = PUBLIC_POOL;
-
-            ctx.io().send(nodeId, resTopic, res, plc);
+            ctx.io().send(nodeId, resTopic, res, threadIoPolicy());
         }
         catch (IgniteCheckedException e) {
             if (ctx.discovery().alive(nodeId))
@@ -431,6 +430,41 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         }
     }
 
+    /**
+     * Get IO policy.
+     *
+     * @return IO policy.
+     */
+    private static byte threadIoPolicy() {
+        Byte plc = GridIoManager.currentPolicy();
+
+        if (plc == null)
+            plc = DATA_STREAMER_POOL;
+
+        return plc;
+    }
+
+    /**
+     * Get IO policy for particular node with provided resolver.
+     *
+     * @param rslvr Resolver.
+     * @param node Node.
+     * @return IO policy.
+     */
+    public static byte ioPolicy(@Nullable IgniteClosure<ClusterNode, Byte> rslvr, ClusterNode node) {
+        assert node != null;
+
+        Byte res = null;
+
+        if (rslvr != null)
+            res = rslvr.apply(node);
+
+        if (res == null)
+            res = DATA_STREAMER_POOL;
+
+        return res;
+    }
+
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         X.println(">>>");

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index bb9ffdd..0526162 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -112,16 +111,12 @@ import org.jsr166.LongAdder8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
 
 /**
  * Data streamer implementation.
  */
 @SuppressWarnings("unchecked")
 public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
-    /** Default policy resolver. */
-    private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver();
-
     /** Isolated receiver. */
     private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater();
 
@@ -135,7 +130,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     private byte[] updaterBytes;
 
     /** IO policy resovler for data load request. */
-    private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR;
+    private IgniteClosure<ClusterNode, Byte> ioPlcRslvr;
 
     /** Max remap count before issuing an error. */
     private static final int DFLT_MAX_REMAP_CNT = 32;
@@ -1509,10 +1504,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
          * @param entries Entries.
          * @param reqTopVer Request topology version.
          * @param curFut Current future.
+         * @param plc Policy.
          */
         private void localUpdate(final Collection<DataStreamerEntry> entries,
             final AffinityTopologyVersion reqTopVer,
-            final GridFutureAdapter<Object> curFut) {
+            final GridFutureAdapter<Object> curFut,
+            final byte plc) {
             try {
                 GridCacheContext cctx = ctx.cache().internalCache(cacheName).context();
 
@@ -1543,7 +1540,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                                 skipStore,
                                 keepBinary,
                                 rcvr),
-                            false);
+                            plc);
 
                         locFuts.add(callFut);
 
@@ -1573,7 +1570,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     else {
                         fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
                             @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
-                                localUpdate(entries, reqTopVer, curFut);
+                                localUpdate(entries, reqTopVer, curFut, plc);
                             }
                         });
                     }
@@ -1617,13 +1614,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             IgniteInternalFuture<Object> fut;
 
-            Byte plc = ioPlcRslvr.apply(node);
-
-            if (plc == null)
-                plc = PUBLIC_POOL;
+            byte plc = DataStreamProcessor.ioPolicy(ioPlcRslvr, node);
 
-            if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL)
-                localUpdate(entries, topVer, curFut);
+            if (isLocNode)
+                localUpdate(entries, topVer, curFut, plc);
             else {
                 try {
                     for (DataStreamerEntry e : entries) {
@@ -1975,19 +1969,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
-     * Default IO policy resolver.
-     */
-    private static class DefaultIoPolicyResolver implements IgniteClosure<ClusterNode, Byte> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public Byte apply(ClusterNode gridNode) {
-            return PUBLIC_POOL;
-        }
-    }
-
-    /**
      * Key object wrapper. Using identity equals prevents slow down in case of hash code collision.
      */
     private static class KeyCacheObjectWrapper {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 59e5e7d..89140b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -128,6 +128,11 @@ public class PoolProcessor extends GridProcessorAdapter {
 
                 return ctx.getIgfsExecutorService();
 
+            case GridIoPolicy.DATA_STREAMER_POOL:
+                assert ctx.getDataStreamerExecutorService() != null : "Data streamer pool is not configured.";
+
+                return ctx.getDataStreamerExecutorService();
+
             default: {
                 if (plc < 0)
                     throw new IgniteCheckedException("Policy cannot be negative: " + plc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index 0f8ae29..d00e08b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -33,6 +33,7 @@ import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -49,6 +50,7 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
@@ -59,6 +61,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.stream.StreamReceiver;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
@@ -949,6 +952,94 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testLocalDataStreamerDedicatedThreadPool() throws Exception {
+        try {
+            useCache = true;
+
+            Ignite ignite = startGrid(1);
+
+            final IgniteCache<String, String> cache = ignite.cache(null);
+
+            IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(null);
+            try {
+                ldr.receiver(new StreamReceiver<String, String>() {
+                    @Override public void receive(IgniteCache<String, String> cache,
+                        Collection<Map.Entry<String, String>> entries) throws IgniteException {
+                        String threadName = Thread.currentThread().getName();
+
+                        cache.put("key", threadName);
+                    }
+                });
+                ldr.addData("key", "value");
+
+                ldr.tryFlush();
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return cache.get("key") != null;
+                    }
+                }, 3_000);
+            }
+            finally {
+                ldr.close(true);
+            }
+
+            assertNotNull(cache.get("key"));
+
+            assertTrue(cache.get("key").startsWith("data-streamer"));
+
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteDataStreamerDedicatedThreadPool() throws Exception {
+        try {
+            useCache = true;
+
+            Ignite ignite = startGrid(1);
+
+            useCache = false;
+
+            Ignite client = startGrid(0);
+
+            final IgniteCache<String, String> cache = ignite.cache(null);
+
+            IgniteDataStreamer<String, String> ldr = client.dataStreamer(null);
+
+            try {
+                ldr.receiver(new StringStringStreamReceiver());
+
+                ldr.addData("key", "value");
+
+                ldr.tryFlush();
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return cache.get("key") != null;
+                    }
+                }, 3_000);
+            }
+            finally {
+                ldr.close(true);
+            }
+
+            assertNotNull(cache.get("key"));
+
+            assertTrue(cache.get("key").startsWith("data-streamer"));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      *
      */
     public static class TestObject {
@@ -1024,4 +1115,17 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
             }
         }
     }
+
+    /**
+     *
+     */
+    private static class StringStringStreamReceiver implements StreamReceiver<String, String> {
+        /** {@inheritDoc} */
+        @Override public void receive(IgniteCache<String, String> cache,
+            Collection<Map.Entry<String, String>> entries) throws IgniteException {
+            String threadName = Thread.currentThread().getName();
+
+            cache.put("key", threadName);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index 143159d..40f0e43 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -64,6 +64,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
                 null,
                 null,
                 null,
+                null,
                 U.allPluginProviders()
         );
 
@@ -98,11 +99,6 @@ public class GridTestKernalContext extends GridKernalContextImpl {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridTestKernalContext.class, this, super.toString());
-    }
-
     /**
      * Sets system executor service.
      *
@@ -112,7 +108,6 @@ public class GridTestKernalContext extends GridKernalContextImpl {
         this.sysExecSvc = sysExecSvc;
     }
 
-
     /**
      * Sets executor service.
      *
@@ -121,4 +116,9 @@ public class GridTestKernalContext extends GridKernalContextImpl {
     public void setExecutorService(ExecutorService execSvc){
         this.execSvc = execSvc;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridTestKernalContext.class, this, super.toString());
+    }
 }


Mime
View raw message