ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [46/50] [abbrv] ignite git commit: Revert "IGNITE-3875: Added separate thread pool for data streamer. This closes #1067."
Date Wed, 26 Oct 2016 22:55:38 GMT
Revert "IGNITE-3875: Added separate thread pool for data streamer. This closes #1067."

This reverts commit f597aff1bdf65d3d430cf85c9932391a72c2d7dc.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/442fedc1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/442fedc1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/442fedc1

Branch: refs/heads/ignite-4110
Commit: 442fedc17bdae43b1c87d6bb4680f724a18adb52
Parents: 51cef7c
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Oct 18 12:25:56 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed Oct 26 11:11:49 2016 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      | 31 -------
 .../ignite/internal/GridKernalContext.java      |  7 --
 .../ignite/internal/GridKernalContextImpl.java  | 12 ---
 .../apache/ignite/internal/IgniteKernal.java    |  3 -
 .../org/apache/ignite/internal/IgnitionEx.java  | 20 +---
 .../managers/communication/GridIoManager.java   |  2 -
 .../managers/communication/GridIoPolicy.java    |  3 -
 .../closure/GridClosureProcessor.java           |  3 +-
 .../datastreamer/DataStreamProcessor.java       | 82 ++---------------
 .../datastreamer/DataStreamerImpl.java          | 31 ++++++-
 .../internal/processors/pool/PoolProcessor.java |  3 -
 .../DataStreamProcessorSelfTest.java            | 97 --------------------
 .../junits/GridTestKernalContext.java           | 12 +--
 13 files changed, 44 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index d039584..75145a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -148,9 +148,6 @@ public class IgniteConfiguration {
     /** Default core size of public thread pool. */
     public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT) * 2;
 
-    /** Default size of data streamer thread pool. */
-    public static final int DFLT_DATA_STREAMER_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT;
-
     /** Default keep alive time for public thread pool. */
     @Deprecated
     public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
