ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/20] incubator-ignite git commit: # Renaming
Date Fri, 05 Dec 2014 14:22:44 GMT
# Renaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b69a23cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b69a23cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b69a23cd

Branch: refs/heads/master
Commit: b69a23cd226056307fc288af69e7863dbfe19181
Parents: a62862f
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Dec 5 17:09:19 2014 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Dec 5 17:09:21 2014 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCompute.java   |   2 +-
 .../ignite/compute/ComputeLoadBalancer.java     |   2 +-
 .../apache/ignite/compute/ComputeTaskSpis.java  |   2 +-
 .../configuration/IgniteConfiguration.java      |   8 +-
 .../resources/IgniteLoadBalancerResource.java   |   2 +-
 .../ignite/spi/failover/FailoverContext.java    |   2 +-
 .../spi/loadbalancing/LoadBalancingSpi.java     | 114 ++++
 .../adaptive/AdaptiveCpuLoadProbe.java          | 229 ++++++++
 .../adaptive/AdaptiveJobCountLoadProbe.java     |  96 +++
 .../adaptive/AdaptiveLoadBalancingSpi.java      | 581 +++++++++++++++++++
 .../adaptive/AdaptiveLoadBalancingSpiMBean.java |  27 +
 .../adaptive/AdaptiveLoadProbe.java             |  90 +++
 .../AdaptiveProcessingTimeLoadProbe.java        |  98 ++++
 .../spi/loadbalancing/adaptive/package.html     |  15 +
 .../ignite/spi/loadbalancing/package.html       |  15 +
 .../RoundRobinGlobalLoadBalancer.java           | 305 ++++++++++
 .../roundrobin/RoundRobinLoadBalancingSpi.java  | 319 ++++++++++
 .../RoundRobinLoadBalancingSpiMBean.java        |  37 ++
 .../RoundRobinPerTaskLoadBalancer.java          |  96 +++
 .../spi/loadbalancing/roundrobin/package.html   |  15 +
 .../WeightedRandomLoadBalancingSpi.java         | 394 +++++++++++++
 .../WeightedRandomLoadBalancingSpiMBean.java    |  37 ++
 .../loadbalancing/weightedrandom/package.html   |  15 +
 .../affinity/GridCacheAffinityKeyMapped.java    |   2 +-
 .../org/gridgain/grid/kernal/GridGainEx.java    |   4 +-
 .../loadbalancer/GridLoadBalancerManager.java   |   2 +-
 .../spi/loadbalancing/LoadBalancingSpi.java     | 114 ----
 .../adaptive/AdaptiveCpuLoadProbe.java          | 229 --------
 .../adaptive/AdaptiveJobCountLoadProbe.java     |  96 ---
 .../adaptive/AdaptiveLoadBalancingSpi.java      | 581 -------------------
 .../adaptive/AdaptiveLoadBalancingSpiMBean.java |  27 -
 .../adaptive/AdaptiveLoadProbe.java             |  90 ---
 .../AdaptiveProcessingTimeLoadProbe.java        |  98 ----
 .../spi/loadbalancing/adaptive/package.html     |  15 -
 .../grid/spi/loadbalancing/package.html         |  15 -
 .../RoundRobinGlobalLoadBalancer.java           | 305 ----------
 .../roundrobin/RoundRobinLoadBalancingSpi.java  | 319 ----------
 .../RoundRobinLoadBalancingSpiMBean.java        |  37 --
 .../RoundRobinPerTaskLoadBalancer.java          |  96 ---
 .../spi/loadbalancing/roundrobin/package.html   |  15 -
 .../WeightedRandomLoadBalancingSpi.java         | 394 -------------
 .../WeightedRandomLoadBalancingSpiMBean.java    |  37 --
 .../loadbalancing/weightedrandom/package.html   |  15 -
 .../src/test/config/io-manager-benchmark.xml    |   2 +-
 modules/core/src/test/config/jobs-load-base.xml |   2 +-
 .../src/test/config/load/merge-sort-base.xml    |   2 +-
 .../config/spring-cache-put-remove-load.xml     |   2 +-
 ...dAdaptiveLoadBalancingSpiConfigSelfTest.java |  26 +
 ...iveLoadBalancingSpiMultipleNodeSelfTest.java |  87 +++
 .../GridAdaptiveLoadBalancingSpiSelfTest.java   | 125 ++++
 ...aptiveLoadBalancingSpiStartStopSelfTest.java |  23 +
 .../spi/loadbalancing/adaptive/package.html     |  15 +
 .../ignite/spi/loadbalancing/package.html       |  15 +
 ...alancingNotPerTaskMultithreadedSelfTest.java | 115 ++++
 ...dRobinLoadBalancingSpiLocalNodeSelfTest.java |  44 ++
 ...inLoadBalancingSpiMultipleNodesSelfTest.java | 126 ++++
 ...RobinLoadBalancingSpiNotPerTaskSelfTest.java | 121 ++++
 ...dRobinLoadBalancingSpiStartStopSelfTest.java |  23 +
 ...nLoadBalancingSpiTopologyChangeSelfTest.java |  98 ++++
 .../roundrobin/GridRoundRobinTestUtils.java     |  95 +++
 .../spi/loadbalancing/roundrobin/package.html   |  15 +
 ...tedRandomLoadBalancingSpiConfigSelfTest.java |  26 +
 ...dWeightedRandomLoadBalancingSpiSelfTest.java |  58 ++
 ...RandomLoadBalancingSpiStartStopSelfTest.java |  23 +
 ...dRandomLoadBalancingSpiWeightedSelfTest.java |  73 +++
 .../loadbalancing/weightedrandom/package.html   |  15 +
 .../grid/kernal/GridMultipleSpisSelfTest.java   |   2 +-
 .../managers/GridManagerStopSelfTest.java       |   2 +-
 ...dAdaptiveLoadBalancingSpiConfigSelfTest.java |  26 -
 ...iveLoadBalancingSpiMultipleNodeSelfTest.java |  87 ---
 .../GridAdaptiveLoadBalancingSpiSelfTest.java   | 125 ----
 ...aptiveLoadBalancingSpiStartStopSelfTest.java |  23 -
 .../spi/loadbalancing/adaptive/package.html     |  15 -
 .../grid/spi/loadbalancing/package.html         |  15 -
 ...alancingNotPerTaskMultithreadedSelfTest.java | 115 ----
 ...dRobinLoadBalancingSpiLocalNodeSelfTest.java |  44 --
 ...inLoadBalancingSpiMultipleNodesSelfTest.java | 126 ----
 ...RobinLoadBalancingSpiNotPerTaskSelfTest.java | 121 ----
 ...dRobinLoadBalancingSpiStartStopSelfTest.java |  23 -
 ...nLoadBalancingSpiTopologyChangeSelfTest.java |  98 ----
 .../roundrobin/GridRoundRobinTestUtils.java     |  95 ---
 .../spi/loadbalancing/roundrobin/package.html   |  15 -
 ...tedRandomLoadBalancingSpiConfigSelfTest.java |  26 -
 ...dWeightedRandomLoadBalancingSpiSelfTest.java |  58 --
 ...RandomLoadBalancingSpiStartStopSelfTest.java |  23 -
 ...dRandomLoadBalancingSpiWeightedSelfTest.java |  73 ---
 .../loadbalancing/weightedrandom/package.html   |  15 -
 .../GridSpiLoadBalancingSelfTestSuite.java      |   6 +-
 pom.xml                                         |   4 +-
 89 files changed, 3630 insertions(+), 3630 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
