ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [28/31] ignite git commit: IGNITE-3414: Hadoop: implemented new weight-based map-reduce planner.
Date Tue, 19 Jul 2016 14:10:14 GMT
IGNITE-3414: Hadoop: implemented new weight-based map-reduce planner.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1dc8bcc2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1dc8bcc2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1dc8bcc2

Branch: refs/heads/master
Commit: 1dc8bcc20d89eb4172c4b1c8f718419f059c6cc2
Parents: 279de8f
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Jul 19 15:16:21 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Jul 19 16:05:44 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsIgniteMock.java         | 492 +++++++++++
 .../internal/processors/igfs/IgfsMock.java      | 397 +++++++++
 .../mapreduce/IgniteHadoopMapReducePlanner.java |  48 +-
 .../IgniteHadoopWeightedMapReducePlanner.java   | 846 +++++++++++++++++++
 .../internal/processors/hadoop/HadoopUtils.java |  81 ++
 .../planner/HadoopAbstractMapReducePlanner.java | 116 +++
 .../planner/HadoopMapReducePlanGroup.java       | 150 ++++
 .../planner/HadoopMapReducePlanTopology.java    |  89 ++
 .../HadoopDefaultMapReducePlannerSelfTest.java  | 451 +---------
 .../processors/hadoop/HadoopMapReduceTest.java  | 395 ++++++++-
 .../processors/hadoop/HadoopPlannerMockJob.java | 168 ++++
 .../HadoopWeightedMapReducePlannerTest.java     | 599 +++++++++++++
 .../HadoopWeightedPlannerMapReduceTest.java     |  38 +
 .../testsuites/IgniteHadoopTestSuite.java       |   8 +-
 14 files changed, 3385 insertions(+), 493 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