@@ -248,9 +245,6 @@ public class IgniteConfiguration {
     /** IGFS pool size. */
     private int igfsPoolSize = AVAILABLE_PROC_CNT;
 
-    /** Data stream pool size. */
-    private int dataStreamerPoolSize = DFLT_DATA_STREAMER_POOL_SIZE;
-
     /** Utility cache pool size. */
     private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
 
@@ -514,7 +508,6 @@ public class IgniteConfiguration {
         clockSyncFreq = cfg.getClockSyncFrequency();
         clockSyncSamples = cfg.getClockSyncSamples();
         consistentId = cfg.getConsistentId();
-        dataStreamerPoolSize = cfg.getDataStreamerThreadPoolSize();
         deployMode = cfg.getDeploymentMode();
         discoStartupDelay = cfg.getDiscoveryStartupDelay();
         failureDetectionTimeout = cfg.getFailureDetectionTimeout();
@@ -796,17 +789,6 @@ public class IgniteConfiguration {
     }
 
     /**
-     * Size of thread pool that is in charge of processing data stream messages.
-     * <p>
-     * If not provided, executor service will have size {@link #DFLT_DATA_STREAMER_POOL_SIZE}.
-     *
-     * @return Thread pool size to be used for data stream messages.
-     */
-    public int getDataStreamerThreadPoolSize() {
-        return dataStreamerPoolSize;
-    }
-
-    /**
      * Default size of thread pool that is in charge of processing utility cache messages.
      * <p>
      * If not provided, executor service will have size {@link #DFLT_SYSTEM_CORE_THREAD_CNT}.
@@ -930,19 +912,6 @@ public class IgniteConfiguration {
     }
 
     /**
-     * Set thread pool size that will be used to process data stream messages.
-     *
-     * @param poolSize Executor service to use for data stream messages.
-     * @see IgniteConfiguration#getDataStreamerThreadPoolSize()
-     * @return {@code this} for chaining.
-     */
-    public IgniteConfiguration setDataStreamerThreadPoolSize(int poolSize) {
-        dataStreamerPoolSize = poolSize;
-
-        return this;
-    }
-
-    /**
      * Sets default thread pool size that will be used to process utility cache messages.
      *
      * @param poolSize Default executor service size to use for utility cache messages.

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index adace0b..ae29223 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -533,13 +533,6 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     public ExecutorService getIgfsExecutorService();
 
     /**
-     * Executor service that is in charge of processing data stream messages.
-     *
-     * @return Thread pool implementation to be used for data stream messages.
-     */
-    public ExecutorService getDataStreamerExecutorService();
-
-    /**
      * Should return an instance of fully configured thread pool to be used for
      * processing of client messages (REST requests).
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index fb55800..94c6448 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -312,10 +312,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
-    private ExecutorService dataStreamExecSvc;
-
-    /** */
-    @GridToStringExclude
     protected ExecutorService restExecSvc;
 
     /** */
@@ -388,7 +384,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
      * @param p2pExecSvc P2P executor service.
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
-     * @param dataStreamExecSvc data stream executor service.
      * @param restExecSvc REST executor service.
      * @param affExecSvc Affinity executor service.
      * @param idxExecSvc Indexing executor service.
@@ -408,7 +403,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
-        ExecutorService dataStreamExecSvc,
         ExecutorService restExecSvc,
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
@@ -428,7 +422,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         this.p2pExecSvc = p2pExecSvc;
         this.mgmtExecSvc = mgmtExecSvc;
         this.igfsExecSvc = igfsExecSvc;
-        this.dataStreamExecSvc = dataStreamExecSvc;
         this.restExecSvc = restExecSvc;
         this.affExecSvc = affExecSvc;
         this.idxExecSvc = idxExecSvc;
@@ -970,11 +963,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public ExecutorService getDataStreamerExecutorService() {
-        return dataStreamExecSvc;
-    }
-
-    /** {@inheritDoc} */
     @Override public ExecutorService getRestExecutorService() {
         return restExecSvc;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 523bee6..1963509 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -666,7 +666,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
      * @param p2pExecSvc P2P executor service.
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
-     * @param dataStreamExecSvc data stream executor service.
      * @param restExecSvc Reset executor service.
      * @param affExecSvc Affinity executor service.
      * @param idxExecSvc Indexing executor service.
@@ -682,7 +681,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
-        ExecutorService dataStreamExecSvc,
         ExecutorService restExecSvc,
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
@@ -790,7 +788,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 p2pExecSvc,
                 mgmtExecSvc,
                 igfsExecSvc,
-                dataStreamExecSvc,
                 restExecSvc,
                 affExecSvc,
                 idxExecSvc,

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 0653eff..5b2c3fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1468,9 +1468,6 @@ public class IgnitionEx {
         /** IGFS executor service. */
         private ThreadPoolExecutor igfsExecSvc;
 
-        /** Data streamer executor service. */
-        private ThreadPoolExecutor dataStreamerExecSvc;
-
         /** REST requests executor service. */
         private ThreadPoolExecutor restExecSvc;
 
@@ -1693,17 +1690,6 @@ public class IgnitionEx {
 
             p2pExecSvc.allowCoreThreadTimeOut(true);
 
-            // Note that we do not pre-start threads here as this pool may not be needed.
-            dataStreamerExecSvc = new IgniteThreadPoolExecutor(
-                "data-streamer",
-                cfg.getGridName(),
-                cfg.getDataStreamerThreadPoolSize(),
-                cfg.getDataStreamerThreadPoolSize(),
-                DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
-
-            dataStreamerExecSvc.allowCoreThreadTimeOut(true);
-
             // Note that we do not pre-start threads here as igfs pool may not be needed.
             igfsExecSvc = new IgniteThreadPoolExecutor(
                 cfg.getIgfsThreadPoolSize(),
@@ -1789,7 +1775,7 @@ public class IgnitionEx {
                 grid = grid0;
 
                 grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
-                    igfsExecSvc, dataStreamerExecSvc, restExecSvc, affExecSvc, idxExecSvc, callbackExecSvc,
+                    igfsExecSvc, restExecSvc, affExecSvc, idxExecSvc, callbackExecSvc,
                     new CA() {
                         @Override public void apply() {
                             startLatch.countDown();
@@ -2407,10 +2393,6 @@ public class IgnitionEx {
 
             p2pExecSvc = null;
 
-            U.shutdownNow(getClass(), dataStreamerExecSvc, log);
-
-            dataStreamerExecSvc = null;
-
             U.shutdownNow(getClass(), igfsExecSvc, log);
 
             igfsExecSvc = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 77a58d3..3df29cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -84,7 +84,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
@@ -577,7 +576,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 case MARSH_CACHE_POOL:
                 case IDX_POOL:
                 case IGFS_POOL:
-                case DATA_STREAMER_POOL:
                 {
                     if (msg.isOrdered())
                         processOrderedMessage(nodeId, msg, plc, msgC);

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 18235d2..70a7354 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -49,9 +49,6 @@ public class GridIoPolicy {
     /** Pool for handling distributed index range requests. */
     public static final byte IDX_POOL = 8;
 
-    /** Data streamer execution pool. */
-    public static final byte DATA_STREAMER_POOL = 9;
-
     /**
      * Defines the range of reserved pools that are not available for plugins.
      * @param key The key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 2c08423..c5a87d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
@@ -972,7 +973,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param plc Policy to choose executor pool.
      * @return Future.
      */
-    public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) {
+    private <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) {
         try {
             return callLocal(c, plc);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index bd33f62..7663735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -21,12 +21,10 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.DelayQueue;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -37,7 +35,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.stream.StreamReceiver;
@@ -45,23 +42,12 @@ import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
 
 /**
- * Data stream processor.
+ *
  */
 public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
-    /** Data streamer separate pool feature major version. */
-    private static final int DATA_STREAMER_POOL_MAJOR_VER = 1;
-
-    /** Data streamer separate pool feature minor version. */
-    private static final int DATA_STREAMER_POOL_MINOR_VER = 6;
-
-    /** Data streamer separate pool feature maintenance version. */
-    private static final int DATA_STREAMER_POOL_MAINTENANCE_VER = 10;
-
-    /** Default pool for data streamer messages processing. */
-    public static final byte DFLT_POLICY = GridIoPolicy.PUBLIC_POOL;
-
     /** Loaders map (access is not supposed to be highly concurrent). */
     private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
 
@@ -232,15 +218,13 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
                 IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);
 
                 if (fut != null && !fut.isDone()) {
-                    final byte plc = threadIoPolicy();
-
                     fut.listen(new CI1<IgniteInternalFuture<?>>() {
                         @Override public void apply(IgniteInternalFuture<?> t) {
                             ctx.closure().runLocalSafe(new Runnable() {
                                 @Override public void run() {
                                     processRequest(nodeId, req);
                                 }
-                            }, plc);
+                            }, false);
                         }
                     });
 
@@ -356,7 +340,12 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
 
         try {
-            ctx.io().send(nodeId, resTopic, res, threadIoPolicy());
+            Byte plc = GridIoManager.currentPolicy();
+
+            if (plc == null)
+                plc = PUBLIC_POOL;
+
+            ctx.io().send(nodeId, resTopic, res, plc);
         }
         catch (IgniteCheckedException e) {
             if (ctx.discovery().alive(nodeId))
@@ -366,59 +355,6 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         }
     }
 
-    /**
-     * Get IO policy.
-     *
-     * @return IO policy.
-     */
-    private static byte threadIoPolicy() {
-        Byte plc = GridIoManager.currentPolicy();
-
-        if (plc == null)
-            plc = DFLT_POLICY;
-
-        return plc;
-    }
-
-    /**
-     * Get IO policy for particular node.
-     *
-     * @param node Node.
-     * @return Policy.
-     */
-    public static byte ioPolicy(ClusterNode node) {
-        assert node != null;
-
-        if (node.isLocal() || node.version().greaterThanEqual(
-            DATA_STREAMER_POOL_MAJOR_VER,
-            DATA_STREAMER_POOL_MINOR_VER,
-            DATA_STREAMER_POOL_MAINTENANCE_VER))
-            return GridIoPolicy.DATA_STREAMER_POOL;
-        else
-            return DFLT_POLICY;
-    }
-
-    /**
-     * Get IO policy for particular node with provided resolver.
-     *
-     * @param rslvr Resolver.
-     * @param node Node.
-     * @return IO policy.
-     */
-    public static byte ioPolicy(@Nullable IgniteClosure<ClusterNode, Byte> rslvr, ClusterNode node) {
-        assert node != null;
-
-        Byte res = null;
-
-        if (rslvr != null)
-            res = rslvr.apply(node);
-
-        if (res == null)
-            res = ioPolicy(node);
-
-        return res;
-    }
-
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         X.println(">>>");

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 46f6380..c2f226c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
 import javax.cache.expiry.ExpiryPolicy;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -59,6 +60,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -104,12 +106,16 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
 
 /**
  * Data streamer implementation.
  */
 @SuppressWarnings("unchecked")
 public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
+    /** Default policy reoslver. */
+    private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver();
+
     /** Isolated receiver. */
     private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater();
 
@@ -120,7 +126,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     private byte[] updaterBytes;
 
     /** IO policy resovler for data load request. */
-    private IgniteClosure<ClusterNode, Byte> ioPlcRslvr;
+    private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR;
 
     /** Max remap count before issuing an error. */
     private static final int DFLT_MAX_REMAP_CNT = 32;
@@ -1307,12 +1313,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             IgniteInternalFuture<Object> fut;
 
-            byte plc = DataStreamProcessor.ioPolicy(ioPlcRslvr, node);
+            Byte plc = ioPlcRslvr.apply(node);
+
+            if (plc == null)
+                plc = PUBLIC_POOL;
 
-            if (isLocNode) {
+            if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) {
                 fut = ctx.closure().callLocalSafe(
-                    new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr),
-                    plc);
+                    new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false);
 
                 locFuts.add(fut);
 
@@ -1676,6 +1684,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
+     * Default IO policy resolver.
+     */
+    private static class DefaultIoPolicyResolver implements IgniteClosure<ClusterNode, Byte> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public Byte apply(ClusterNode gridNode) {
+            return PUBLIC_POOL;
+        }
+    }
+
+    /**
      * Key object wrapper. Using identity equals prevents slow down in case of hash code collision.
      */
     private static class KeyCacheObjectWrapper {

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 26bfc0d..59e5e7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -128,9 +128,6 @@ public class PoolProcessor extends GridProcessorAdapter {
 
                 return ctx.getIgfsExecutorService();
 
-            case GridIoPolicy.DATA_STREAMER_POOL:
-                return ctx.getDataStreamerExecutorService();
-
             default: {
                 if (plc < 0)
                     throw new IgniteCheckedException("Policy cannot be negative: " + plc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index 401b09c..9fedc35 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -33,7 +33,6 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -50,7 +49,6 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
@@ -61,7 +59,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.stream.StreamReceiver;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
@@ -952,100 +949,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception If failed.
-     */
-    public void testLocalDataStreamerDedicatedThreadPool() throws Exception {
-        try {
-            useCache = true;
-
-            Ignite ignite = startGrid(1);
-
-            final IgniteCache<String, String> cache = ignite.cache(null);
-
-            IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(null);
-            try {
-                ldr.receiver(new StreamReceiver<String, String>() {
-                    @Override public void receive(IgniteCache<String, String> cache,
-                        Collection<Map.Entry<String, String>> entries) throws IgniteException {
-                        String threadName = Thread.currentThread().getName();
-
-                        cache.put("key", threadName);
-                    }
-                });
-                ldr.addData("key", "value");
-
-                ldr.tryFlush();
-
-                GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                    @Override public boolean apply() {
-                        return cache.get("key") != null;
-                    }
-                }, 3_000);
-            }
-            finally {
-                ldr.close(true);
-            }
-
-            assertNotNull(cache.get("key"));
-
-            assertTrue(cache.get("key").startsWith("data-streamer"));
-
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testRemoteDataStreamerDedicatedThreadPool() throws Exception {
-        try {
-            useCache = true;
-
-            Ignite ignite = startGrid(1);
-
-            useCache = false;
-
-            Ignite client = startGrid(0);
-
-            final IgniteCache<String, String> cache = ignite.cache(null);
-
-            IgniteDataStreamer<String, String> ldr = client.dataStreamer(null);
-            try {
-                ldr.receiver(new StreamReceiver<String, String>() {
-                    @Override public void receive(IgniteCache<String, String> cache,
-                        Collection<Map.Entry<String, String>> entries) throws IgniteException {
-                        String threadName = Thread.currentThread().getName();
-
-                        cache.put("key", threadName);
-                    }
-                });
-
-                ldr.addData("key", "value");
-
-                ldr.tryFlush();
-
-                GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                    @Override public boolean apply() {
-                        return cache.get("key") != null;
-                    }
-                }, 3_000);
-            }
-            finally {
-                ldr.close(true);
-            }
-
-            assertNotNull(cache.get("key"));
-
-            assertTrue(cache.get("key").startsWith("data-streamer"));
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
      *
      */
     public static class TestObject {

http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index 8cb32b6..f9e2ff4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -63,7 +63,6 @@ public class GridTestKernalContext extends GridKernalContextImpl {
                 null,
                 null,
                 null,
-                null,
                 U.allPluginProviders());
 
         GridTestUtils.setFieldValue(grid(), "cfg", config());
@@ -97,6 +96,11 @@ public class GridTestKernalContext extends GridKernalContextImpl {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridTestKernalContext.class, this, super.toString());
+    }
+
     /**
      * Sets system executor service.
      *
@@ -106,6 +110,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
         this.sysExecSvc = sysExecSvc;
     }
 
+
     /**
      * Sets executor service.
      *
@@ -114,9 +119,4 @@ public class GridTestKernalContext extends GridKernalContextImpl {
     public void setExecutorService(ExecutorService execSvc){
         this.execSvc = execSvc;
     }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridTestKernalContext.class, this, super.toString());
-    }
 }


Mime
View raw message