ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [27/31] ignite git commit: IGNITE-3414: Hadoop: implemented new weight-based map-reduce planner.
Date Tue, 19 Jul 2016 14:10:13 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index ffa6f7d..a69b72a 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -17,47 +17,27 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.IgniteSpringBean;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
 import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsMetrics;
-import org.apache.ignite.igfs.IgfsOutputStream;
 import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathSummary;
-import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
-import org.apache.ignite.igfs.mapreduce.IgfsTask;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.cluster.IgniteClusterEx;
-import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
-import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
 import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
-import org.apache.ignite.internal.processors.igfs.IgfsContext;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsInputStreamAdapter;
-import org.apache.ignite.internal.processors.igfs.IgfsLocalMetrics;
-import org.apache.ignite.internal.processors.igfs.IgfsPaths;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
+import org.apache.ignite.internal.processors.igfs.IgfsMock;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
-import org.jetbrains.annotations.Nullable;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 
 /**
  *
@@ -90,12 +70,12 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
     /** */
     private static final String INVALID_HOST_3 = "invalid_host3";
 
-    /** Mocked Grid. */
-    private static final MockIgnite GRID = new MockIgnite();
-
     /** Mocked IGFS. */
     private static final IgniteFileSystem IGFS = new MockIgfs();
 
+    /** Mocked Grid. */
+    private static final IgfsIgniteMock GRID = new IgfsIgniteMock(null, IGFS);
+
     /** Planner. */
     private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner();
 
@@ -109,15 +89,15 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
     private static final ThreadLocal<HadoopMapReducePlan> PLAN = new ThreadLocal<>();
 
     /**
-     *
+     * Static initializer.
      */
     static {
-        GridTestUtils.setFieldValue(PLANNER, "ignite", GRID);
+        GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "ignite", GRID);
     }
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        GridTestUtils.setFieldValue(PLANNER, "log", log());
+        GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "log", log());
 
         BLOCK_MAP.clear();
         PROXY_MAP.clear();
@@ -445,7 +425,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
         top.add(node2);
         top.add(node3);
 
-        HadoopMapReducePlan plan = PLANNER.preparePlan(new MockJob(reducers, splitList), top, null);
+        HadoopMapReducePlan plan = PLANNER.preparePlan(new HadoopPlannerMockJob(splitList, reducers), top, null);
 
         PLAN.set(plan);
 
@@ -607,81 +587,17 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
     }
 
     /**
-     * Mocked job.
+     * Mocked IGFS.
      */
-    private static class MockJob implements HadoopJob {
-        /** Reducers count. */
-        private final int reducers;
-
-        /** */
-        private Collection<HadoopInputSplit> splitList;
-
+    private static class MockIgfs extends IgfsMock {
         /**
          * Constructor.
-         *
-         * @param reducers Reducers count.
-         * @param splitList Splits.
          */
-        private MockJob(int reducers, Collection<HadoopInputSplit> splitList) {
-            this.reducers = reducers;
-            this.splitList = splitList;
-        }
-
-        /** {@inheritDoc} */
-        @Override public HadoopJobId id() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public HadoopJobInfo info() {
-            return new HadoopDefaultJobInfo() {
-                @Override public int reducers() {
-                    return reducers;
-                }
-            };
+        public MockIgfs() {
+            super("igfs");
         }
 
         /** {@inheritDoc} */
-        @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
-            return splitList;
-        }
-
-        /** {@inheritDoc} */
-        @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void dispose(boolean external) throws IgniteCheckedException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cleanupStagingDirectory() {
-            // No-op.
-        }
-    }
-
-    /**
-     * Mocked IGFS.
-     */
-    private static class MockIgfs implements IgfsEx {
-        /** {@inheritDoc} */
         @Override public boolean isProxy(URI path) {
             return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path);
         }
@@ -692,331 +608,8 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
         }
 
         /** {@inheritDoc} */