new file mode 100644
index 0000000..0c55595
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.IgniteAtomicLong;
+import org.apache.ignite.IgniteAtomicReference;
+import org.apache.ignite.IgniteAtomicSequence;
+import org.apache.ignite.IgniteAtomicStamped;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteScheduler;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.IgniteServices;
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginNotFoundException;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Mocked Ignite implementation for IGFS tests: serves a name and one IGFS instance, all other methods throw.
+ */
+public class IgfsIgniteMock implements IgniteEx {
+    /** Name returned by {@link #name()}. */
+    private final String name;
+
+    /** The single IGFS instance exposed by this mock. */
+    private final IgniteFileSystem igfs;
+
+    /**
+     * Constructor.
+     * @param name Name returned by {@link #name()}, may be {@code null}.
+     * @param igfs IGFS instance.
+     */
+    public IgfsIgniteMock(@Nullable String name, IgniteFileSystem igfs) {
+        this.name = name;
+        this.igfs = igfs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex(@Nullable String name) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public Collection<IgniteInternalCache<?, ?>> cachesx(
+        @Nullable IgnitePredicate<? super IgniteInternalCache<?, ?>>... p) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean eventUserRecordable(int type) {
+        throwUnsupported();
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean allEventsUserRecordable(int[] types) {
+        throwUnsupported();
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isJmxRemoteEnabled() {
+        throwUnsupported();
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isRestartEnabled() {
+        throwUnsupported();
+
+        return false;
+    }
+
+    /** {@inheritDoc} Mock: returns the IGFS if {@code F.eq} matches its name, otherwise {@code null}. */
+    @Nullable @Override public IgniteFileSystem igfsx(@Nullable String name) {
+        return F.eq(name, igfs.name()) ? igfs : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Hadoop hadoop() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteClusterEx cluster() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String latestVersion() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode localNode() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridKernalContext context() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} Mock: returns the name given to the constructor. */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteLogger log() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration configuration() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompute compute() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompute compute(ClusterGroup grp) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteMessaging message() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteMessaging message(ClusterGroup grp) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteEvents events() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteEvents events(ClusterGroup grp) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteServices services() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteServices services(ClusterGroup grp) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService executorService() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService executorService(ClusterGroup grp) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteProductVersion version() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteScheduler scheduler() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteCache<K, V> createCache(String cacheName) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteCache<K, V> getOrCreateCache(String cacheName) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> void addCacheConfiguration(CacheConfiguration<K, V> cacheCfg) {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg,
+        NearCacheConfiguration<K, V> nearCfg) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg,
+        NearCacheConfiguration<K, V> nearCfg) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteCache<K, V> createNearCache(@Nullable String cacheName,
+        NearCacheConfiguration<K, V> nearCfg) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteCache<K, V> getOrCreateNearCache(@Nullable String cacheName,
+        NearCacheConfiguration<K, V> nearCfg) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void destroyCache(String cacheName) {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteCache<K, V> cache(@Nullable String name) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> cacheNames() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteTransactions transactions() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} Mock: delegates to {@link #igfsx(String)}, throws IllegalArgumentException on no match. */
+    @Override public IgniteFileSystem fileSystem(String name) {
+        IgniteFileSystem res = igfsx(name);
+
+        if (res == null)
+            throw new IllegalArgumentException("IGFS is not configured: " + name);
+
+        return res;
+    }
+
+    /** {@inheritDoc} Mock: always a singleton collection with the mocked IGFS. */
+    @Override public Collection<IgniteFileSystem> fileSystems() {
+        return Collections.singleton(igfs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteAtomicLong atomicLong(String name, long initVal, boolean create) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteAtomicReference<T> atomicReference(String name, @Nullable T initVal, boolean create)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name, @Nullable T initVal,
+        @Nullable S initStamp, boolean create) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCountDownLatch countDownLatch(String name, int cnt, boolean autoDel, boolean create)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteSemaphore semaphore(String name, int cnt, boolean failoverSafe, boolean create)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteQueue<T> queue(String name, int cap, @Nullable CollectionConfiguration cfg)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteSet<T> set(String name, @Nullable CollectionConfiguration cfg) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends IgnitePlugin> T plugin(String name) throws PluginNotFoundException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteBinary binary() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K> Affinity<K> affinity(String cacheName) {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /**
+     * Always throws {@link UnsupportedOperationException}; shared by every method this mock does not support.
+     */
+    private static void throwUnsupported() {
+        throw new UnsupportedOperationException("Should not be called!");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
new file mode 100644
index 0000000..dccab4a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsMetrics;
+import org.apache.ignite.igfs.IgfsOutputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
+import org.apache.ignite.igfs.mapreduce.IgfsTask;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Mocked IGFS implementation for IGFS tests: only {@link #name()} works, all other methods throw.
+ */
+public class IgfsMock implements IgfsEx {
+    /** Name returned by {@link #name()}. */
+    private final String name;
+
+    /**
+     * Constructor.
+     *
+     * @param name Name returned by {@link #name()}, may be {@code null}.
+     */
+    public IgfsMock(@Nullable String name) {
+        this.name = name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsContext context() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPaths proxyPaths() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus globalSpace() throws IgniteCheckedException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Boolean globalSampling() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsLocalMetrics localMetrics() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long groupBlockSize() {
+        throwUnsupported();
+
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String clientLogDirectory() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clientLogDirectory(String logDir) {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean evictExclude(IgfsPath path, boolean primary) {
+        throwUnsupported();
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid nextAffinityKey() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isProxy(URI path) {
+        throwUnsupported();
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsSecondaryFileSystem asSecondary() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} Mock: returns the name given to the constructor. */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileSystemConfiguration configuration() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPathSummary summary(IgfsPath path) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+        long blockSize, @Nullable Map<String, String> props) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, @Nullable IgniteUuid affKey,
+        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsOutputStream append(IgfsPath path, boolean create) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create,
+        @Nullable Map<String, String> props) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, long maxLen)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsMetrics metrics() throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetMetrics() throws IgniteException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long size(IgfsPath path) throws IgniteException {
+        throwUnsupported();
+
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void format() throws IgniteException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+        Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+        Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr,
+        Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr,
+        Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(IgfsPath path) {
+        throwUnsupported();
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rename(IgfsPath src, IgfsPath dest) throws IgniteException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(IgfsPath path, boolean recursive) throws IgniteException {
+        throwUnsupported();
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path) throws IgniteException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) throws IgniteException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgfsFile info(IgfsPath path) throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long usedSpaceSize() throws IgniteException {
+        throwUnsupported();
+
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFileSystem withAsync() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync() {
+        throwUnsupported();
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> IgniteFuture<R> future() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /**
+     * Always throws {@link UnsupportedOperationException}; shared by every method this mock does not support.
+     */
+    private static void throwUnsupported() {
+        throw new UnsupportedOperationException("Should not be called!");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
index 287b5ec..d4a44fa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
@@ -26,10 +26,9 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.ignite.Ignite;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsPath;
@@ -38,14 +37,11 @@ import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
 import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
 import org.apache.ignite.internal.processors.igfs.IgfsEx;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -54,16 +50,7 @@ import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
 /**
  * Default map-reduce planner implementation.
  */
-public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner {
-    /** Injected grid. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
-    /** Logger. */
-    @SuppressWarnings("UnusedDeclaration")
-    @LoggerResource
-    private IgniteLogger log;
-
+public class IgniteHadoopMapReducePlanner extends HadoopAbstractMapReducePlanner {
     /** {@inheritDoc} */
     @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
         @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
@@ -98,7 +85,7 @@ public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner {
         Iterable<HadoopInputSplit> splits) throws IgniteCheckedException {
         Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
 
-        Map<String, Collection<UUID>> nodes = hosts(top);
+        Map<String, Collection<UUID>> nodes = groupByHost(top);
 
         Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load.
 
@@ -129,33 +116,6 @@ public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner {
     }
 
     /**
-     * Groups nodes by host names.
-     *
-     * @param top Topology to group.
-     * @return Map.
-     */
-    private static Map<String, Collection<UUID>> hosts(Collection<ClusterNode> top) {
-        Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
-
-        for (ClusterNode node : top) {
-            for (String host : node.hostNames()) {
-                Collection<UUID> nodeIds = grouped.get(host);
-
-                if (nodeIds == null) {
-                    // Expecting 1-2 nodes per host.
-                    nodeIds = new ArrayList<>(2);
-
-                    grouped.put(host, nodeIds);
-                }
-
-                nodeIds.add(node.id());
-            }
-        }
-
-        return grouped;
-    }
-
-    /**
      * Determine the best node for this split.
      *
      * @param split Split.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
new file mode 100644
index 0000000..27ffc19
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -0,0 +1,846 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.mapreduce;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Map-reduce planner which assigns mappers and reducers based on their "weights". Weight describes how much resources
+ * are required to execute particular map or reduce task.
+ * <p>
+ * Plan creation consists of two steps: assigning mappers and assigning reducers.
+ * <p>
+ * Mappers are assigned based on input split data location. For each input split we search for nodes where
+ * its data is stored. Planner tries to assign mappers to their affinity nodes first. This process is governed by two
+ * properties:
+ * <ul>
+ *     <li><b>{@code localMapperWeight}</b> - weight of a map task when it is executed on an affinity node;</li>
+ *     <li><b>{@code remoteMapperWeight}</b> - weight of a map task when it is executed on a non-affinity node.</li>
+ * </ul>
+ * Planning algorithm assigns mappers so that the total resulting weight on all nodes is the minimum possible.
+ * <p>
+ * Reducers are assigned differently. First we try to distribute reducers across nodes with mappers. This approach
+ * can minimize expensive data transfer over the network. A reducer assigned to a node with a mapper is considered
+ * <b>{@code local}</b>. Otherwise it is considered <b>{@code remote}</b>. This process continues until a certain
+ * weight threshold is reached, which means that the current node is already too busy and should no longer have
+ * higher priority over other nodes. The threshold can be configured using the
+ * <b>{@code preferLocalReducerThresholdWeight}</b> property.
+ * <p>
+ * When local reducer threshold is reached on all nodes, we distribute remaining reducers based on their local and
+ * remote weights in the same way as it is done for mappers. This process is governed by two
+ * properties:
+ * <ul>
+ *     <li><b>{@code localReducerWeight}</b> - weight of a reduce task when it is executed on a node with mappers;</li>
+ *     <li><b>{@code remoteReducerWeight}</b> - weight of a reduce task when it is executed on a node without
+ *     mappers.</li>
+ * </ul>
+ */
+public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReducePlanner {
+    /** Default local mapper weight. */
+    public static final int DFLT_LOC_MAPPER_WEIGHT = 100;
+
+    /** Default remote mapper weight. */
+    public static final int DFLT_RMT_MAPPER_WEIGHT = 100;
+
+    /** Default local reducer weight. */
+    public static final int DFLT_LOC_REDUCER_WEIGHT = 100;
+
+    /** Default remote reducer weight. */
+    public static final int DFLT_RMT_REDUCER_WEIGHT = 100;
+
+    /** Default reducer migration threshold weight. */
+    public static final int DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT = 200;
+
+    /** Local mapper weight. */
+    private int locMapperWeight = DFLT_LOC_MAPPER_WEIGHT;
+
+    /** Remote mapper weight. */
+    private int rmtMapperWeight = DFLT_RMT_MAPPER_WEIGHT;
+
+    /** Local reducer weight. */
+    private int locReducerWeight = DFLT_LOC_REDUCER_WEIGHT;
+
+    /** Remote reducer weight. */
+    private int rmtReducerWeight = DFLT_RMT_REDUCER_WEIGHT;
+
+    /** Reducer migration threshold weight. */
+    private int preferLocReducerThresholdWeight = DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Input splits are sorted first, then mappers are assigned, and finally reducers are distributed taking
+     * the mapper placement into account. The {@code oldPlan} argument is not consulted.
+     */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        List<HadoopInputSplit> sortedSplits = HadoopUtils.sortInputSplits(job.input());
+
+        int rdcCnt = job.info().reducers();
+
+        if (rdcCnt < 0)
+            throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt);
+
+        HadoopMapReducePlanTopology planTop = topology(nodes);
+
+        // Mappers go first: reducer assignment consults both the mapper placement and the group weights
+        // accumulated while placing mappers.
+        Mappers mapperAssignment = assignMappers(sortedSplits, planTop);
+
+        Map<UUID, int[]> rdcAssignment = assignReducers(sortedSplits, planTop, mapperAssignment, rdcCnt);
+
+        return new HadoopDefaultMapReducePlan(mapperAssignment.nodeToSplits, rdcAssignment);
+    }
+
+    /**
+     * Pick a node for every input split.
+     *
+     * @param splits Input splits.
+     * @param top Topology.
+     * @return Split-to-node assignments.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Mappers assignMappers(Collection<HadoopInputSplit> splits,
+        HadoopMapReducePlanTopology top) throws IgniteCheckedException {
+        Mappers assignment = new Mappers();
+
+        for (HadoopInputSplit split : splits) {
+            // Resolve affinity nodes (IGFS-based if possible), then pick the best node for current weights.
+            UUID bestNodeId = bestMapperNode(affinityNodesForSplit(split, top), top);
+
+            assert bestNodeId != null;
+
+            assignment.add(split, bestNodeId);
+        }
+
+        return assignment;
+    }
+
+    /**
+     * Get affinity nodes for the given input split.
+     * <p>
+     * IGFS affinity is used when it can be resolved. Otherwise nodes are looked up through the hosts reported by
+     * the split. In this fallback every node is registered with the same length (the split length), so the
+     * resulting order is defined by node ID comparison only.
+     * <p>
+     * Order in the returned collection *is* significant: the first node is treated as the most preferable one
+     * for scheduling.
+     *
+     * @param split Split.
+     * @param top Topology.
+     * @return Affinity nodes.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top)
+        throws IgniteCheckedException {
+        Collection<UUID> igfsNodeIds = igfsAffinityNodesForSplit(split);
+
+        if (igfsNodeIds != null)
+            return igfsNodeIds;
+
+        // Split length does not depend on the host, so it is computed once outside of the loop.
+        long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0L;
+
+        // Sorted map is used to get a deterministic order of nodes.
+        Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+        for (String host : split.hosts()) {
+            HadoopMapReducePlanGroup grp = top.groupForHost(host);
+
+            // Hosts without a matching group in the topology are ignored.
+            if (grp != null) {
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    UUID nodeId = grp.nodeId(i);
+
+                    res.put(new NodeIdAndLength(nodeId, len), nodeId);
+                }
+            }
+        }
+
+        return new LinkedHashSet<>(res.values());
+    }
+
+    /**
+     * Get IGFS affinity nodes for split if possible.
+     * <p>
+     * Affinity is resolved only when the split is a {@link HadoopFileBlock} with IGFS scheme, the endpoint grid
+     * name matches the name of the local Ignite instance, the IGFS instance is found, the path is not a proxy
+     * path and the file exists. In all other cases {@code null} is returned.
+     * <p>
+     * When the split spans several blocks, order in the returned collection *is* significant, meaning that nodes
+     * containing more data go first. This way, the 1st nodes in the collection are considered to be preferable
+     * for scheduling. When there is exactly one block, its node IDs are returned as is.
+     *
+     * @param split Input split.
+     * @return IGFS affinity or {@code null} if IGFS is not available.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private Collection<UUID> igfsAffinityNodesForSplit(HadoopInputSplit split) throws IgniteCheckedException {
+        if (split instanceof HadoopFileBlock) {
+            HadoopFileBlock split0 = (HadoopFileBlock)split;
+
+            if (IgniteFileSystem.IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
+                HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority());
+
+                IgfsEx igfs = null;
+
+                // Only IGFS of the local Ignite instance is considered.
+                if (F.eq(ignite.name(), endpoint.grid()))
+                    igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
+
+                if (igfs != null && !igfs.isProxy(split0.file())) {
+                    IgfsPath path = new IgfsPath(split0.file());
+
+                    if (igfs.exists(path)) {
+                        Collection<IgfsBlockLocation> blocks;
+
+                        try {
+                            blocks = igfs.affinity(path, split0.start(), split0.length());
+                        }
+                        catch (IgniteException e) {
+                            // Convert to checked exception preserving the cause.
+                            throw new IgniteCheckedException("Failed to get IGFS file block affinity [path=" + path +
+                                ", start=" + split0.start() + ", len=" + split0.length() + ']', e);
+                        }
+
+                        assert blocks != null;
+
+                        // Single block: no aggregation needed, return its nodes directly.
+                        if (blocks.size() == 1)
+                            return blocks.iterator().next().nodeIds();
+                        else {
+                            // The most "local" nodes go first.
+                            // Sum up block lengths per node.
+                            Map<UUID, Long> idToLen = new HashMap<>();
+
+                            for (IgfsBlockLocation block : blocks) {
+                                for (UUID id : block.nodeIds()) {
+                                    Long len = idToLen.get(id);
+
+                                    idToLen.put(id, len == null ? block.length() : block.length() + len);
+                                }
+                            }
+
+                            // Sort the nodes in non-ascending order by contained data lengths.
+                            Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+                            for (Map.Entry<UUID, Long> idToLenEntry : idToLen.entrySet()) {
+                                UUID id = idToLenEntry.getKey();
+
+                                res.put(new NodeIdAndLength(id, idToLenEntry.getValue()), id);
+                            }
+
+                            // Preserve the sorted order in the returned collection.
+                            return new LinkedHashSet<>(res.values());
+                        }
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Find best mapper node.
+     * <p>
+     * The group with the minimal resulting weight is selected; when resulting weights are equal, the group with
+     * higher mapper priority wins. The weight of the selected group is updated as a side effect, so subsequent
+     * calls see the load added by this assignment.
+     *
+     * @param affIds Affinity node IDs.
+     * @param top Topology.
+     * @return Result.
+     */
+    private UUID bestMapperNode(@Nullable Collection<UUID> affIds, HadoopMapReducePlanTopology top) {
+        // Priority node.
+        UUID prioAffId = F.first(affIds);
+
+        // Find group with the least weight.
+        HadoopMapReducePlanGroup resGrp = null;
+        MapperPriority resPrio = MapperPriority.NORMAL;
+        int resWeight = Integer.MAX_VALUE;
+
+        for (HadoopMapReducePlanGroup grp : top.groups()) {
+            MapperPriority prio = groupPriority(grp, affIds, prioAffId);
+
+            // Groups without affinity nodes pay remote weight, all others pay local weight.
+            int weight = grp.weight() + (prio == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight);
+
+            // Note: "&&" binds tighter than "||", i.e. priority is only a tie-breaker for equal weights.
+            if (resGrp == null || weight < resWeight || weight == resWeight && prio.value() > resPrio.value()) {
+                resGrp = grp;
+                resPrio = prio;
+                resWeight = weight;
+            }
+        }
+
+        // Fails if topology contains no groups.
+        assert resGrp != null;
+
+        // Update group weight for further runs.
+        resGrp.weight(resWeight);
+
+        // Return the best node from the group.
+        return bestMapperNodeForGroup(resGrp, resPrio, affIds, prioAffId);
+    }
+
+    /**
+     * Get best node in the group.
+     * <p>
+     * For a single-node group that node is returned. Otherwise the choice depends on priority: any node for
+     * {@code NORMAL}, a random affinity node for {@code HIGH}, and the priority affinity node for {@code HIGHEST}.
+     *
+     * @param grp Group.
+     * @param priority Priority.
+     * @param affIds Affinity IDs.
+     * @param prioAffId Priority affinity ID.
+     * @return Best node ID in the group.
+     */
+    private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, MapperPriority priority,
+        @Nullable Collection<UUID> affIds, @Nullable UUID prioAffId) {
+        // Index of the node to return; the first node is used unless the group has several nodes.
+        int idx = 0;
+
+        // This is rare situation when several nodes are started on the same host.
+        if (!grp.single()) {
+            switch (priority) {
+                case NORMAL: {
+                    // Pick any node.
+                    idx = ThreadLocalRandom.current().nextInt(grp.nodeCount());
+
+                    break;
+                }
+                case HIGH: {
+                    // Pick any affinity node.
+                    assert affIds != null;
+
+                    // Indexes of group nodes which are affinity nodes.
+                    List<Integer> cands = new ArrayList<>();
+
+                    for (int i = 0; i < grp.nodeCount(); i++) {
+                        UUID id = grp.nodeId(i);
+
+                        if (affIds.contains(id))
+                            cands.add(i);
+                    }
+
+                    // Candidates are expected to be non-empty: HIGH priority is only assigned to groups
+                    // containing at least one affinity node.
+                    idx = cands.get(ThreadLocalRandom.current().nextInt(cands.size()));
+
+                    break;
+                }
+                default: {
+                    // Find primary node.
+                    assert prioAffId != null;
+
+                    for (int i = 0; i < grp.nodeCount(); i++) {
+                        UUID id = grp.nodeId(i);
+
+                        if (F.eq(id, prioAffId)) {
+                            idx = i;
+
+                            break;
+                        }
+                    }
+
+                    assert priority == MapperPriority.HIGHEST;
+                }
+            }
+        }
+
+        return grp.nodeId(idx);
+    }
+
+    /**
+     * Generate reducers.
+     * <p>
+     * Per-node reducer counts are calculated first and then converted to per-node arrays of reducer indexes.
+     * Indexes are handed out sequentially, starting from zero, in the iteration order of the intermediate map.
+     *
+     * @param splits Input splits.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Map from node ID to indexes of reducers assigned to this node.
+     */
+    private Map<UUID, int[]> assignReducers(Collection<HadoopInputSplit> splits, HadoopMapReducePlanTopology top,
+        Mappers mappers, int reducerCnt) {
+        Map<UUID, Integer> reducers = assignReducers0(top, splits, mappers, reducerCnt);
+
+        // Next reducer index to hand out.
+        int cnt = 0;
+
+        Map<UUID, int[]> res = new HashMap<>(reducers.size());
+
+        for (Map.Entry<UUID, Integer> reducerEntry : reducers.entrySet()) {
+            int[] arr = new int[reducerEntry.getValue()];
+
+            for (int i = 0; i < arr.length; i++)
+                arr[i] = cnt++;
+
+            res.put(reducerEntry.getKey(), arr);
+        }
+
+        // All requested reducers must have been distributed.
+        assert reducerCnt == cnt : reducerCnt + " != " + cnt;
+
+        return res;
+    }
+
+    /**
+     * Generate reducers.
+     * <p>
+     * Reducers are first distributed between splits and assigned to the nodes of the corresponding mappers
+     * while those nodes stay below the local reducer threshold. Reducers which could not be assigned this way
+     * are then distributed over the least loaded groups.
+     * <p>
+     * If there are no splits (and hence no mappers), all reducers are distributed over the least loaded groups.
+     *
+     * @param top Topology.
+     * @param splits Input splits.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Map from node ID to the number of reducers assigned to this node.
+     */
+    private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Collection<HadoopInputSplit> splits,
+        Mappers mappers, int reducerCnt) {
+        Map<UUID, Integer> res = new HashMap<>();
+
+        // Without splits there are no "local" nodes, and distributing reducers between splits would divide
+        // by zero. All reducers are treated as remaining in this case.
+        int remaining = splits.isEmpty() ? reducerCnt : 0;
+
+        if (!splits.isEmpty()) {
+            // Assign reducers to splits.
+            Map<HadoopInputSplit, Integer> splitToReducerCnt = assignReducersToSplits(splits, reducerCnt);
+
+            // Assign as many local reducers as possible.
+            for (Map.Entry<HadoopInputSplit, Integer> entry : splitToReducerCnt.entrySet()) {
+                HadoopInputSplit split = entry.getKey();
+                int cnt = entry.getValue();
+
+                if (cnt > 0) {
+                    int assigned = assignLocalReducers(split, cnt, top, mappers, res);
+
+                    assert assigned <= cnt;
+
+                    remaining += cnt - assigned;
+                }
+            }
+        }
+
+        // Assign the rest of the reducers.
+        if (remaining > 0)
+            assignRemoteReducers(remaining, top, mappers, res);
+
+        return res;
+    }
+
+    /**
+     * Assign local split reducers.
+     * <p>
+     * Reducers are assigned to the node which runs the mapper of the given split. Every assigned reducer adds
+     * local reducer weight to the group of that node; assignment stops when either all requested reducers are
+     * assigned or the group weight reaches the local reducer threshold.
+     *
+     * @param split Split.
+     * @param cnt Reducer count.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param resMap Reducers result map.
+     * @return Number of locally assigned reducers.
+     */
+    private int assignLocalReducers(HadoopInputSplit split, int cnt, HadoopMapReducePlanTopology top, Mappers mappers,
+        Map<UUID, Integer> resMap) {
+        // Dereference node.
+        UUID nodeId = mappers.splitToNode.get(split);
+
+        assert nodeId != null;
+
+        // Dereference group.
+        HadoopMapReducePlanGroup grp = top.groupForId(nodeId);
+
+        assert grp != null;
+
+        // Assign more reducers to the node until threshold is reached.
+        int res = 0;
+
+        while (res < cnt && grp.weight() < preferLocReducerThresholdWeight) {
+            res++;
+
+            grp.weight(grp.weight() + locReducerWeight);
+        }
+
+        // Update result map.
+        if (res > 0) {
+            Integer reducerCnt = resMap.get(nodeId);
+
+            resMap.put(nodeId, reducerCnt == null ? res : reducerCnt + res);
+        }
+
+        return res;
+    }
+
+    /**
+     * Assign remote reducers. Assign to the least loaded first.
+     * <p>
+     * For every reducer the group with the least weight is taken. If the group contains nodes with assigned
+     * splits, one of them is picked at random and local reducer weight is added to the group; otherwise any node
+     * of the group is picked at random and remote reducer weight is added.
+     *
+     * @param cnt Count.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param resMap Reducers result map.
+     */
+    private void assignRemoteReducers(int cnt, HadoopMapReducePlanTopology top, Mappers mappers,
+        Map<UUID, Integer> resMap) {
+
+        // Groups ordered by weight, the least loaded first.
+        TreeSet<HadoopMapReducePlanGroup> set = new TreeSet<>(new GroupWeightComparator());
+
+        set.addAll(top.groups());
+
+        while (cnt-- > 0) {
+            // The least loaded machine.
+            // Note: TreeSet.first() throws NoSuchElementException if there are no groups.
+            HadoopMapReducePlanGroup grp = set.first();
+
+            // Look for nodes with assigned splits.
+            List<UUID> splitNodeIds = null;
+
+            for (int i = 0; i < grp.nodeCount(); i++) {
+                UUID nodeId = grp.nodeId(i);
+
+                if (mappers.nodeToSplits.containsKey(nodeId)) {
+                    if (splitNodeIds == null)
+                        splitNodeIds = new ArrayList<>(2);
+
+                    splitNodeIds.add(nodeId);
+                }
+            }
+
+            // Select best node.
+            UUID id;
+            int newWeight;
+
+            if (splitNodeIds != null) {
+                id = splitNodeIds.get(ThreadLocalRandom.current().nextInt(splitNodeIds.size()));
+
+                newWeight = grp.weight() + locReducerWeight;
+            }
+            else {
+                id = grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount()));
+
+                newWeight = grp.weight() + rmtReducerWeight;
+            }
+
+            // Re-add entry with new weight.
+            // The group must be removed before its weight changes, because weight is a part of the sort key.
+            boolean rmv = set.remove(grp);
+
+            assert rmv;
+
+            grp.weight(newWeight);
+
+            boolean add = set.add(grp);
+
+            assert add;
+
+            // Update result map.
+            Integer res = resMap.get(id);
+
+            resMap.put(id, res == null ? 1 : res + 1);
+        }
+    }
+
+    /**
+     * Comparator based on group's weight. Groups with lower weight go first; groups with equal weights are
+     * ordered by their MACs.
+     */
+    private static class GroupWeightComparator implements Comparator<HadoopMapReducePlanGroup> {
+        /** {@inheritDoc} */
+        @Override public int compare(HadoopMapReducePlanGroup first, HadoopMapReducePlanGroup second) {
+            // Integer.compare() is used instead of subtraction: the latter may overflow and produce a wrong sign.
+            int res = Integer.compare(first.weight(), second.weight());
+
+            return res != 0 ? res : first.macs().compareTo(second.macs());
+        }
+    }
+
+    /**
+     * Distribute reducers between splits as evenly as possible: every split gets the same base number of
+     * reducers, and the first splits in iteration order get one more until the remainder is exhausted.
+     * <p>
+     * Splits are tracked by identity. The collection must not be empty, otherwise division by zero occurs.
+     *
+     * @param splits Splits.
+     * @param reducerCnt Reducer count.
+     * @return Map from input split to reducer count.
+     */
+    private Map<HadoopInputSplit, Integer> assignReducersToSplits(Collection<HadoopInputSplit> splits,
+        int reducerCnt) {
+        int splitCnt = splits.size();
+
+        Map<HadoopInputSplit, Integer> res = new IdentityHashMap<>(splitCnt);
+
+        int quotient = reducerCnt / splitCnt;
+        int extra = reducerCnt % splitCnt;
+
+        for (HadoopInputSplit split : splits) {
+            if (extra > 0) {
+                // One of the leftover reducers goes to this split.
+                res.put(split, quotient + 1);
+
+                extra--;
+            }
+            else
+                res.put(split, quotient);
+        }
+
+        assert extra == 0;
+
+        return res;
+    }
+
+    /**
+     * Calculate group priority.
+     * <p>
+     * The result is {@code HIGHEST} if the group contains the priority affinity node, {@code HIGH} if it contains
+     * any other affinity node, and {@code NORMAL} otherwise (including the case when there are no affinity nodes).
+     *
+     * @param grp Group.
+     * @param affIds Affinity IDs.
+     * @param prioAffId Priority affinity ID.
+     * @return Group priority.
+     */
+    private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, @Nullable Collection<UUID> affIds,
+        @Nullable UUID prioAffId) {
+        // Priority affinity ID must be the first affinity ID (or null if there are no affinity IDs).
+        assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == F.first(affIds);
+        assert grp != null;
+
+        MapperPriority prio = MapperPriority.NORMAL;
+
+        if (!F.isEmpty(affIds)) {
+            for (int i = 0; i < grp.nodeCount(); i++) {
+                UUID id = grp.nodeId(i);
+
+                if (affIds.contains(id)) {
+                    prio = MapperPriority.HIGH;
+
+                    // Nothing can beat the highest priority, stop the search.
+                    if (F.eq(prioAffId, id)) {
+                        prio = MapperPriority.HIGHEST;
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        return prio;
+    }
+
+    /**
+     * Get local mapper weight. This weight is added to a node when a mapper is assigned and its input split data is
+     * located on this node (at least partially).
+     * <p>
+     * Defaults to {@link #DFLT_LOC_MAPPER_WEIGHT}.
+     *
+     * @return Local mapper weight.
+     */
+    public int getLocalMapperWeight() {
+        return locMapperWeight;
+    }
+
+    /**
+     * Set local mapper weight. See {@link #getLocalMapperWeight()} for more information.
+     *
+     * @param locMapperWeight Local mapper weight.
+     */
+    public void setLocalMapperWeight(int locMapperWeight) {
+        this.locMapperWeight = locMapperWeight;
+    }
+
+    /**
+     * Get remote mapper weight. This weight is added to a node when a mapper is assigned, but its input
+     * split data is not located on this node.
+     * <p>
+     * Defaults to {@link #DFLT_RMT_MAPPER_WEIGHT}.
+     *
+     * @return Remote mapper weight.
+     */
+    public int getRemoteMapperWeight() {
+        return rmtMapperWeight;
+    }
+
+    /**
+     * Set remote mapper weight. See {@link #getRemoteMapperWeight()} for more information.
+     *
+     * @param rmtMapperWeight Remote mapper weight.
+     */
+    public void setRemoteMapperWeight(int rmtMapperWeight) {
+        this.rmtMapperWeight = rmtMapperWeight;
+    }
+
+    /**
+     * Get local reducer weight. This weight is added to a node when a reducer is assigned and the node has at least
+     * one assigned mapper.
+     * <p>
+     * Defaults to {@link #DFLT_LOC_REDUCER_WEIGHT}.
+     *
+     * @return Local reducer weight.
+     */
+    public int getLocalReducerWeight() {
+        return locReducerWeight;
+    }
+
+    /**
+     * Set local reducer weight. See {@link #getLocalReducerWeight()} for more information.
+     *
+     * @param locReducerWeight Local reducer weight.
+     */
+    public void setLocalReducerWeight(int locReducerWeight) {
+        this.locReducerWeight = locReducerWeight;
+    }
+
+    /**
+     * Get remote reducer weight. This weight is added to a node when a reducer is assigned, but the node doesn't have
+     * any assigned mappers.
+     * <p>
+     * Defaults to {@link #DFLT_RMT_REDUCER_WEIGHT}.
+     *
+     * @return Remote reducer weight.
+     */
+    public int getRemoteReducerWeight() {
+        return rmtReducerWeight;
+    }
+
+    /**
+     * Set remote reducer weight. See {@link #getRemoteReducerWeight()} for more information.
+     *
+     * @param rmtReducerWeight Remote reducer weight.
+     */
+    public void setRemoteReducerWeight(int rmtReducerWeight) {
+        this.rmtReducerWeight = rmtReducerWeight;
+    }
+
+    /**
+     * Get reducer migration threshold weight. When threshold is reached, a node with mappers is no longer considered
+     * as preferred for further reducer assignments.
+     * <p>
+     * Defaults to {@link #DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT}.
+     *
+     * @return Reducer migration threshold weight.
+     */
+    public int getPreferLocalReducerThresholdWeight() {
+        return preferLocReducerThresholdWeight;
+    }
+
+    /**
+     * Set reducer migration threshold weight. See {@link #getPreferLocalReducerThresholdWeight()} for more
+     * information.
+     *
+     * @param reducerMigrationThresholdWeight Reducer migration threshold weight.
+     */
+    public void setPreferLocalReducerThresholdWeight(int reducerMigrationThresholdWeight) {
+        this.preferLocReducerThresholdWeight = reducerMigrationThresholdWeight;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteHadoopWeightedMapReducePlanner.class, this);
+    }
+
+    /**
+     * Node ID and length. Natural ordering puts bigger lengths first; entries with equal lengths are ordered by
+     * node ID.
+     * <p>
+     * Note that {@link #equals(Object)} and {@link #hashCode()} take only node ID into account, while ordering
+     * considers length as well.
+     */
+    private static class NodeIdAndLength implements Comparable<NodeIdAndLength> {
+        /** Node ID. */
+        private final UUID id;
+
+        /** Length. */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param id Node ID.
+         * @param len Length.
+         */
+        public NodeIdAndLength(UUID id, long len) {
+            this.id = id;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NullableProblems")
+        @Override public int compareTo(NodeIdAndLength obj) {
+            // Arguments are swapped to get descending order by length. Long.compare() is used instead of
+            // subtraction: the latter may overflow and produce a wrong sign.
+            int res = Long.compare(obj.len, len);
+
+            return res != 0 ? res : id.compareTo(obj.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof NodeIdAndLength && F.eq(id, ((NodeIdAndLength)obj).id);
+        }
+    }
+
+    /**
+     * Mapper assignments: keeps both directions of the split-to-node relation.
+     */
+    private static class Mappers {
+        /** Node-to-splits map. */
+        private final Map<UUID, Collection<HadoopInputSplit>> nodeToSplits = new HashMap<>();
+
+        /** Split-to-node map (splits are tracked by identity). */
+        private final Map<HadoopInputSplit, UUID> splitToNode = new IdentityHashMap<>();
+
+        /**
+         * Register split as assigned to the given node.
+         *
+         * @param split Split.
+         * @param node Node.
+         */
+        public void add(HadoopInputSplit split, UUID node) {
+            Collection<HadoopInputSplit> assigned = nodeToSplits.get(node);
+
+            // First split for this node: create the holder lazily.
+            if (assigned == null)
+                nodeToSplits.put(node, assigned = new HashSet<>());
+
+            assigned.add(split);
+
+            splitToNode.put(split, node);
+        }
+    }
+
+    /**
+     * Mapper priority enumeration.
+     */
+    private enum MapperPriority {
+        /** Normal node. */
+        NORMAL(0),
+
+        /** (likely) Affinity node. */
+        HIGH(1),
+
+        /** (likely) Affinity node with the highest priority (e.g. because it hosts more data than other nodes). */
+        HIGHEST(2);
+
+        /** Value. */
+        private final int val;
+
+        /**
+         * Constructor.
+         *
+         * @param val Value.
+         */
+        MapperPriority(int val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public int value() {
+            return val;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 3fa963f..44d871a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -25,8 +25,12 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
 import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -352,4 +356,81 @@ public class HadoopUtils {
         }
     }
 
+    /**
+     * Sort input splits by length, the longest first. Splits which are not file blocks are treated as having
+     * zero length. Splits of equal length keep the order in which they were iterated.
+     *
+     * @param splits Splits.
+     * @return Sorted splits.
+     */
+    public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) {
+        TreeSet<SplitSortWrapper> ordered = new TreeSet<>();
+
+        // Sequence number makes every wrapper unique within the set.
+        int seq = 0;
+
+        for (HadoopInputSplit split : splits) {
+            long len = 0;
+
+            if (split instanceof HadoopFileBlock)
+                len = ((HadoopFileBlock)split).length();
+
+            ordered.add(new SplitSortWrapper(seq++, split, len));
+        }
+
+        ArrayList<HadoopInputSplit> res = new ArrayList<>(ordered.size());
+
+        for (SplitSortWrapper wrapper : ordered)
+            res.add(wrapper.split);
+
+        return res;
+    }
+
+    /**
+     * Split wrapper for sorting. Natural ordering puts longer splits first; wrappers with equal lengths are
+     * ordered by their unique IDs.
+     */
+    private static class SplitSortWrapper implements Comparable<SplitSortWrapper> {
+        /** Unique ID. */
+        private final int id;
+
+        /** Split. */
+        private final HadoopInputSplit split;
+
+        /** Split length. */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param id Unique ID.
+         * @param split Split.
+         * @param len Split length.
+         */
+        public SplitSortWrapper(int id, HadoopInputSplit split, long len) {
+            this.id = id;
+            this.split = split;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NullableProblems")
+        @Override public int compareTo(SplitSortWrapper other) {
+            assert other != null;
+
+            // Arguments are swapped to get descending order by length. Long.compare() and Integer.compare() are
+            // used instead of subtraction: the latter may overflow and produce a wrong sign.
+            int res = Long.compare(other.len, len);
+
+            return res != 0 ? res : Integer.compare(id, other.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id;
+        }
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
new file mode 100644
index 0000000..f01f72b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
+
+/**
+ * Base class for map-reduce planners.
+ */
+public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner {
+    /** Injected grid. */
+    @IgniteInstanceResource
+    protected Ignite ignite;
+
+    /** Logger. */
+    @SuppressWarnings("UnusedDeclaration")
+    @LoggerResource
+    protected IgniteLogger log;
+
+    /**
+     * Create plan topology.
+     *
+     * @param nodes Topology nodes.
+     * @return Plan topology.
+     */
+    protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) {
+        Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size());
+
+        Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size());
+        Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size());
+
+        for (ClusterNode node : nodes) {
+            String macs = node.attribute(ATTR_MACS);
+
+            HadoopMapReducePlanGroup grp = macsMap.get(macs);
+
+            if (grp == null) {
+                grp = new HadoopMapReducePlanGroup(node, macs);
+
+                macsMap.put(macs, grp);
+            }
+            else
+                grp.add(node);
+
+            idToGrp.put(node.id(), grp);
+
+            for (String host : node.addresses()) {
+                HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host);
+
+                if (hostGrp == null)
+                    hostToGrp.put(host, grp);
+                else
+                    assert hostGrp == grp;
+            }
+        }
+
+        return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp);
+    }
+
+
+    /**
+     * Groups nodes by host names.
+     *
+     * @param top Topology to group.
+     * @return Map.
+     */
+    protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) {
+        Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
+
+        for (ClusterNode node : top) {
+            for (String host : node.hostNames()) {
+                Collection<UUID> nodeIds = grouped.get(host);
+
+                if (nodeIds == null) {
+                    // Expecting 1-2 nodes per host.
+                    nodeIds = new ArrayList<>(2);
+
+                    grouped.put(host, nodeIds);
+                }
+
+                nodeIds.add(node.id());
+            }
+        }
+
+        return grouped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
new file mode 100644
index 0000000..2fe8682
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.ArrayList;
+import java.util.UUID;
+
+/**
+ * Map-reduce plan group of nodes on a single physical machine, identified by its MAC addresses string.
+ * <p>
+ * While the group holds exactly one node it is kept in the {@code node} field; once a second node is added,
+ * all nodes are moved to the {@code nodes} list and {@code node} is reset to {@code null}.
+ */
+public class HadoopMapReducePlanGroup {
+    /** Node. */
+    private ClusterNode node;
+
+    /** Nodes. */
+    private ArrayList<ClusterNode> nodes;
+
+    /** MAC addresses. */
+    private final String macs;
+
+    /** Weight. */
+    private int weight;
+
+    /**
+     * Creates a group consisting of a single node.
+     *
+     * @param node First node in the group.
+     * @param macs MAC addresses.
+     */
+    public HadoopMapReducePlanGroup(ClusterNode node, String macs) {
+        assert node != null;
+        assert macs != null;
+
+        this.macs = macs;
+        this.node = node;
+    }
+
+    /**
+     * Adds one more node to the group.
+     *
+     * @param newNode New node.
+     */
+    public void add(ClusterNode newNode) {
+        if (node != null) {
+            // Group grows beyond a single node: switch from the single-node field to the list.
+            ArrayList<ClusterNode> lst = new ArrayList<>(2);
+
+            lst.add(node);
+
+            nodes = lst;
+            node = null;
+        }
+
+        nodes.add(newNode);
+    }
+
+    /**
+     * @return MAC addresses.
+     */
+    public String macs() {
+        return macs;
+    }
+
+    /**
+     * @return {@code True} if only a single node is present.
+     */
+    public boolean single() {
+        return nodeCount() == 1;
+    }
+
+    /**
+     * Gets ID of the node located at the given index.
+     *
+     * @param idx Index.
+     * @return Node ID.
+     */
+    public UUID nodeId(int idx) {
+        if (node != null) {
+            // Single-node mode: the only valid index is zero.
+            assert idx == 0;
+
+            return node.id();
+        }
+
+        assert nodes != null;
+        assert idx < nodes.size();
+
+        ClusterNode res = nodes.get(idx);
+
+        assert res != null;
+
+        return res.id();
+    }
+
+    /**
+     * @return Node count.
+     */
+    public int nodeCount() {
+        if (node != null)
+            return 1;
+
+        return nodes.size();
+    }
+
+    /**
+     * @return Weight.
+     */
+    public int weight() {
+        return weight;
+    }
+
+    /**
+     * @param weight Weight.
+     */
+    public void weight(int weight) {
+        this.weight = weight;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return macs.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (!(obj instanceof HadoopMapReducePlanGroup))
+            return false;
+
+        HadoopMapReducePlanGroup that = (HadoopMapReducePlanGroup)obj;
+
+        return F.eq(macs, that.macs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopMapReducePlanGroup.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
new file mode 100644
index 0000000..fa5c469
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Map-reduce plan topology: the list of all node groups plus lookups of a group by node ID and by host.
+ */
+public class HadoopMapReducePlanTopology {
+    /** All groups. */
+    private final List<HadoopMapReducePlanGroup> grps;
+
+    /** Node ID to group map. */
+    private final Map<UUID, HadoopMapReducePlanGroup> idToGrp;
+
+    /** Host to group map. */
+    private final Map<String, HadoopMapReducePlanGroup> hostToGrp;
+
+    /**
+     * Creates topology over the given collections. The passed collections are stored as is, without copying.
+     *
+     * @param grps All groups.
+     * @param idToGrp ID to group map.
+     * @param hostToGrp Host to group map.
+     */
+    public HadoopMapReducePlanTopology(List<HadoopMapReducePlanGroup> grps,
+        Map<UUID, HadoopMapReducePlanGroup> idToGrp, Map<String, HadoopMapReducePlanGroup> hostToGrp) {
+        assert grps != null && idToGrp != null && hostToGrp != null;
+
+        this.hostToGrp = hostToGrp;
+        this.idToGrp = idToGrp;
+        this.grps = grps;
+    }
+
+    /**
+     * @return All groups.
+     */
+    public List<HadoopMapReducePlanGroup> groups() {
+        return this.grps;
+    }
+
+    /**
+     * Looks up the group registered for the given node ID.
+     *
+     * @param id Node ID.
+     * @return Group.
+     */
+    public HadoopMapReducePlanGroup groupForId(UUID id) {
+        return this.idToGrp.get(id);
+    }
+
+    /**
+     * Looks up the group registered for the given host.
+     *
+     * @param host Host.
+     * @return Group or {@code null} if the host is not known to this topology.
+     */
+    @Nullable public HadoopMapReducePlanGroup groupForHost(String host) {
+        return this.hostToGrp.get(host);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopMapReducePlanTopology.class, this);
+    }
+}


Mime
View raw message