index 875b22a..39818ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
@@ -51,7 +51,7 @@ import java.util.concurrent.*;
  * {@link Serializable} and should be used to run computations on the grid.
  * <h1 class="header">Load Balancing</h1>
  * In all cases other than {@code broadcast(...)}, GridGain must select a node for a computation
- * to be executed. The node will be selected based on the underlying {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi},
+ * to be executed. The node will be selected based on the underlying {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi},
  * which by default sequentially picks next available node from grid projection. Other load balancing
  * policies, such as {@code random} or {@code adaptive}, can be configured as well by selecting
  * a different load balancing SPI in grid configuration. If your logic requires some custom

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java
index edba366..54a265c 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java
@@ -18,7 +18,7 @@ import java.util.*;
 /**
  * Load balancer is used for finding the best balanced node according
  * to load balancing policy. Internally load balancer will
- * query the {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi}
+ * query the {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi}
  * to get the balanced node.
  * <p>
  * Load balancer can be used <i>explicitly</i> from inside {@link ComputeTask#map(List, Object)}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java
index 6c1d231..33dd020 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java
@@ -14,7 +14,7 @@ import java.lang.annotation.*;
 /**
  * This annotation allows task to specify what SPIs it wants to use.
  * Starting with {@code GridGain 2.1} you can start multiple instances
- * of {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi},
+ * of {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi},
  * {@link org.apache.ignite.spi.failover.FailoverSpi}, and {@link org.apache.ignite.spi.checkpoint.CheckpointSpi}. If you do that,
  * you need to tell a task which SPI to use (by default it will use the fist
  * SPI in the list).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index d3386d2..f8b2547 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -35,7 +35,7 @@ import org.apache.ignite.spi.deployment.*;
 import org.apache.ignite.spi.discovery.*;
 import org.apache.ignite.spi.eventstorage.*;
 import org.apache.ignite.spi.failover.*;
-import org.gridgain.grid.spi.loadbalancing.*;
+import org.apache.ignite.spi.loadbalancing.*;
 import org.gridgain.grid.spi.securesession.*;
 import org.gridgain.grid.spi.securesession.noop.*;
 import org.gridgain.grid.spi.swapspace.*;
@@ -2073,7 +2073,7 @@ public class IgniteConfiguration {
 
     /**
      * Should return fully configured load balancing SPI implementation. If not provided,
-     * {@link org.gridgain.grid.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi} will be used.
+     * {@link org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi} will be used.
      *
      * @return Grid load balancing SPI implementation or {@code null} to use default implementation.
      */