-        @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
-            long maxLen) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void stop(boolean cancel) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsContext context() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsPaths proxyPaths() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsInputStreamAdapter open(IgfsPath path) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsStatus globalSpace() throws IgniteCheckedException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Boolean globalSampling() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsLocalMetrics localMetrics() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public long groupBlockSize() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String clientLogDirectory() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void clientLogDirectory(String logDir) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean evictExclude(IgfsPath path, boolean primary) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String name() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public FileSystemConfiguration configuration() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Override public boolean exists(IgfsPath path) {
             return true;
         }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public IgfsFile info(IgfsPath path) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsPathSummary summary(IgfsPath path) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void rename(IgfsPath src, IgfsPath dest) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean delete(IgfsPath path, boolean recursive) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void mkdirs(IgfsPath path) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public long usedSpaceSize() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
-            long blockSize, @Nullable Map<String, String> props) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite,
-            @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsOutputStream append(IgfsPath path, boolean create) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create,
-            @Nullable Map<String, String> props) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsMetrics metrics() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void resetMetrics() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public long size(IgfsPath path) {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void format() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
-            Collection<IgfsPath> paths, @Nullable T arg) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
-            Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
-            @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
-            @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles,
-            long maxRangeLen, @Nullable T arg) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteUuid nextAffinityKey() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFileSystem withAsync() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isAsync() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public <R> IgniteFuture<R> future() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsSecondaryFileSystem asSecondary() {
-            return null;
-        }
-    }
-
-    /**
-     * Mocked Grid.
-     */
-    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
-    private static class MockIgnite extends IgniteSpringBean implements IgniteEx {
-        /** {@inheritDoc} */
-        @Override public IgniteClusterEx cluster() {
-            return (IgniteClusterEx)super.cluster();
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFileSystem igfsx(String name) {
-            assert F.eq("igfs", name);
-
-            return IGFS;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Hadoop hadoop() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex(@Nullable String name) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public Collection<IgniteInternalCache<?, ?>> cachesx(@Nullable IgnitePredicate<? super IgniteInternalCache<?, ?>>... p) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean eventUserRecordable(int type) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean allEventsUserRecordable(int[] types) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isJmxRemoteEnabled() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isRestartEnabled() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public ClusterNode localNode() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String latestVersion() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridKernalContext context() {
-            return null;
-        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index b703896..5d1de38 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -17,13 +17,130 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsUserContext;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1;
 import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
 
 /**
  * Test of whole cycle of map-reduce processing via Job tracker.
  */
-public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
+public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
+    /** IGFS block size. */
+    protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+    /** Amount of blocks to prefetch. */
+    protected static final int PREFETCH_BLOCKS = 1;
+
+    /** Amount of sequential block reads before prefetch is triggered. */
+    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+    /** Secondary file system URI. */
+    protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+    /** Secondary file system configuration path. */
+    protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+    /** The user to run Hadoop job on behalf of. */
+    protected static final String USER = "vasya";
+
+    /** Secondary IGFS name. */
+    protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
+
+    /** The secondary Ignite node. */
+    protected Ignite igniteSecondary;
+
+    /** The secondary Fs. */
+    protected IgfsSecondaryFileSystem secondaryFs;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * Gets owner of an IgfsEx path.
+     * @param p The path.
+     * @return The owner.
+     */
+    private static String getOwner(IgfsEx i, IgfsPath p) {
+        return i.info(p).property(IgfsUtils.PROP_USER_NAME);
+    }
+
+    /**
+     * Gets owner of a secondary Fs path.
+     * @param secFs The sec Fs.
+     * @param p The path.
+     * @return The owner.
+     */
+    private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
+        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+            @Override public String apply() {
+                return secFs.info(p).property(IgfsUtils.PROP_USER_NAME);
+            }
+        });
+    }
+
+    /**
+     * Checks owner of the path.
+     * @param p The path.
+     */
+    private void checkOwner(IgfsPath p) {
+        String ownerPrim = getOwner(igfs, p);
+        assertEquals(USER, ownerPrim);
+
+        String ownerSec = getOwnerSecondary(secondaryFs, p);
+        assertEquals(USER, ownerSec);
+    }
+
     /**
      * Tests whole job execution with all phases in all combination of new and old versions of API.
      * @throws Exception If fails.
@@ -35,32 +152,274 @@ public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
 
         IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
 
+        final int red = 10_000;
+        final int blue = 20_000;
+        final int green = 15_000;
+        final int yellow = 7_000;
+
         generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow );
 
-        for (boolean[] apiMode: getApiModes()) {
-            assert apiMode.length == 3;
+        for (int i = 0; i < 8; i++) {
+            igfs.delete(new IgfsPath(PATH_OUTPUT), true);
+            // Bits 0..2 of i pick the new (bit clear) or old (bit set) API for mapper, combiner and reducer.
+            boolean useNewMapper = (i & 1) == 0;
+            boolean useNewCombiner = (i & 2) == 0;
+            boolean useNewReducer = (i & 4) == 0;
+
+            JobConf jobConf = new JobConf();
 
-            boolean useNewMapper = apiMode[0];
-            boolean useNewCombiner = apiMode[1];
-            boolean useNewReducer = apiMode[2];
+            jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
+            jobConf.setUser(USER);
+            jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
 
-            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+            //To split into about 40 items for v2
+            jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
+
+            //For v1
+            jobConf.setInt("fs.local.block.size", 65000);
+
+            // File system coordinates.
+            setupFileSystems(jobConf);
+
+            HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);
+
+            Job job = Job.getInstance(jobConf);
+
+            HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
+
+            FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+            job.setJarByClass(HadoopWordCount2.class);
+
+            HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+            IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+            fut.get();
+
+            checkJobStatistics(jobId);
+
+            final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+
+            checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
+
+            checkOwner(new IgfsPath(outFile));
+
+            String actual = readAndSortFile(outFile, job.getConfiguration());
+
+            assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
+                useNewReducer,
+                "blue\t" + blue + "\n" +
+                "green\t" + green + "\n" +
+                "red\t" + red + "\n" +
+                "yellow\t" + yellow + "\n",
+                actual
+            );
         }
     }
 
     /**
-     * Gets API mode combinations to be tested.
-     * Each boolean[] is { newMapper, newCombiner, newReducer } flag triplet.
+     * Gets if to compress output data with Snappy.
+     *
+     * @return If to compress output data with Snappy.
+     */
+    protected boolean compressOutputSnappy() {
+        return false;
+    }
+
+    /**
+     * Simple test job statistics.
      *
-     * @return Arrays of booleans indicating API combinations to test.
+     * @param jobId Job id.
+     * @throws IgniteCheckedException If failed.
      */
-    protected boolean[][] getApiModes() {
-        return new boolean[][] {
-            { false, false, false },
-            { false, false, true },
-            { false, true,  false },
-            { true,  false, false },
-            { true,  true,  true },
-        };
+    private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException {
+        HadoopCounters cntrs = grid(0).hadoop().counters(jobId);
+
+        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
+        // Per task: phase order index -> value recorded for that phase's event.
+        Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>();
+        // Expected relative order of the phases within one task.
+        Map<String, Integer> phaseOrders = new HashMap<>();
+        phaseOrders.put("submit", 0);
+        phaseOrders.put("prepare", 1);
+        phaseOrders.put("start", 2);
+        phaseOrders.put("Cstart", 3);
+        phaseOrders.put("finish", 4);
+
+        String prevTaskId = null;
+
+        long apiEvtCnt = 0;
+
+        for (T2<String, Long> evt : perfCntr.evts()) {
+            // Expected event name pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706
+            String[] parsedEvt = evt.get1().split(" ");
+
+            String taskId;
+            String taskPhase;
+            // JOB events carry no task number; COMBINE events are folded into the MAP task with the same number.
+            if ("JOB".equals(parsedEvt[0])) {
+                taskId = parsedEvt[0];
+                taskPhase = parsedEvt[1];
+            }
+            else {
+                taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1];
+                taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2];
+            }
+
+            if (!taskId.equals(prevTaskId))
+                tasks.put(taskId, new TreeMap<Integer,Long>());
+
+            Integer pos = phaseOrders.get(taskPhase);
+
+            assertNotNull("Invalid phase " + taskPhase, pos);
+
+            tasks.get(taskId).put(pos, evt.get2());
+
+            prevTaskId = taskId;
+
+            apiEvtCnt++;
+        }
+
+        for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) {
+            Map<Integer, Long> order = task.getValue();
+
+            long prev = 0;
+            // Values must be non-decreasing when walked in phase order.
+            for (Map.Entry<Integer, Long> phase : order.entrySet()) {
+                assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev);
+
+                prev = phase.getValue();
+            }
+        }
+
+        final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
+        // Use assertTrue rather than the assert keyword so that the wait is not skipped when -ea is off.
+        assertTrue("Statistics file not found: " + statPath, GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return igfs.exists(statPath);
+            }
+        }, 20_000));
+
+        final long apiEvtCnt0 = apiEvtCnt;
+
+        boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) {
+                        return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader);
+                    }
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }, 10000);
+
+        if (!res) {
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) {
+                fail("Invalid API events count [exp=" + apiEvtCnt0 +
+                    ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']');
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
+
+        super.beforeTest();
+    }
+
+    /**
+     * Start grid with IGFS.
+     *
+     * @param gridName Grid name.
+     * @param igfsName IGFS name.
+     * @param mode IGFS mode.
+     * @param secondaryFs Secondary file system (optional).
+     * @param restCfg Rest configuration string (optional).
+     * @return Started grid instance.
+     * @throws Exception If failed.
+     */
+    protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+        @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+        igfsCfg.setDataCacheName("dataCache");
+        igfsCfg.setMetaCacheName("metaCache");
+        igfsCfg.setName(igfsName);
+        igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+        igfsCfg.setDefaultMode(mode);
+        igfsCfg.setIpcEndpointConfiguration(restCfg);
+        igfsCfg.setSecondaryFileSystem(secondaryFs);
+        igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+        igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+
+        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setNearConfiguration(null);
+        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+        dataCacheCfg.setBackups(0);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCacheCfg.setOffHeapMaxMemory(0);
+
+        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+        cfg.setFileSystemConfiguration(igfsCfg);
+
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setConnectorConfiguration(null);
+
+        HadoopConfiguration hadoopCfg = createHadoopConfiguration();
+
+        if (hadoopCfg != null)
+            cfg.setHadoopConfiguration(hadoopCfg);
+
+        return G.start(cfg);
+    }
+
+    /**
+     * Creates custom Hadoop configuration.
+     *
+     * @return The Hadoop configuration.
+     */
+    protected HadoopConfiguration createHadoopConfiguration() {
+        return null;
+    }
+
+    /**
+     * @return IGFS configuration.
+     */
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
+        FileSystemConfiguration fsCfg = super.igfsConfiguration();
+
+        secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+        fsCfg.setSecondaryFileSystem(secondaryFs);
+
+        return fsCfg;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
new file mode 100644
index 0000000..88d0f80
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * Mock job for planner tests.
+ */
+public class HadoopPlannerMockJob implements HadoopJob {
+    /** Input splits. */
+    private final Collection<HadoopInputSplit> splits;
+
+    /** Reducers count. */
+    private final int reducers;
+
+    /**
+     * Constructor.
+     *
+     * @param splits Input splits.
+     * @param reducers Reducers.
+     */
+    public HadoopPlannerMockJob(Collection<HadoopInputSplit> splits, int reducers) {
+        this.splits = splits;
+        this.reducers = reducers;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
+        return splits;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobInfo info() {
+        return new JobInfo(reducers);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobId id() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void dispose(boolean external) throws IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupStagingDirectory() {
+        throwUnsupported();
+    }
+
+    /**
+     * Throw {@link UnsupportedOperationException}.
+     */
+    private static void throwUnsupported() {
+        throw new UnsupportedOperationException("Should not be called!");
+    }
+
+    /**
+     * Mocked job info.
+     */
+    private static class JobInfo implements HadoopJobInfo {
+        /** Reducers. */
+        private final int reducers;
+
+        /**
+         * Constructor.
+         *
+         * @param reducers Reducers.
+         */
+        public JobInfo(int reducers) {
+            this.reducers = reducers;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int reducers() {
+            return reducers;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String property(String name) {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasCombiner() {
+            throwUnsupported();
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasReducer() {
+            throwUnsupported();
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
+            @Nullable String[] libNames) throws IgniteCheckedException {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String jobName() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String user() {
+            throwUnsupported();
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java
new file mode 100644
index 0000000..4e7cc50
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
+import org.apache.ignite.internal.processors.igfs.IgfsMock;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Tests for weighted map-reduce planner.
+ */
+public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest {
+    /** ID 1. */
+    private static final UUID ID_1 = new UUID(0, 1);
+
+    /** ID 2. */
+    private static final UUID ID_2 = new UUID(0, 2);
+
+    /** ID 3. */
+    private static final UUID ID_3 = new UUID(0, 3);
+
+    /** MAC 1. */
+    private static final String MAC_1 = "mac1";
+
+    /** MAC 2. */
+    private static final String MAC_2 = "mac2";
+
+    /** MAC 3. */
+    private static final String MAC_3 = "mac3";
+
+    /** Host 1. */
+    private static final String HOST_1 = "host1";
+
+    /** Host 2. */
+    private static final String HOST_2 = "host2";
+
+    /** Host 3. */
+    private static final String HOST_3 = "host3";
+
+    /** Host 4. */
+    private static final String HOST_4 = "host4";
+
+    /** Host 5. */
+    private static final String HOST_5 = "host5";
+
+    /** Standard node 1. */
+    private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1);
+
+    /** Standard node 2. */
+    private static final MockNode NODE_2 = new MockNode(ID_2, MAC_2, HOST_2);
+
+    /** Standard node 3. */
+    private static final MockNode NODE_3 = new MockNode(ID_3, MAC_3, HOST_3);
+
+    /** Standard nodes. */
+    private static final Collection<ClusterNode> NODES;
+
+    /**
+     * Static initializer.
+     */
+    static {
+        NODES = new ArrayList<>();
+
+        NODES.add(NODE_1);
+        NODES.add(NODE_2);
+        NODES.add(NODE_3);
+    }
+
+    /**
+     * Test that a single IGFS split whose range [0, 50) is mapped by the mocked IGFS to node 1 gets its mapper there.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOneIgfsSplitAffinity() throws Exception {
+        IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+        List<HadoopInputSplit> splits = new ArrayList<>();
+
+        splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("igfs://igfs@/file"), 0, 50));
+
+        final int expReducers = 4;
+
+        HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
+
+        IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+        HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+        assertEquals(1, plan.mappers());
+        assertEquals(1, plan.mapperNodeIds().size());
+        assertTrue(plan.mapperNodeIds().contains(ID_1));
+
+        checkPlanMappers(plan, splits, NODES, false/*only 1 split*/);
+        checkPlanReducers(plan, NODES, expReducers, false/* because of threshold behavior.*/);
+    }
+
+    /**
+     * Test HDFS splits, two of which reside on hosts that have no node in the topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHdfsSplitsAffinity() throws Exception {
+        IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+        final List<HadoopInputSplit> splits = new ArrayList<>();
+
+        splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50));
+        splits.add(new HadoopFileBlock(new String[] { HOST_2 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100));
+        splits.add(new HadoopFileBlock(new String[] { HOST_3 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37));
+
+        // The following splits belong to hosts that are out of Ignite topology at all.
+        // This means that these splits should be assigned to any least loaded nodes:
+        splits.add(new HadoopFileBlock(new String[] { HOST_4 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2));
+        splits.add(new HadoopFileBlock(new String[] { HOST_5 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3));
+
+        final int expReducers = 7;
+
+        HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
+
+        IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+        final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+        checkPlanMappers(plan, splits, NODES, true);
+
+        checkPlanReducers(plan, NODES, expReducers, true);
+    }
+
+    /**
+     * Test HDFS splits with Replication == 3.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHdfsSplitsReplication() throws Exception {
+        IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+        final List<HadoopInputSplit> splits = new ArrayList<>();
+
+        splits.add(new HadoopFileBlock(new String[] { HOST_1, HOST_2, HOST_3 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50));
+        splits.add(new HadoopFileBlock(new String[] { HOST_2, HOST_3, HOST_4 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100));
+        splits.add(new HadoopFileBlock(new String[] { HOST_3, HOST_4, HOST_5 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37));
+        // The first host of each of the following splits (HOST_4, HOST_5) has no node in the test topology,
+        // but every split still lists at least one replica host that does (HOST_1 and/or HOST_2):
+        splits.add(new HadoopFileBlock(new String[] { HOST_4, HOST_5, HOST_1 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2));
+        splits.add(new HadoopFileBlock(new String[] { HOST_5, HOST_1, HOST_2 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3));
+
+        final int expReducers = 8;
+
+        HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
+
+        IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+        final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+        checkPlanMappers(plan, splits, NODES, true);
+
+        checkPlanReducers(plan, NODES, expReducers, true);
+    }
+
+    /**
+     * Collect the IDs of the given nodes.
+     *
+     * @param nodes Nodes.
+     * @return Set of node IDs.
+     */
+    private static Set<UUID> allIds(Collection<ClusterNode> nodes) {
+        Set<UUID> ids = new HashSet<>();
+
+        for (ClusterNode node : nodes)
+            ids.add(node.id());
+
+        return ids;
+    }
+
+    /**
+     * Check mappers for the plan.
+     *
+     * @param plan Plan.
+     * @param splits Splits.
+     * @param nodes Nodes.
+     * @param expectUniformity Whether every node is expected to receive at least one mapper.
+     */
+    private static void checkPlanMappers(HadoopMapReducePlan plan, List<HadoopInputSplit> splits,
+        Collection<ClusterNode> nodes, boolean expectUniformity) {
+        // Number of mappers should correspond to the number of input splits:
+        assertEquals(splits.size(), plan.mappers());
+
+        if (expectUniformity) {
+            // mappers are assigned to all available nodes:
+            assertEquals(nodes.size(), plan.mapperNodeIds().size());
+
+            // ... and the mapper node IDs are exactly the IDs of the given nodes:
+            assertEquals(allIds(nodes), plan.mapperNodeIds());
+        }
+
+        // Check all splits are covered by mappers:
+        Set<HadoopInputSplit> set = new HashSet<>();
+
+        for (UUID id: plan.mapperNodeIds()) {
+            Collection<HadoopInputSplit> sp = plan.mappers(id);
+
+            assertNotNull(sp);
+
+            for (HadoopInputSplit s: sp)
+                assertTrue(set.add(s));
+        }
+
+        // must be of the same size & contain same elements:
+        assertEquals(new HashSet<>(splits), set);
+    }
+
+    /**
+     * Check plan reducers.
+     *
+     * @param plan Plan.
+     * @param nodes Nodes.
+     * @param expReducers Expected total number of reducers.
+     * @param expectUniformity Whether every node is expected to receive at least one reducer.
+     */
+    private static void checkPlanReducers(HadoopMapReducePlan plan,
+        Collection<ClusterNode> nodes, int expReducers, boolean expectUniformity) {
+        // Total reducer count reported by the plan must match the requested one:
+        assertEquals(expReducers, plan.reducers());
+
+        if (expectUniformity)
+            assertEquals(allIds(nodes), plan.reducerNodeIds());
+
+        int sum = 0;
+        int lenSum = 0;
+
+        for (UUID uuid: plan.reducerNodeIds()) {
+            int[] rr = plan.reducers(uuid);
+
+            assertNotNull(rr);
+
+            lenSum += rr.length;
+
+            for (int i: rr)
+                sum += i;
+        }
+
+        assertEquals(expReducers, lenSum);
+
+        // Numbers in the arrays must be consecutive integers starting from 0,
+        // check that simply calculating their total sum:
+        assertEquals((lenSum * (lenSum - 1) / 2), sum);
+    }
+
+    /**
+     * Create a weighted planner bound to the given IGFS.
+     *
+     * @param igfs IGFS.
+     * @return Planner.
+     */
+    private static IgniteHadoopWeightedMapReducePlanner createPlanner(IgfsMock igfs) {
+        IgniteHadoopWeightedMapReducePlanner res = new IgniteHadoopWeightedMapReducePlanner();
+
+        // Point the "ignite" field declared in HadoopAbstractMapReducePlanner at a mock wrapping the IGFS.
+        GridTestUtils.setFieldValue(res, HadoopAbstractMapReducePlanner.class, "ignite",
+            new IgfsIgniteMock(null, igfs));
+
+        return res;
+    }
+
+    /**
+     * Throw {@link UnsupportedOperationException}.
+     */
+    private static void throwUnsupported() {
+        throw new UnsupportedOperationException("Should not be called!");
+    }
+
+    /**
+     * Mocked node.
+     */
+    private static class MockNode implements ClusterNode {
+        /** ID. */
+        private final UUID id;
+
+        /** MAC addresses. */
+        private final String macs;
+
+        /** Addresses. */
+        private final List<String> addrs;
+
+        /**
+         * Constructor.
+         *
+         * @param id Node ID.
+         * @param macs MAC addresses.
+         * @param addrs Addresses.
+         */
+        public MockNode(UUID id, String macs, String... addrs) {
+            assert addrs != null;
+
+            this.id = id;
+            this.macs = macs;
+
+            this.addrs = Arrays.asList(addrs);
+        }
+
+        /** {@inheritDoc} */
+        @Override public UUID id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Nullable @Override public <T> T attribute(String name) {
+            if (F.eq(name, IgniteNodeAttributes.ATTR_MACS))
+                return (T)macs;
+
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> addresses() {
+            return addrs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object consistentId() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterMetrics metrics() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<String, Object> attributes() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> hostNames() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long order() {
+            throwUnsupported();
+
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteProductVersion version() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isLocal() {
+            throwUnsupported();
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isDaemon() {
+            throwUnsupported();
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isClient() {
+            throwUnsupported();
+
+            return false;
+        }
+    }
+
+    /**
+     * Locations builder.
+     */
+    private static class LocationsBuilder {
+        /** Locations. */
+        private final TreeMap<Long, Collection<MockNode>> locs = new TreeMap<>();
+
+        /**
+         * Create new locations builder.
+         *
+         * @return Locations builder.
+         */
+        public static LocationsBuilder create() {
+            return new LocationsBuilder();
+        }
+
+        /**
+         * Add locations.
+         *
+         * @param start Start.
+         * @param nodes Nodes.
+         * @return This builder for chaining.
+         */
+        public LocationsBuilder add(long start, MockNode... nodes) {
+            locs.put(start, Arrays.asList(nodes));
+
+            return this;
+        }
+
+        /**
+         * Build locations.
+         *
+         * @return Locations.
+         */
+        public TreeMap<Long, Collection<MockNode>> build() {
+            return locs;
+        }
+
+        /**
+         * Build IGFS.
+         *
+         * @return IGFS.
+         */
+        public MockIgfs buildIgfs() {
+            return new MockIgfs(build());
+        }
+    }
+
+    /**
+     * Mocked IGFS.
+     */
+    private static class MockIgfs extends IgfsMock {
+        /** Block locations. */
+        private final TreeMap<Long, Collection<MockNode>> locs;
+
+        /**
+         * Constructor.
+         *
+         * @param locs Block locations.
+         */
+        public MockIgfs(TreeMap<Long, Collection<MockNode>> locs) {
+            super("igfs");
+
+            this.locs = locs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
+            // Splits [start, start + len) along the block boundaries registered in 'locs'.
+            Collection<IgfsBlockLocation> res = new ArrayList<>();
+
+            long cur = start;
+            long remaining = len;
+
+            // Nodes of the most recently visited block, i.e. the owners of position 'cur'.
+            Collection<MockNode> prevLocNodes = null;
+
+            for (Map.Entry<Long, Collection<MockNode>> locEntry : locs.entrySet()) {
+                long locStart = locEntry.getKey();
+
+                if (prevLocNodes != null && cur < locStart) {
+                    // Emit the part owned by the previous block. Its length is measured from
+                    // the current position (not from the previous block start) and is capped
+                    // by the requested length, so a range that starts inside a block or ends
+                    // before the next boundary cannot over-run or drive 'remaining' negative.
+                    long prevLen = Math.min(locStart - cur, remaining);
+
+                    res.add(new IgfsBlockLocationMock(cur, prevLen, prevLocNodes));
+
+                    cur += prevLen;
+                    remaining -= prevLen;
+                }
+
+                prevLocNodes = locEntry.getValue();
+
+                if (remaining == 0)
+                    break;
+            }
+
+            // Whatever is left lies past the last visited boundary and belongs to that block.
+            if (remaining != 0)
+                res.add(new IgfsBlockLocationMock(cur, remaining, prevLocNodes));
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean exists(IgfsPath path) {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isProxy(URI path) {
+            return false;
+        }
+    }
+
+    /**
+     * Mocked block location.
+     */
+    private static class IgfsBlockLocationMock implements IgfsBlockLocation {
+        /** Start. */
+        private final long start;
+
+        /** Length. */
+        private final long len;
+
+        /** Node IDs. */
+        private final List<UUID> nodeIds;
+
+        /**
+         * Constructor.
+         *
+         * @param start Start.
+         * @param len Length.
+         * @param nodes Nodes.
+         */
+        public IgfsBlockLocationMock(long start, long len, Collection<MockNode> nodes) {
+            this.start = start;
+            this.len = len;
+
+            this.nodeIds = new ArrayList<>(nodes.size());
+
+            for (MockNode node : nodes)
+                nodeIds.add(node.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long start() {
+            return start;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long length() {
+            return len;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<UUID> nodeIds() {
+            return nodeIds;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> names() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> hosts() {
+            throwUnsupported();
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java
new file mode 100644
index 0000000..e0403c2
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
+
+/**
+ * Tests whole map-reduce execution with the weighted planner.
+ */
+public class HadoopWeightedPlannerMapReduceTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override protected HadoopConfiguration createHadoopConfiguration() {
+        // A freshly constructed weighted planner, i.e. one that keeps all of its default settings.
+        IgniteHadoopWeightedMapReducePlanner weightedPlanner = new IgniteHadoopWeightedMapReducePlanner();
+
+        HadoopConfiguration cfg = new HadoopConfiguration();
+
+        cfg.setMapReducePlanner(weightedPlanner);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 7d877ab..6900425 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -28,7 +28,6 @@ import org.apache.ignite.hadoop.cache.HadoopTxConfigCacheTest;
 import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest;
 import org.apache.ignite.hadoop.util.BasicUserNameMapperSelfTest;
 import org.apache.ignite.hadoop.util.ChainedUserNameMapperSelfTest;
-import org.apache.ignite.hadoop.util.KerberosUserNameMapper;
 import org.apache.ignite.hadoop.util.KerberosUserNameMapperSelfTest;
 import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest;
 import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest;
@@ -72,6 +71,8 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTasksV1Test;
 import org.apache.ignite.internal.processors.hadoop.HadoopTasksV2Test;
 import org.apache.ignite.internal.processors.hadoop.HadoopV2JobSelfTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopValidationSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopWeightedMapReducePlannerTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopWeightedPlannerMapReduceTest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest;
@@ -110,6 +111,9 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
 
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopDefaultMapReducePlannerSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopWeightedMapReducePlannerTest.class.getName())));
+
         suite.addTest(new TestSuite(ldr.loadClass(BasicUserNameMapperSelfTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(KerberosUserNameMapperSelfTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(ChainedUserNameMapperSelfTest.class.getName())));
@@ -156,7 +160,6 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(ldr.loadClass(HadoopValidationSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(HadoopDefaultMapReducePlannerSelfTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(HadoopJobTrackerSelfTest.class.getName())));
 
         suite.addTest(new TestSuite(ldr.loadClass(HadoopHashMapSelfTest.class.getName())));
@@ -176,6 +179,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
         suite.addTest(new TestSuite(ldr.loadClass(HadoopTasksV2Test.class.getName())));
 
         suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopWeightedPlannerMapReduceTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(HadoopNoHadoopMapReduceTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceErrorResilienceTest.class.getName())));
 


Mime
View raw message