@@ -2114,9 +2114,9 @@ public class IgniteConfiguration {
     }
 
     /**
-     * Sets fully configured instance of {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi}.
+     * Sets fully configured instance of {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi}.
      *
-     * @param loadBalancingSpi Fully configured instance of {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} or
+     * @param loadBalancingSpi Fully configured instance of {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi} or
      *      {@code null} if no SPI provided.
      * @see IgniteConfiguration#getLoadBalancingSpi()
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java b/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java
index 6479e94..2edea7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java
+++ b/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java
@@ -14,7 +14,7 @@ import java.lang.annotation.*;
 /**
  * Annotates a field or a setter method for injection of {@link org.apache.ignite.compute.ComputeLoadBalancer}.
  * Specific implementation for grid load balancer is defined by
- * {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi}
+ * {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi}
  * which is provided to grid via {@link org.apache.ignite.configuration.IgniteConfiguration}..
  * <p>
  * Load balancer can be injected into instances of following classes:

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
index 03603cb..64c6af7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
@@ -36,7 +36,7 @@ public interface FailoverContext {
 
     /**
      * Gets the next balanced node for failed job. Internally this method will
-     * delegate to load balancing SPI (see {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} to
+     * delegate to load balancing SPI (see {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi} to
      * determine the optimal node for execution.
      *
      * @param top Topology to pick balanced node from.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/LoadBalancingSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/LoadBalancingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/LoadBalancingSpi.java
new file mode 100644
index 0000000..e3713c5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/LoadBalancingSpi.java
@@ -0,0 +1,114 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.loadbalancing;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+
+import java.util.*;
+
+/**
+ * Load balancing SPI provides the next best balanced node for job
+ * execution. This SPI is used either implicitly or explicitly whenever
+ * a job gets mapped to a node during {@link org.apache.ignite.compute.ComputeTask#map(List, Object)}
+ * invocation.
+ * <h1 class="header">Coding Examples</h1>
+ * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic
+ * is transparent to your code and is handled automatically by the adapter.
+ * Here is an example of how your task could look:
+ * <pre name="code" class="java">
+ * public class MyFooBarTask extends GridComputeTaskSplitAdapter&lt;Object,Object&gt; {
+ *    &#64;Override
+ *    protected Collection&lt;? extends GridComputeJob&gt; split(int gridSize, Object arg) throws GridException {
+ *        List&lt;MyFooBarJob&gt; jobs = new ArrayList&lt;MyFooBarJob&gt;(gridSize);
+ *
+ *        for (int i = 0; i &lt; gridSize; i++) {
+ *            jobs.add(new MyFooBarJob(arg));
+ *        }
+ *
+ *        // Node assignment via load balancer
+ *        // happens automatically.
+ *        return jobs;
+ *    }
+ *    ...
+ * }
+ * </pre>
+ * If you need more fine-grained control over how some jobs within task get mapped to a node
+ * <i>and</i> use, for example, affinity load balancing for some other jobs within task, then you should use
+ * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task could look. Note that in this
+ * case we manually inject load balancer and use it to pick the best node. Doing it in
+ * such way would allow user to map some jobs manually and for others use load balancer.
+ * <pre name="code" class="java">
+ * public class MyFooBarTask extends GridComputeTaskAdapter&lt;String,String&gt; {
+ *    // Inject load balancer.
+ *    &#64;GridLoadBalancerResource
+ *    GridComputeLoadBalancer balancer;
+ *
+ *    // Map jobs to grid nodes.
+ *    public Map&lt;? extends GridComputeJob, GridNode&gt; map(List&lt;GridNode&gt; subgrid, String arg) throws GridException {
+ *        Map&lt;MyFooBarJob, GridNode&gt; jobs = new HashMap&lt;MyFooBarJob, GridNode&gt;(subgrid.size());
+ *
+ *        // In more complex cases, you can actually do
+ *        // more complicated assignments of jobs to nodes.
+ *        for (int i = 0; i &lt; subgrid.size(); i++) {
+ *            // Pick the next best balanced node for the job.
+ *            GridComputeJob myJob = new MyFooBarJob(arg);
+ *
+ *            jobs.put(myJob, balancer.getBalancedNode(myJob, null));
+ *        }
+ *
+ *        return jobs;
+ *    }
+ *
+ *    // Aggregate results into one compound result.
+ *    public String reduce(List&lt;GridComputeJobResult&gt; results) throws GridException {
+ *        // For the purpose of this example we simply
+ *        // concatenate string representation of every
+ *        // job result
+ *        StringBuilder buf = new StringBuilder();
+ *
+ *        for (GridComputeJobResult res : results) {
+ *            // Append string representation of result
+ *            // returned by every job.
+ *            buf.append(res.getData().toString());
+ *        }
+ *
+ *        return buf.toString();
+ *    }
+ * }
+ * </pre>
+ * <p>
+ * GridGain comes with the following load balancing SPI implementations out of the box:
+ * <ul>
+ * <li>{@link org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi} - default</li>
+ * <li>{@link org.apache.ignite.spi.loadbalancing.adaptive.AdaptiveLoadBalancingSpi}</li>
+ * <li>{@link org.apache.ignite.spi.loadbalancing.weightedrandom.WeightedRandomLoadBalancingSpi}</li>
+ * </ul>
+ * <b>NOTE:</b> this SPI (i.e. methods in this interface) should never be used directly. SPIs provide
+ * internal view on the subsystem and is used internally by GridGain kernal. In rare use cases when
+ * access to a specific implementation of this SPI is required - an instance of this SPI can be obtained
+ * via {@link org.apache.ignite.Ignite#configuration()} method to check its configuration properties or call other non-SPI
+ * methods. Note again that calling methods from this interface on the obtained instance can lead
+ * to undefined behavior and explicitly not supported.
+ */
+public interface LoadBalancingSpi extends IgniteSpi {
+    /**
+     * Gets balanced node for specified job within given task session.
+     *
+     * @param ses Grid task session for currently executing task.
+     * @param top Topology of task nodes from which to pick the best balanced node for given job.
+     * @param job Job for which to pick the best balanced node.
+     * @throws GridException If failed to get next balanced node.
+     * @return Best balanced node for the given job within given task session.
+     */
+    public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) throws GridException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java
new file mode 100644
index 0000000..9db2d84
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java
@@ -0,0 +1,229 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.loadbalancing.adaptive;
+
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Implementation of node load probing based on CPU load.
+ * <p>
+ * Based on {@link #setUseAverage(boolean)}
+ * parameter, this implementation will either use average CPU load
+ * values or current (default is to use averages).
+ * <p>
+ * Based on {@link #setUseProcessors(boolean)} parameter, this implementation
+ * will either take number of processors on the node into account or not.
+ * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it
+ * usually means that the remaining capacity is proportional to the number of
+ * CPU's (or cores) on the node. This configuration parameter indicates
+ * whether to divide each node's CPU load by the number of processors on that node
+ * (default is {@code true}).
+ * <p>
+ * Also note that in some environments every processor may not be adding 100% of
+ * processing power. For example, if you are using multi-core CPU's, then addition of
+ * every core would probably result in about 75% of extra CPU power. To account
+ * for that, you should set {@link #setProcessorCoefficient(double)} parameter to
+ * {@code 0.75} .
+ * <p>
+ * Below is an example of how CPU load probe would be configured in GridGain
+ * Spring configuration file:
+ * <pre name="code" class="xml">
+ * &lt;property name="loadBalancingSpi"&gt;
+ *     &lt;bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"&gt;
+ *         &lt;property name="loadProbe"&gt;
+ *             &lt;bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveCpuLoadProbe"&gt;
+ *                 &lt;property name="useAverage" value="true"/&gt;
+ *                 &lt;property name="useProcessors" value="true"/&gt;
+ *                 &lt;property name="processorCoefficient" value="0.9"/&gt;
+ *             &lt;/bean&gt;
+ *         &lt;/property&gt;
+ *     &lt;/bean&gt;
+ * &lt;/property&gt;
+ * </pre>
+ * <p>
+ * This implementation is used by default by {@link AdaptiveLoadBalancingSpi} SPI.
+ */
+public class AdaptiveCpuLoadProbe implements AdaptiveLoadProbe {
+    /** Flag indicating whether to use average CPU load vs. current. */
+    private boolean useAvg = true;
+
+    /**
+     * Flag indicating whether to divide each node's CPU load
+     * by the number of processors on that node.
+     */
+    private boolean useProcs = true;
+
+    /**
+     * Coefficient of every CPU processor. By default it is {@code 1}, but
+     * in some environments every processor may not be adding 100% of processing
+     * power. For example, if you are using multi-core CPU's, then addition of
+     * every core would probably result in about 75% of extra CPU power, and hence
+     * you would set this coefficient to {@code 0.75} .
+     */
+    private double procCoefficient = 1;
+
+    /**
+     * Initializes CPU load probe to use CPU load average by default.
+     */
+    public AdaptiveCpuLoadProbe() {
+        // No-op.
+    }
+
+    /**
+     * Specifies whether to use average CPU load vs. current and whether or
+     * not to take number of processors into account.
+     * <p>
+     * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it
+     * usually means that the remaining capacity is proportional to the number of
+     * CPU's (or cores) on the node.
+     *
+     * @param useAvg Flag indicating whether to use average CPU load vs. current
+     *      (default is {@code true}).
+     * @param useProcs Flag indicating whether to divide each node's CPU load
+     *      by the number of processors on that node (default is {@code true}).
+     */
+    public AdaptiveCpuLoadProbe(boolean useAvg, boolean useProcs) {
+        this.useAvg = useAvg;
+        this.useProcs = useProcs;
+    }
+
+    /**
+     * Specifies whether to use average CPU load vs. current and whether or
+     * not to take number of processors into account. It also allows to
+     * specify the coefficient of addition power every CPU adds.
+     * <p>
+     * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it
+     * usually means that the remaining capacity is proportional to the number of
+     * CPU's (or cores) on the node.
+     * <p>
+     * Also, in some environments every processor may not be adding 100% of processing
+     * power. For example, if you are using multi-core CPU's, then addition of
+     * every core would probably result in about 75% of extra CPU power, and hence
+     * you would set this coefficient to {@code 0.75} .
+     *
+     * @param useAvg Flag indicating whether to use average CPU load vs. current
+     *      (default is {@code true}).
+     * @param useProcs Flag indicating whether to divide each node's CPU load
+     *      by the number of processors on that node (default is {@code true}).
+     * @param procCoefficient Coefficient of every CPU processor (default value is {@code 1}).
+     */
+    public AdaptiveCpuLoadProbe(boolean useAvg, boolean useProcs, double procCoefficient) {
+        this.useAvg = useAvg;
+        this.useProcs = useProcs;
+        this.procCoefficient = procCoefficient;
+    }
+
+    /**
+     * Gets flag indicating whether to use average CPU load vs. current.
+     *
+     * @return Flag indicating whether to use average CPU load vs. current.
+     */
+    public boolean isUseAverage() {
+        return useAvg;
+    }
+
+    /**
+     * Sets flag indicating whether to use average CPU load vs. current.
+     * If not explicitly set, then default value is {@code true}.
+     *
+     * @param useAvg Flag indicating whether to use average CPU load vs. current.
+     */
+    public void setUseAverage(boolean useAvg) {
+        this.useAvg = useAvg;
+    }
+
+    /**
+     * Gets flag indicating whether to use average CPU load vs. current
+     * (default is {@code true}).
+     * <p>
+     * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it
+     * usually means that the remaining capacity is proportional to the number of
+     * CPU's (or cores) on the node.
+     *
+     * @return Flag indicating whether to divide each node's CPU load
+     *      by the number of processors on that node (default is {@code true}).
+     */
+    public boolean isUseProcessors() {
+        return useProcs;
+    }
+
+    /**
+     * Sets flag indicating whether to use average CPU load vs. current
+     * (default is {@code true}).
+     * <p>
+     * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it
+     * usually means that the remaining capacity is proportional to the number of
+     * CPU's (or cores) on the node.
+     * <p>
+     * If not explicitly set, then default value is {@code true}.
+     *
+     * @param useProcs Flag indicating whether to divide each node's CPU load
+     *      by the number of processors on that node (default is {@code true}).
+     */
+    public void setUseProcessors(boolean useProcs) {
+        this.useProcs = useProcs;
+    }
+
+    /**
+     * Gets coefficient of every CPU processor. By default it is {@code 1}, but
+     * in some environments every processor may not be adding 100% of processing
+     * power. For example, if you are using multi-core CPU's, then addition of
+     * every core would probably result in about 75% of extra CPU power, and hence
+     * you would set this coefficient to {@code 0.75} .
+     * <p>
+     * This value is ignored if {@link #isUseProcessors()} is set to {@code false}.
+     *
+     * @return Coefficient of every CPU processor.
+     */
+    public double getProcessorCoefficient() {
+        return procCoefficient;
+    }
+
+    /**
+     * Sets coefficient of every CPU processor. By default it is {@code 1}, but
+     * in some environments every processor may not be adding 100% of processing
+     * power. For example, if you are using multi-core CPU's, then addition of
+     * every core would probably result in about 75% of extra CPU power, and hence
+     * you would set this coefficient to {@code 0.75} .
+     * <p>
+     * This value is ignored if {@link #isUseProcessors()} is set to {@code false}.
+     *
+     * @param procCoefficient Coefficient of every CPU processor.
+     */
+    public void setProcessorCoefficient(double procCoefficient) {
+        A.ensure(procCoefficient > 0, "procCoefficient > 0");
+
+        this.procCoefficient = procCoefficient;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) {
+        ClusterNodeMetrics metrics = node.metrics();
+
+        double k = 1.0d;
+
+        if (useProcs) {
+            int procs = metrics.getTotalCpus();
+
+            if (procs > 1)
+                k = procs * procCoefficient;
+        }
+
+        double load = (useAvg ? metrics.getAverageCpuLoad() : metrics.getCurrentCpuLoad()) / k;
+
+        return load < 0 ? 0 : load;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AdaptiveCpuLoadProbe.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java
new file mode 100644
index 0000000..d5c99b7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java
@@ -0,0 +1,96 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.loadbalancing.adaptive;
+
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Implementation of node load probing based on active and waiting job count.
+ * Based on {@link #setUseAverage(boolean)} parameter, this implementation will
+ * either use average job count values or current (default is to use averages).
+ * <p>
+ * The load of a node is simply calculated by adding active and waiting job counts.
+ * <p>
+ * Below is an example of how CPU load probe would be configured in GridGain
+ * Spring configuration file:
+ * <pre name="code" class="xml">
+ * &lt;property name="loadBalancingSpi"&gt;
+ *     &lt;bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"&gt;
+ *         &lt;property name="loadProbe"&gt;
+ *             &lt;bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveJobCountLoadProbe"&gt;
+ *                 &lt;property name="useAverage" value="true"/&gt;
+ *             &lt;/bean&gt;
+ *         &lt;/property&gt;
+ *     &lt;/bean&gt;
+ * &lt;/property&gt;
+ * </pre>
+ */
+public class AdaptiveJobCountLoadProbe implements AdaptiveLoadProbe {
+    /** Flag indicating whether to use average CPU load vs. current. */
+    private boolean useAvg = true;
+
+    /**
+     * Initializes active job probe.
+     */
+    public AdaptiveJobCountLoadProbe() {
+        // No-op.
+    }
+
+    /**
+     * Creates new active job prove specifying whether to use average
+     * job counts vs. current.
+     *
+     * @param useAvg Flag indicating whether to use average job counts vs. current.
+     */
+    public AdaptiveJobCountLoadProbe(boolean useAvg) {
+        this.useAvg = useAvg;
+    }
+
+    /**
+     * Gets flag indicating whether to use average job counts vs. current.
+     *
+     * @return Flag indicating whether to use average job counts vs. current.
+     */
+    public boolean isUseAverage() {
+        return useAvg;
+    }
+
+    /**
+     * Sets flag indicating whether to use average job counts vs. current.
+     *
+     * @param useAvg Flag indicating whether to use average job counts vs. current.
+     */
+    public void setUseAverage(boolean useAvg) {
+        this.useAvg = useAvg;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) {
+        ClusterNodeMetrics metrics = node.metrics();
+
+        if (useAvg) {
+            double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs();
+
+            if (load > 0)
+                return load;
+        }
+
+        double load = metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs();
+
+        return load < 0 ? 0 : load;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AdaptiveJobCountLoadProbe.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java
new file mode 100644
index 0000000..5bb4501
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java
@@ -0,0 +1,581 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.loadbalancing.adaptive;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.managers.eventstorage.*;
+import org.apache.ignite.spi.loadbalancing.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Load balancing SPI that adapts to overall node performance. It
+ * proportionally distributes more jobs to more performant nodes based
+ * on a pluggable and dynamic node load probing.
+ * <p>
+ * <h1 class="header">Adaptive Node Probe</h1>
+ * This SPI comes with pluggable algorithm to calculate a node load
+ * at any given point of time. The algorithm is defined by
+ * {@link AdaptiveLoadProbe} interface and user is
+ * free to provide custom implementations. By default
+ * {@link AdaptiveCpuLoadProbe} implementation is used
+ * which distributes jobs to nodes based on average CPU load
+ * on every node.
+ * <p>
+ * The following load probes are available with the product:
+ * <ul>
+ * <li>{@link AdaptiveCpuLoadProbe} - default</li>
+ * <li>{@link AdaptiveProcessingTimeLoadProbe}</li>
+ * <li>{@link AdaptiveJobCountLoadProbe}</li>
+ * </ul>
+ * Note that if {@link AdaptiveLoadProbe#getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0},
+ * then implementation will assume that load value is simply not available and
+ * will try to calculate an average of load values for other nodes. If such
+ * average cannot be obtained (all node load values are {@code 0}), then a value
+ * of {@code 1} will be used.
+ * <p>
+ * When working with node metrics, take into account that all averages are
+ * calculated over metrics history size defined by {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsExpireTime()}
+ * and {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsHistorySize()} grid configuration parameters.
+ * Generally the larger these configuration parameter values are, the more precise the metrics are.
+ * You should tune these values based on the level of accuracy needed vs. the additional memory
+ * that would be required for storing metrics.
+ * <p>
+ * You should also keep in mind that metrics for remote nodes are delayed (usually by the
+ * heartbeat frequency). So if it is acceptable in your environment, set the heartbeat frequency
+ * to be more inline with job execution time. Generally, the more often heartbeats between nodes
+ * are exchanged, the more precise the metrics are. However, you should keep in mind that if
+ * heartbeats are exchanged too often then it may create unnecessary traffic in the network.
+ * Heartbeats (or metrics update frequency) can be configured via underlying
+ * {@link org.apache.ignite.spi.discovery.DiscoverySpi} used in your grid.
+ * <p>
+ * Here is an example of how probing can be implemented to use
+ * number of active and waiting jobs as probing mechanism:
+ * <pre name="code" class="java">
+ * public class FooBarLoadProbe implements GridAdaptiveLoadProbe {
+ *     // Flag indicating whether to use average value or current.
+ *     private int useAvg = true;
+ *
+ *     public FooBarLoadProbe(boolean useAvg) {
+ *         this.useAvg = useAvg;
+ *     }
+ *
+ *     // Calculate load based on number of active and waiting jobs.
+ *     public double getLoad(GridNode node, int jobsSentSinceLastUpdate) {
+ *         GridNodeMetrics metrics = node.getMetrics();
+ *
+ *         if (useAvg) {
+ *             double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs();
+ *
+ *             if (load > 0) {
+ *                 return load;
+ *             }
+ *         }
+ *
+ *         return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs();
+ *     }
+ * }
+ * </pre>
+ * <h1 class="header">Which Node Probe To Use</h1>
+ * There is no correct answer here. Every single node probe will work better or worse in
+ * different environments. CPU load probe (default option) is the safest approach to start
+ * with as it simply attempts to utilize every CPU on the grid to the maximum. However, you should
+ * experiment with other probes by executing load tests in your environment and observing
+ * which probe gives you best performance and load balancing.
+ * <p>
+ * <h1 class="header">Task Coding Example</h1>
+ * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic
+ * is transparent to your code and is handled automatically by the adapter.
+ * Here is an example of how your task will look:
+ * <pre name="code" class="java">
+ * public class MyFooBarTask extends GridComputeTaskSplitAdapter&lt;Object, Object&gt; {
+ *    &#64;Override
+ *    protected Collection&lt;? extends GridComputeJob&gt; split(int gridSize, Object arg) throws GridException {
+ *        List&lt;MyFooBarJob&gt; jobs = new ArrayList&lt;MyFooBarJob&gt;(gridSize);
+ *
+ *        for (int i = 0; i &lt; gridSize; i++) {
+ *            jobs.add(new MyFooBarJob(arg));
+ *        }
+ *
+ *        // Node assignment via load balancer
+ *        // happens automatically.
+ *        return jobs;
+ *    }
+ *    ...
+ * }
+ * </pre>
+ * If you need more fine-grained control over how some jobs within task get mapped to a node
+ * and use affinity load balancing for some other jobs within task, then you should use
+ * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task will look. Note that in this
+ * case we manually inject load balancer and use it to pick the best node. Doing it in
+ * such way would allow user to map some jobs manually and for others use load balancer.
+ * <pre name="code" class="java">
+ * public class MyFooBarTask extends GridComputeTaskAdapter&lt;String, String&gt; {
+ *    // Inject load balancer.
+ *    &#64;GridLoadBalancerResource
+ *    GridComputeLoadBalancer balancer;
+ *
+ *    // Map jobs to grid nodes.
+ *    public Map&lt;? extends GridComputeJob, GridNode&gt; map(List&lt;GridNode&gt; subgrid, String arg) throws GridException {
+ *        Map&lt;MyFooBarJob, GridNode&gt; jobs = new HashMap&lt;MyFooBarJob, GridNode&gt;(subgrid.size());
+ *
+ *        // In more complex cases, you can actually do
+ *        // more complicated assignments of jobs to nodes.
+ *        for (int i = 0; i &lt; subgrid.size(); i++) {
+ *            // Pick the next best balanced node for the job.
+ *            jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode())
+ *        }
+ *
+ *        return jobs;
+ *    }
+ *
+ *    // Aggregate results into one compound result.
+ *    public String reduce(List&lt;GridComputeJobResult&gt; results) throws GridException {
+ *        // For the purpose of this example we simply
+ *        // concatenate string representation of every
+ *        // job result
+ *        StringBuilder buf = new StringBuilder();
+ *
+ *        for (GridComputeJobResult res : results) {
+ *            // Append string representation of result
+ *            // returned by every job.
+ *            buf.append(res.getData().string());
+ *        }
+ *
+ *        return buf.string();
+ *    }
+ * }
+ * </pre>
+ * <p>
+ * <h1 class="header">Configuration</h1>
+ * In order to use this load balancer, you should configure your grid instance
+ * to use {@code GridJobsLoadBalancingSpi} either from Spring XML file or
+ * directly. The following configuration parameters are supported:
+ * <h2 class="header">Mandatory</h2>
+ * This SPI has no mandatory configuration parameters.
+ * <h2 class="header">Optional</h2>
+ * This SPI has the following optional configuration parameters:
+ * <ul>
+ * <li>
+ *      Adaptive node load probing implementation (see {@link #setLoadProbe(AdaptiveLoadProbe)}).
+ *      This configuration parameter supplies a custom algorithm for probing a node's load.
+ *      By default, {@link AdaptiveCpuLoadProbe} implementation is used which
+ *      takes every node's CPU load and tries to send proportionally more jobs to less loaded nodes.
+ * </li>
+ * </ul>
+ * <p>
+ * Below is Java configuration example:
+ * <pre name="code" class="java">
+ * GridAdaptiveLoadBalancingSpi spi = new GridAdaptiveLoadBalancingSpi();
+ *
+ * // Configure probe to use latest job execution time vs. average.
+ * GridAdaptiveProcessingTimeLoadProbe probe = new GridAdaptiveProcessingTimeLoadProbe(false);
+ *
+ * spi.setLoadProbe(probe);
+ *
+ * GridConfiguration cfg = new GridConfiguration();
+ *
+ * // Override default load balancing SPI.
+ * cfg.setLoadBalancingSpi(spi);
+ *
+ * // Starts grid.
+ * G.start(cfg);
+ * </pre>
+ * Here is how you can configure {@code GridJobsLoadBalancingSpi} using Spring XML configuration:
+ * <pre name="code" class="xml">
+ * &lt;property name="loadBalancingSpi"&gt;
+ *     &lt;bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"&gt;
+ *         &lt;property name="loadProbe"&gt;
+ *             &lt;bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"&gt;
+ *                 &lt;constructor-arg value="false"/&gt;
+ *             &lt;/bean&gt;
+ *         &lt;/property&gt;
+ *     &lt;/bean&gt;
+ * &lt;/property&gt;
+ * </pre>
+ * <p>
+ * <img src="http://www.gridgain.com/images/spring-small.png">
+ * <br>
+ * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
+ */
+@IgniteSpiMultipleInstancesSupport(true)
+public class AdaptiveLoadBalancingSpi extends IgniteSpiAdapter implements LoadBalancingSpi,
+    AdaptiveLoadBalancingSpiMBean {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Grid logger. */
+    @IgniteLoggerResource
+    private IgniteLogger log;
+
+    /** */
+    private AdaptiveLoadProbe probe = new AdaptiveCpuLoadProbe();
+
+    /** Local event listener to listen to task completion events. */
+    private GridLocalEventListener evtLsnr;
+
+    /** Task topologies. First pair value indicates whether or not jobs have been mapped. */
+    private ConcurrentMap<IgniteUuid, IgniteBiTuple<Boolean, WeightedTopology>> taskTops =
+        new ConcurrentHashMap8<>();
+
+    /** */
+    private final Map<UUID, AtomicInteger> nodeJobs = new HashMap<>();
+
+    /** */
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    /** {@inheritDoc} */
+    @Override public String getLoadProbeFormatted() {
+        return probe.toString();
+    }
+
+    /**
+     * Sets implementation of node load probe. By default {@link AdaptiveProcessingTimeLoadProbe}
+     * is used which proportionally distributes load based on the average job execution
+     * time on every node.
+     *
+     * @param probe Implementation of node load probe
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setLoadProbe(AdaptiveLoadProbe probe) {
+        A.ensure(probe != null, "probe != null");
+
+        this.probe = probe;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+        startStopwatch();
+
+        assertParameter(probe != null, "loadProbe != null");
+
+        if (log.isDebugEnabled())
+            log.debug(configInfo("loadProbe", probe));
+
+        registerMBean(gridName, this, AdaptiveLoadBalancingSpiMBean.class);
+
+        // Ack ok start.
+        if (log.isDebugEnabled())
+            log.debug(startInfo());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStop() throws IgniteSpiException {
+        rwLock.writeLock().lock();
+
+        try {
+            nodeJobs.clear();
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+
+        unregisterMBean();
+
+        // Ack ok stop.
+        if (log.isDebugEnabled())
+            log.debug(stopInfo());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+        getSpiContext().addLocalEventListener(evtLsnr = new GridLocalEventListener() {
+            @Override public void onEvent(IgniteEvent evt) {
+                switch (evt.type()) {
+                    case EVT_TASK_FINISHED:
+                    case EVT_TASK_FAILED: {
+                        IgniteTaskEvent taskEvt = (IgniteTaskEvent)evt;
+
+                        taskTops.remove(taskEvt.taskSessionId());
+
+                        if (log.isDebugEnabled())
+                            log.debug("Removed task topology from topology cache for session: " +
+                                taskEvt.taskSessionId());
+
+                        break;
+                    }
+
+                    case EVT_JOB_MAPPED: {
+                        // We should keep topology and use cache in GridComputeTask#map() method to
+                        // avoid O(n*n/2) complexity, after that we can drop caches.
+                        // Here we set mapped property and later cache will be ignored
+                        IgniteJobEvent jobEvt = (IgniteJobEvent)evt;
+
+                        IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(jobEvt.taskSessionId());
+
+                        if (weightedTop != null)
+                            weightedTop.set1(true);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Job has been mapped. Ignore cache for session: " + jobEvt.taskSessionId());
+
+                        break;
+                    }
+
+                    case EVT_NODE_METRICS_UPDATED:
+                    case EVT_NODE_FAILED:
+                    case EVT_NODE_JOINED:
+                    case EVT_NODE_LEFT: {
+                        IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+
+                        rwLock.writeLock().lock();
+
+                        try {
+                            switch (evt.type()) {
+                                case EVT_NODE_JOINED: {
+                                    nodeJobs.put(discoEvt.eventNode().id(), new AtomicInteger(0));
+
+                                    break;
+                                }
+
+                                case EVT_NODE_LEFT:
+                                case EVT_NODE_FAILED: {
+                                    nodeJobs.remove(discoEvt.eventNode().id());
+
+                                    break;
+                                }
+
+                                case EVT_NODE_METRICS_UPDATED: {
+                                    // Reset counter.
+                                    nodeJobs.put(discoEvt.eventNode().id(), new AtomicInteger(0));
+
+                                    break;
+                                }
+                            }
+                        }
+                        finally {
+                            rwLock.writeLock().unlock();
+                        }
+                    }
+
+                }
+            }
+        },
+            EVT_NODE_METRICS_UPDATED,
+            EVT_NODE_FAILED,
+            EVT_NODE_JOINED,
+            EVT_NODE_LEFT,
+            EVT_TASK_FINISHED,
+            EVT_TASK_FAILED,
+            EVT_JOB_MAPPED
+        );
+
+        // Put all known nodes.
+        rwLock.writeLock().lock();
+
+        try {
+            for (ClusterNode node : getSpiContext().nodes())
+                nodeJobs.put(node.id(), new AtomicInteger(0));
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextDestroyed0() {
+        if (evtLsnr != null) {
+            IgniteSpiContext ctx = getSpiContext();
+
+            if (ctx != null)
+                ctx.removeLocalEventListener(evtLsnr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job)
+    throws GridException {
+        A.notNull(ses, "ses");
+        A.notNull(top, "top");
+        A.notNull(job, "job");
+
+        IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(ses.getId());
+
+        // Create new cached topology if there is no one. Do not
+        // use cached topology after task has been mapped.
+        if (weightedTop == null)
+            // Called from GridComputeTask#map(). Put new topology and false as not mapped yet.
+            taskTops.put(ses.getId(), weightedTop = F.t(false, new WeightedTopology(top)));
+        // We have topology - check if task has been mapped.
+        else if (weightedTop.get1())
+            // Do not use cache after GridComputeTask#map().
+            return new WeightedTopology(top).pickWeightedNode();
+
+        return weightedTop.get2().pickWeightedNode();
+    }
+
+    /**
+     * Calculates node load based on set probe.
+     *
+     * @param top List of all nodes.
+     * @param node Node to get load for.
+     * @return Node load.
+     * @throws GridException If returned load is negative.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    private double getLoad(Collection<ClusterNode> top, ClusterNode node) throws GridException {
+        assert !F.isEmpty(top);
+
+        int jobsSentSinceLastUpdate = 0;
+
+        rwLock.readLock().lock();
+
+        try {
+            AtomicInteger cnt = nodeJobs.get(node.id());
+
+            jobsSentSinceLastUpdate = cnt == null ? 0 : cnt.get();
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+
+        double load = probe.getLoad(node, jobsSentSinceLastUpdate);
+
+        if (load < 0)
+            throw new GridException("Failed to obtain non-negative load from adaptive load probe: " + load);
+
+        return load;
+    }
+
+    /**
+     * Holder for weighted topology.
+     */
+    private class WeightedTopology {
+        /** Topology sorted by weight. */
+        private final SortedMap<Double, ClusterNode> circle = new TreeMap<>();
+
+        /**
+         * @param top Task topology.
+         * @throws GridException If any load was negative.
+         */
+        WeightedTopology(List<ClusterNode> top) throws GridException {
+            assert !F.isEmpty(top);
+
+            double totalLoad = 0;
+
+            // We need to cache loads here to avoid calls later as load might be
+            // changed between the calls.
+            double[] nums = new double[top.size()];
+
+            int zeroCnt = 0;
+
+            // Compute loads.
+            for (int i = 0; i < top.size(); i++) {
+                double load = getLoad(top, top.get(i));
+
+                nums[i] = load;
+
+                if (load == 0)
+                    zeroCnt++;
+
+                totalLoad += load;
+            }
+
+            // Take care of zero loads.
+            if (zeroCnt > 0) {
+                double newTotal = totalLoad;
+
+                int nonZeroCnt = top.size() - zeroCnt;
+
+                for (int i = 0; i < nums.length; i++) {
+                    double load = nums[i];
+
+                    if (load == 0) {
+                        if (nonZeroCnt > 0)
+                            load = totalLoad / nonZeroCnt;
+
+                        if (load == 0)
+                            load = 1;
+
+                        nums[i] = load;
+
+                        newTotal += load;
+                    }
+                }
+
+                totalLoad = newTotal;
+            }
+
+            double totalWeight = 0;
+
+            // Calculate weights and total weight.
+            for (int i = 0; i < nums.length; i++) {
+                assert nums[i] > 0 : "Invalid load: " + nums[i];
+
+                double weight = totalLoad / nums[i];
+
+                // Convert to weight.
+                nums[i] = weight;
+
+                totalWeight += weight;
+            }
+
+            double weight = 0;
+
+            // Enforce range from 0 to 1.
+            for (int i = 0; i < nums.length; i++) {
+                weight = i == nums.length - 1 ? 1.0d : weight + nums[i] / totalWeight;
+
+                assert weight < 2 : "Invalid weight: " + weight;
+
+                // Complexity of this put is O(logN).
+                circle.put(weight, top.get(i));
+            }
+        }
+
+        /**
+         * Gets weighted node in random fashion.
+         *
+         * @return Weighted node.
+         */
+        ClusterNode pickWeightedNode() {
+            double weight = RAND.nextDouble();
+
+            SortedMap<Double, ClusterNode> pick = circle.tailMap(weight);
+
+            ClusterNode node = pick.get(pick.firstKey());
+
+            rwLock.readLock().lock();
+
+            try {
+                AtomicInteger cnt = nodeJobs.get(node.id());
+
+                if (cnt != null)
+                    cnt.incrementAndGet();
+            }
+            finally {
+                rwLock.readLock().unlock();
+            }
+
+            return node;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AdaptiveLoadBalancingSpi.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java
new file mode 100644
index 0000000..14b4ed8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java
@@ -0,0 +1,27 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.loadbalancing.adaptive;
+
+import org.apache.ignite.mbean.*;
+import org.apache.ignite.spi.*;
+
+/**
+ * Management MBean for {@link AdaptiveLoadBalancingSpi} SPI.
+ */
+@IgniteMBeanDescription("MBean that provides access to adaptive load balancing SPI configuration.")
+public interface AdaptiveLoadBalancingSpiMBean extends IgniteSpiManagementMBean {
+    /**
+     * Gets text description of current load probing implementation used.
+     *
+     * @return Text description of current load probing implementation used.
+     */
+    @IgniteMBeanDescription("Text description of current load probing implementation used.")
+    public String getLoadProbeFormatted();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java
new file mode 100644
index 0000000..8027281
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java
@@ -0,0 +1,90 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.loadbalancing.adaptive;
+
+import org.apache.ignite.cluster.*;
+
+/**
+ * Pluggable implementation of node load probing. Implementations
+ * of this can be configured to be used with {@link AdaptiveLoadBalancingSpi}
+ * by setting {@link AdaptiveLoadBalancingSpi#setLoadProbe(AdaptiveLoadProbe)}
+ * configuration parameter.
+ * <p>
+ * Note that if {@link #getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0},
+ * then implementation will assume that load value is simply not available and
+ * will try to calculate an average of load values for other nodes. If such
+ * average cannot be obtained (all node load values are {@code 0}), then a value
+ * of {@code 1} will be used.
+ * <p>
+ * By default, {@link AdaptiveCpuLoadProbe} probing implementation is used.
+ * <p>
+ * <h1 class="header">Example</h1>
+ * Here is an example of how probing can be implemented to use
+ * number of active and waiting jobs as probing mechanism:
+ * <pre name="code" class="java">
+ * public class FooBarLoadProbe implements GridAdaptiveLoadProbe {
+ *     // Flag indicating whether to use average value or current.
+ *     private int useAvg = true;
+ *
+ *     public FooBarLoadProbe(boolean useAvg) {
+ *         this.useAvg = useAvg;
+ *     }
+ *
+ *     // Calculate load based on number of active and waiting jobs.
+ *     public double getLoad(GridNode node, int jobsSentSinceLastUpdate) {
+ *         GridNodeMetrics metrics = node.getMetrics();
+ *
+ *         if (useAvg) {
+ *             double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs();
+ *
+ *             if (load > 0) {
+ *                 return load;
+ *             }
+ *         }
+ *
+ *         return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs();
+ *     }
+ * }
+ * </pre>
+ * Below is an example of how a probe shown above would be configured with {@link AdaptiveLoadBalancingSpi}
+ * SPI:
+ * <pre name="code" class="xml">
+ * &lt;property name="loadBalancingSpi"&gt;
+ *     &lt;bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"&gt;
+ *         &lt;property name="loadProbe"&gt;
+ *             &lt;bean class="foo.bar.FooBarLoadProbe"&gt;
+ *                 &lt;constructor-arg value="true"/&gt;
+ *             &lt;/bean&gt;
+ *         &lt;/property&gt;
+ *     &lt;/bean&gt;
+ * &lt;/property&gt;
+ * </pre>
+ */
+public interface AdaptiveLoadProbe {
+    /**
+     * Calculates load value for a given node. Specific implementations would
+     * usually take into account some of the values provided by
+     * {@link org.apache.ignite.cluster.ClusterNode#metrics()} method. For example, load can be calculated
+     * based on job execution time or number of active jobs, or CPU/Heap utilization.
+     * <p>
+     * Note that if this method returns a value of {@code 0},
+     * then implementation will assume that load value is simply not available and
+     * will try to calculate an average of load values for other nodes. If such
+     * average cannot be obtained (all node load values are {@code 0}), then a value
+     * of {@code 1} will be used.
+     *
+     * @param node Grid node to calculate load for.
+     * @param jobsSentSinceLastUpdate Number of jobs sent to this node since
+     *      last metrics update. This parameter may be useful when
+     *      implementation takes into account the current job count on a node.
+     * @return Non-negative load value for the node (zero and above).
+     */
+    public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java
new file mode 100644
index 0000000..30474ad
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java
@@ -0,0 +1,98 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.loadbalancing.adaptive;
+
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Implementation of node load probing based on total job processing time.
+ * Based on {@link #setUseAverage(boolean)}
+ * parameter, this implementation will either use average job execution
+ * time values or current (default is to use averages). The algorithm
+ * returns a sum of job wait time and job execution time.
+ * <p>
+ * Below is an example of how CPU load probe would be configured in GridGain
+ * Spring configuration file:
+ * <pre name="code" class="xml">
+ * &lt;property name="loadBalancingSpi"&gt;
+ *     &lt;bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"&gt;
+ *         &lt;property name="loadProbe"&gt;
+ *             &lt;bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"&gt;
+ *                 &lt;property name="useAverage" value="true"/&gt;
+ *             &lt;/bean&gt;
+ *         &lt;/property&gt;
+ *     &lt;/bean&gt;
+ * &lt;/property&gt;
+ * </pre>
+ */
+public class AdaptiveProcessingTimeLoadProbe implements AdaptiveLoadProbe {
+    /** Flag indicating whether to use average execution time vs. current. */
+    private boolean useAvg = true;
+
+    /**
+     * Initializes execution time load probe to use
+     * execution time average by default.
+     */
+    public AdaptiveProcessingTimeLoadProbe() {
+        // No-op.
+    }
+
+    /**
+     * Specifies whether to use average execution time vs. current.
+     *
+     * @param useAvg Flag indicating whether to use average execution time vs. current.
+     */
+    public AdaptiveProcessingTimeLoadProbe(boolean useAvg) {
+        this.useAvg = useAvg;
+    }
+
+    /**
+     * Gets flag indicating whether to use average execution time vs. current.
+     *
+     * @return Flag indicating whether to use average execution time vs. current.
+     */
+    public boolean isUseAverage() {
+        return useAvg;
+    }
+
+    /**
+     * Sets flag indicating whether to use average execution time vs. current.
+     *
+     * @param useAvg Flag indicating whether to use average execution time vs. current.
+     */
+    public void setUseAverage(boolean useAvg) {
+        this.useAvg = useAvg;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) {
+        ClusterNodeMetrics metrics = node.metrics();
+
+        if (useAvg) {
+            double load = metrics.getAverageJobExecuteTime() + metrics.getAverageJobWaitTime();
+
+            // If load is greater than 0, then we can use average times.
+            // Otherwise, we will proceed to using current times.
+            if (load > 0)
+                return load;
+        }
+
+        double load = metrics.getCurrentJobExecuteTime() + metrics.getCurrentJobWaitTime();
+
+        return load < 0 ? 0 : load;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AdaptiveProcessingTimeLoadProbe.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html
new file mode 100644
index 0000000..ee3a5eb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html
@@ -0,0 +1,15 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<!--
+    @html.file.header
+    _________        _____ __________________        _____
+    __  ____/___________(_)______  /__  ____/______ ____(_)_______
+    _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+    / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+    \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+-->
+<html>
+<body>
+    <!-- Package description. -->
+    Contains adaptive load balancing SPI.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/package.html b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/package.html
new file mode 100644
index 0000000..fd879b9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/package.html
@@ -0,0 +1,15 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<!--
+    @html.file.header
+    _________        _____ __________________        _____
+    __  ____/___________(_)______  /__  ____/______ ____(_)_______
+    _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+    / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+    \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+-->
+<html>
+<body>
+    <!-- Package description. -->
+    Contains APIs for load balancing SPI.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
new file mode 100644
index 0000000..e17231a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
@@ -0,0 +1,305 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.loadbalancing.roundrobin;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.managers.eventstorage.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Load balancer that works in global (not-per-task) mode.
+ */
+class RoundRobinGlobalLoadBalancer {
+    /** SPI context. */
+    private IgniteSpiContext ctx;
+
+    /** Listener for node's events. */
+    private GridLocalEventListener lsnr;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Current snapshot of nodes which participated in load balancing. */
+    private volatile GridNodeList nodeList = new GridNodeList(0, null);
+
+    /** Mutex for updating current topology. */
+    private final Object mux = new Object();
+
+    /** Barrier for separating initialization callback and load balancing routine. */
+    private final CountDownLatch initLatch = new CountDownLatch(1);
+
+    /**
+     * @param log Grid logger.
+     */
+    RoundRobinGlobalLoadBalancer(IgniteLogger log) {
+        assert log != null;
+
+        this.log = log;
+    }
+
+    /**
+     * @param ctx Load balancing context.
+     */
+    void onContextInitialized(final IgniteSpiContext ctx) {
+        this.ctx = ctx;
+
+        ctx.addLocalEventListener(
+            lsnr = new GridLocalEventListener() {
+                @Override public void onEvent(IgniteEvent evt) {
+                    assert evt instanceof IgniteDiscoveryEvent;
+
+                    UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id();
+
+                    synchronized (mux) {
+                        if (evt.type() == EVT_NODE_JOINED) {
+                            List<UUID> oldNodes = nodeList.getNodes();
+
+                            if (!oldNodes.contains(nodeId)) {
+                                List<UUID> newNodes = new ArrayList<>(oldNodes.size() + 1);
+
+                                newNodes.add(nodeId);
+
+                                for (UUID node : oldNodes)
+                                    newNodes.add(node);
+
+                                nodeList = new GridNodeList(0, newNodes);
+                            }
+                        }
+                        else {
+                            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+
+                            List<UUID> oldNodes = nodeList.getNodes();
+
+                            if (oldNodes.contains(nodeId)) {
+                                List<UUID> newNodes = new ArrayList<>(oldNodes.size() - 1);
+
+                                for (UUID node : oldNodes)
+                                    if (!nodeId.equals(node))
+                                        newNodes.add(node);
+
+                                nodeList = new GridNodeList(0, newNodes);
+                            }
+                        }
+                    }
+                }
+            },
+            EVT_NODE_FAILED, EVT_NODE_JOINED, EVT_NODE_LEFT
+        );
+
+        synchronized (mux) {
+            List<UUID> oldNodes = nodeList.getNodes();
+
+            Collection<UUID> set = oldNodes == null ? new HashSet<UUID>() : new HashSet<>(oldNodes);
+
+            for (ClusterNode node : ctx.nodes())
+                set.add(node.id());
+
+            nodeList = new GridNodeList(0, new ArrayList<>(set));
+        }
+
+        initLatch.countDown();
+    }
+
+    /** */
+    void onContextDestroyed() {
+        if (ctx != null)
+            ctx.removeLocalEventListener(lsnr);
+    }
+
+    /**
+     * Gets balanced node for given topology.
+     *
+     * @param top Topology to pick from.
+     * @return Best balanced node.
+     * @throws GridException Thrown in case of any error.
+     */
+    ClusterNode getBalancedNode(Collection<ClusterNode> top) throws GridException {
+        assert !F.isEmpty(top);
+
+        awaitInitializationCompleted();
+
+        Map<UUID, ClusterNode> topMap = null;
+
+        ClusterNode found;
+
+        int misses = 0;
+
+        do {
+            GridNodeList nodeList = this.nodeList;
+
+            List<UUID> nodes = nodeList.getNodes();
+
+            int cycleSize = nodes.size();
+
+            if (cycleSize == 0)
+                throw new GridException("Task topology does not have any alive nodes.");
+
+            AtomicInteger idx;
+
+            int curIdx, nextIdx;
+
+            do {
+                idx = nodeList.getCurrentIdx();
+
+                curIdx = idx.get();
+
+                nextIdx = (idx.get() + 1) % cycleSize;
+            }
+            while (!idx.compareAndSet(curIdx, nextIdx));
+
+            found = findNodeById(top, nodes.get(nextIdx));
+
+            if (found == null) {
+                misses++;
+
+                // For optimization purposes checks balancer can return at least one node with specified
+                // request topology only after full cycle (approximately).
+                if (misses >= cycleSize) {
+                    if (topMap == null) {
+                        topMap = U.newHashMap(top.size());
+
+                        for (ClusterNode node : top)
+                            topMap.put(node.id(), node);
+                    }
+
+                    checkBalancerNodes(top, topMap, nodes);
+
+                    // Zero miss counter so next topology check will be performed once again after full cycle.
+                    misses = 0;
+                }
+            }
+        }
+        while (found == null);
+
+        if (log.isDebugEnabled())
+            log.debug("Found round-robin node: " + found);
+
+        return found;
+    }
+
+    /**
+     * Finds node by id. Returns null in case of absence of specified id in request topology.
+     *
+     * @param top Topology for current request.
+     * @param foundNodeId Node id.
+     * @return Found node or null in case of absence of specified id in request topology.
+     */
+    private static ClusterNode findNodeById(Iterable<ClusterNode> top, UUID foundNodeId) {
+        for (ClusterNode node : top)
+            if (foundNodeId.equals(node.id()))
+                return node;
+
+        return null;
+    }
+
+    /**
+     * Checks if balancer can return at least one node,
+     * throw exception otherwise.
+     *
+     * @param top Topology for current request.
+     * @param topMap Topology map.
+     * @param nodes Current balanced nodes.
+     * @throws GridException If balancer can not return any node.
+     */
+    private static void checkBalancerNodes(Collection<ClusterNode> top, Map<UUID, ClusterNode> topMap, Iterable<UUID> nodes)
+        throws GridException {
+
+        boolean contains = false;
+
+        for (UUID nodeId : nodes) {
+            if (topMap.get(nodeId) != null) {
+                contains = true;
+
+                break;
+            }
+        }
+
+        if (!contains)
+            throw new GridException("Task topology does not have alive nodes: " + top);
+    }
+
+    /**
+     * Awaits initialization of balancing nodes to be completed.
+     *
+     * @throws GridException Thrown in case of thread interruption.
+     */
+    private void awaitInitializationCompleted() throws GridException {
+        try {
+            if (initLatch.getCount() > 0)
+                initLatch.await();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new GridException("Global balancer was interrupted.", e);
+        }
+    }
+
+    /**
+     * Snapshot of nodes which participated in load balancing.
+     */
+    private static final class GridNodeList {
+        /** Cyclic pointer for selecting next node. */
+        private final AtomicInteger curIdx;
+
+        /** Node ids. */
+        private final List<UUID> nodes;
+
+        /**
+         * @param curIdx Initial index of current node.
+         * @param nodes Initial node ids.
+         */
+        private GridNodeList(int curIdx, List<UUID> nodes) {
+            this.curIdx = new AtomicInteger(curIdx);
+            this.nodes = nodes;
+        }
+
+        /**
+         * @return Index of current node.
+         */
+        private AtomicInteger getCurrentIdx() {
+            return curIdx;
+        }
+
+        /**
+         * @return Node ids.
+         */
+        private List<UUID> getNodes() {
+            return nodes;
+        }
+    }
+
+    /**
+     * THIS METHOD IS USED ONLY FOR TESTING.
+     *
+     * @return Internal list of nodes.
+     */
+    List<UUID> getNodeIds() {
+        List<UUID> nodes = nodeList.getNodes();
+
+        return Collections.unmodifiableList(nodes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(RoundRobinGlobalLoadBalancer.class, this);
+    }
+}


Mime
View raw message