ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [03/13] incubator-ignite git commit: # IGNITE-386: Moving core classes (6).
Date Tue, 03 Mar 2015 14:14:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
new file mode 100644
index 0000000..1390982
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import com.google.common.base.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Abstract class for tests based on WordCount test job.
+ */
+public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest {
+    /** Input path. */
+    protected static final String PATH_INPUT = "/input";
+
+    /** Output path. */
+    protected static final String PATH_OUTPUT = "/output";
+
+    /** IGFS instance. */
+    protected IgfsEx igfs;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        Configuration cfg = new Configuration();
+
+        // Not defined in this class; presumably inherited from HadoopAbstractSelfTest - verify there.
+        setupFileSystems(cfg);
+
+        // Obtain the local file system up front so that the file system cache is initialized with the
+        // correct LocalFileSystem implementation. The returned instance itself is not used.
+        FileSystem.getLocal(cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /**
+     * Generates test file in IGFS filled with the given words in random order.
+     * <p>
+     * Every word is repeated the requested number of times, the resulting list is shuffled and then
+     * written out as lines of 5 to 9 space-separated words (the last line may be shorter).
+     *
+     * @param path File name.
+     * @param wordCounts Words and counts: alternating {@code String} word and {@code Integer} count.
+     * @throws Exception If failed.
+     */
+    protected void generateTestFile(String path, Object... wordCounts) throws Exception {
+        List<String> wordsArr = new ArrayList<>();
+
+        //Generating
+        for (int i = 0; i < wordCounts.length; i += 2) {
+            String word = (String) wordCounts[i];
+            int cnt = (Integer) wordCounts[i + 1];
+
+            while (cnt-- > 0)
+                wordsArr.add(word);
+        }
+
+        //Shuffling
+        for (int i = 0; i < wordsArr.size(); i++) {
+            int j = (int)(Math.random() * wordsArr.size());
+
+            Collections.swap(wordsArr, i, j);
+        }
+
+        //Input file preparing. Try-with-resources closes the IGFS output stream even if writing fails.
+        try (PrintWriter testInputFileWriter = new PrintWriter(igfs.create(new IgfsPath(path), true))) {
+            int j = 0;
+
+            while (j < wordsArr.size()) {
+                // Random line length in words.
+                int i = 5 + (int)(Math.random() * 5);
+
+                List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size()));
+                j += i;
+
+                testInputFileWriter.println(Joiner.on(' ').join(subList));
+            }
+        }
+    }
+
+    /**
+     * Reads a text file from IGFS line by line and returns its lines sorted in natural order.
+     *
+     * @param fileName Name of the file to read.
+     * @return Sorted lines of the file joined with {@code '\n'} and followed by a trailing {@code '\n'}.
+     * @throws Exception If could not read the file.
+     */
+    protected String readAndSortFile(String fileName) throws Exception {
+        List<String> list = new ArrayList<>();
+
+        // Close the reader (and the underlying IGFS stream) as soon as the file has been consumed.
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName))))) {
+            String line;
+
+            while ((line = reader.readLine()) != null)
+                list.add(line);
+        }
+
+        Collections.sort(list);
+
+        return Joiner.on('\n').join(list) + "\n";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
new file mode 100644
index 0000000..733ed01
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import com.google.common.base.*;
+import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jdk8.backport.*;
+
+import java.io.*;
+import java.nio.file.*;
+import java.util.*;
+
+/**
+ * Test of integration with Hadoop client via command line interface.
+ */
+public class HadoopCommandLineTest extends GridCommonAbstractTest {
+    /** IGFS instance. */
+    private IgfsEx igfs;
+
+    /** */
+    private static final String igfsName = "igfs";
+
+    /** */
+    private static File testWorkDir;
+
+    /** */
+    private static String hadoopHome;
+
+    /** */
+    private static String hiveHome;
+
+    /** */
+    private static File examplesJar;
+
+    /**
+     * Writes a local text file containing the given words in random order.
+     * <p>
+     * Each word is repeated according to its count, the words are shuffled and then written as lines of
+     * 5 to 9 space-separated words (the last line may be shorter).
+     *
+     * @param path File name.
+     * @param wordCounts Words and counts.
+     * @throws Exception If failed.
+     */
+    private void generateTestFile(File path, Object... wordCounts) throws Exception {
+        List<String> words = new ArrayList<>();
+
+        // Expand (word, count) pairs into a flat list.
+        for (int idx = 0; idx < wordCounts.length; idx += 2) {
+            String word = (String)wordCounts[idx];
+            int cnt = (Integer)wordCounts[idx + 1];
+
+            for (int k = 0; k < cnt; k++)
+                words.add(word);
+        }
+
+        int total = words.size();
+
+        // Shuffle: swap every position with a randomly chosen one.
+        for (int pos = 0; pos < total; pos++)
+            Collections.swap(words, pos, (int)(Math.random() * total));
+
+        // Write the words out in chunks of random length.
+        try (PrintWriter writer = new PrintWriter(path)) {
+            for (int from = 0; from < total; ) {
+                int len = 5 + (int)(Math.random() * 5);
+                int to = Math.min(from + len, total);
+
+                writer.println(Joiner.on(' ').join(words.subList(from, to)));
+
+                from += len;
+            }
+
+            writer.flush();
+        }
+    }
+
+    /**
+     * Generates two tab-separated data files in the test working directory to be joined with Hive.
+     * <p>
+     * File {@code data-a} gets 1000 rows {@code (id_a, id_b)} where {@code id_b = 2 * id_a}.
+     * File {@code data-b} gets 2000 rows {@code (id_b, value)} where the value starts at 1002 and
+     * grows by 2 with every row.
+     *
+     * @throws FileNotFoundException If failed.
+     */
+    private void generateHiveTestFiles() throws FileNotFoundException {
+        File fileA = new File(testWorkDir, "data-a");
+        File fileB = new File(testWorkDir, "data-b");
+
+        try (PrintWriter writerA = new PrintWriter(fileA);
+             PrintWriter writerB = new PrintWriter(fileB)) {
+            String sep = "\t";
+
+            int val = 1000;
+
+            for (int idA = 0; idA < 1000; idA++) {
+                // Every row of A references the first of the two rows written to B on this iteration.
+                int idB = 2 * idA;
+
+                writerA.println(idA + sep + idB);
+
+                val += 2;
+
+                writerB.println(idB + sep + val);
+
+                val += 2;
+
+                writerB.println((idB + 1) + sep + val);
+            }
+
+            writerA.flush();
+            writerB.flush();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Resolves {@code HIVE_HOME} and {@code HADOOP_HOME}, locates the Hadoop map-reduce examples jar,
+     * prepares a temporary Hadoop configuration directory and generates the input data files.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        hiveHome = IgniteSystemProperties.getString("HIVE_HOME");
+
+        assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome));
+
+        hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME");
+
+        assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome));
+
+        String mapredHome = hadoopHome + "/share/hadoop/mapreduce";
+
+        File[] fileList = new File(mapredHome).listFiles(new FileFilter() {
+            @Override public boolean accept(File pathname) {
+                return pathname.getName().startsWith("hadoop-mapreduce-examples-") &&
+                    pathname.getName().endsWith(".jar");
+            }
+        });
+
+        // File.listFiles() returns null if the directory does not exist or cannot be read:
+        // fail with a clear message instead of NullPointerException.
+        assertNotNull("Invalid hadoop distribution (directory not found): " + mapredHome, fileList);
+
+        assertEquals("Invalid hadoop distribution.", 1, fileList.length);
+
+        examplesJar = fileList[0];
+
+        testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile();
+
+        U.copy(U.resolveIgnitePath("docs/core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false);
+
+        File srcFile = U.resolveIgnitePath("docs/mapred-site.ignite.xml");
+        File dstFile = new File(testWorkDir, "mapred-site.xml");
+
+        // Copy mapred-site.xml injecting the counter writer property right before the closing tag.
+        try (BufferedReader in = new BufferedReader(new FileReader(srcFile));
+             PrintWriter out = new PrintWriter(dstFile)) {
+            String line;
+
+            while ((line = in.readLine()) != null) {
+                if (line.startsWith("</configuration>"))
+                    out.println(
+                        "    <property>\n" +
+                        "        <name>" + HadoopUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" +
+                        "        <value>" + IgniteHadoopFileSystemCounterWriter.class.getName() + "</value>\n" +
+                        "    </property>\n");
+
+                out.println(line);
+            }
+
+            out.flush();
+        }
+
+        generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50);
+
+        generateHiveTestFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        U.delete(testWorkDir);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /**
+     * Creates the process builder with appropriate environment to run Hadoop CLI.
+     * <p>
+     * Sets {@code HADOOP_HOME}, {@code HADOOP_CLASSPATH} and {@code HADOOP_CONF_DIR} and merges the error
+     * stream of the future process into its standard output.
+     *
+     * @return Process builder.
+     */
+    private ProcessBuilder createProcessBuilder() {
+        // Use the platform path separator instead of a hard-coded ':' when composing the class path.
+        String sep = File.pathSeparator;
+
+        String ggClsPath = codeSourcePath(HadoopJob.class) + sep +
+            codeSourcePath(HadoopJobTracker.class) + sep +
+            codeSourcePath(ConcurrentHashMap8.class);
+
+        ProcessBuilder res = new ProcessBuilder();
+
+        res.environment().put("HADOOP_HOME", hadoopHome);
+        res.environment().put("HADOOP_CLASSPATH", ggClsPath);
+        res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath());
+
+        res.redirectErrorStream(true);
+
+        return res;
+    }
+
+    /**
+     * @param cls Class.
+     * @return Path of the code source location the class was loaded from.
+     */
+    private static String codeSourcePath(Class<?> cls) {
+        return cls.getProtectionDomain().getCodeSource().getLocation().getPath();
+    }
+
+    /**
+     * Waits for process exit and prints its output to the log.
+     *
+     * @param proc Process.
+     * @return Exit code.
+     * @throws Exception If failed.
+     */
+    private int watchProcess(Process proc) throws Exception {
+        // The reader is closed once the output has been drained and the process has exited.
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
+            String line;
+
+            while ((line = reader.readLine()) != null)
+                log().info(line);
+
+            return proc.waitFor();
+        }
+    }
+
+    /**
+     * Runs {@code bin/hadoop} from the configured Hadoop home with the given arguments and waits
+     * for its completion.
+     *
+     * @param args Arguments for Hadoop command line tool.
+     * @return Process exit code.
+     * @throws Exception If failed.
+     */
+    private int executeHadoopCmd(String... args) throws Exception {
+        List<String> cmdLine = new ArrayList<>();
+
+        cmdLine.add(hadoopHome + "/bin/hadoop");
+
+        Collections.addAll(cmdLine, args);
+
+        ProcessBuilder builder = createProcessBuilder();
+
+        builder.command(cmdLine);
+
+        log().info("Execute: " + builder.command());
+
+        Process proc = builder.start();
+
+        return watchProcess(proc);
+    }
+
+    /**
+     * Executes Hive query via {@code bin/hive -e} using a Derby metastore located in the test
+     * working directory.
+     *
+     * @param qry Query.
+     * @return Process exit code.
+     * @throws Exception If failed.
+     */
+    private int executeHiveQuery(String qry) throws Exception {
+        ProcessBuilder procBuilder = createProcessBuilder();
+
+        List<String> cmd = new ArrayList<>();
+
+        cmd.add(hiveHome + "/bin/hive");
+
+        cmd.add("--hiveconf");
+        cmd.add("hive.rpc.query.plan=true");
+
+        cmd.add("--hiveconf");
+        cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" +
+            "databaseName=metastore_db;create=true");
+
+        cmd.add("-e");
+        cmd.add(qry);
+
+        // The command is set once, after it has been fully assembled.
+        procBuilder.command(cmd);
+
+        log().info("Execute: " + procBuilder.command());
+
+        return watchProcess(procBuilder.start());
+    }
+
+    /**
+     * Tests Hadoop command line integration: uploads the generated input via {@code hadoop fs}, runs the
+     * word count example, then checks the job statistics file and the job output.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHadoopCommandLine() throws Exception {
+        assertEquals(0, executeHadoopCmd("fs", "-ls", "/"));
+
+        assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input"));
+
+        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input"));
+
+        assertTrue(igfs.exists(new IgfsPath("/input/test-data")));
+
+        assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output"));
+
+        IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/");
+
+        assertTrue(igfs.exists(path));
+
+        IgfsPath jobStatPath = null;
+
+        // At most one entry is allowed under the user directory.
+        for (IgfsPath jobPath : igfs.listPaths(path)) {
+            assertNull(jobStatPath);
+
+            jobStatPath = jobPath;
+        }
+
+        // Fail with a clear message instead of NullPointerException if nothing was found.
+        assertNotNull("Job statistics directory not found in " + path, jobStatPath);
+
+        File locStatFile = new File(testWorkDir, "performance");
+
+        assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString()));
+
+        long evtCnt;
+
+        try (BufferedReader statReader = new BufferedReader(new FileReader(locStatFile))) {
+            evtCnt = GridHadoopTestUtils.simpleCheckJobStatFile(statReader);
+        }
+
+        assertTrue(evtCnt >= 22); //It's the minimum amount of events for job with combiner.
+
+        assertTrue(igfs.exists(new IgfsPath("/output")));
+
+        List<String> res = new ArrayList<>();
+
+        IgfsPath resPath = new IgfsPath("/output/part-r-00000");
+
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(igfs.open(resPath)))) {
+            String line;
+
+            while ((line = in.readLine()) != null)
+                res.add(line);
+        }
+
+        Collections.sort(res);
+
+        assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString());
+    }
+
+    /**
+     * Runs the query storing its output into table {@code result} and compares the content of
+     * the produced file {@code /result/000000_0} with the expected one.
+     *
+     * @param expRes Expected result.
+     * @param qry Query.
+     * @throws Exception If failed.
+     */
+    private void checkQuery(String expRes, String qry) throws Exception {
+        assertEquals(0, executeHiveQuery("drop table if exists result"));
+
+        assertEquals(0, executeHiveQuery(
+            "create table result " +
+            "row format delimited fields terminated by ' ' " +
+            "stored as textfile " +
+            "location '/result' as " + qry
+        ));
+
+        try (IgfsInputStreamAdapter in = igfs.open(new IgfsPath("/result/000000_0"))) {
+            byte[] buf = new byte[(int) in.length()];
+
+            int off = 0;
+
+            // A single read() call is allowed to return fewer bytes than requested, so read in a loop.
+            while (off < buf.length) {
+                int cnt = in.read(buf, off, buf.length - off);
+
+                if (cnt < 0)
+                    break;
+
+                off += cnt;
+            }
+
+            assertEquals(expRes, new String(buf, 0, off));
+        }
+    }
+
+    /**
+     * Tests Hive integration: creates two external text tables, uploads the files produced by
+     * {@code generateHiveTestFiles()} into them and checks results of select, count and join queries.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHiveCommandLine() throws Exception {
+        // NOTE(review): the concatenated DDL has no space between '\t' and "stored" - confirm Hive accepts it.
+        assertEquals(0, executeHiveQuery(
+            "create table table_a (" +
+                "id_a int," +
+                "id_b int" +
+            ") " +
+            "row format delimited fields terminated by '\\t'" +
+            "stored as textfile " +
+            "location '/table-a'"
+        ));
+
+        // Upload the data file into the table location.
+        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a"));
+
+        assertEquals(0, executeHiveQuery(
+            "create table table_b (" +
+                "id_b int," +
+                "rndv int" +
+            ") " +
+            "row format delimited fields terminated by '\\t'" +
+            "stored as textfile " +
+            "location '/table-b'"
+        ));
+
+        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b"));
+
+        // First ten rows of table_a: id_b is twice as large as id_a.
+        checkQuery(
+            "0 0\n" +
+            "1 2\n" +
+            "2 4\n" +
+            "3 6\n" +
+            "4 8\n" +
+            "5 10\n" +
+            "6 12\n" +
+            "7 14\n" +
+            "8 16\n" +
+            "9 18\n",
+            "select * from table_a order by id_a limit 10"
+        );
+
+        // Two rows are written to data-b on each of the 1000 generator iterations.
+        checkQuery("2000\n", "select count(id_b) from table_b");
+
+        // Join of both tables restricted to rows of table_b with value greater than 2000.
+        checkQuery(
+            "250 500 2002\n" +
+            "251 502 2006\n" +
+            "252 504 2010\n" +
+            "253 506 2014\n" +
+            "254 508 2018\n" +
+            "255 510 2022\n" +
+            "256 512 2026\n" +
+            "257 514 2030\n" +
+            "258 516 2034\n" +
+            "259 518 2038\n",
+            "select a.id_a, a.id_b, b.rndv" +
+            " from table_a a" +
+            " inner join table_b b on a.id_b = b.id_b" +
+            " where b.rndv > 2000" +
+            " order by a.id_a limit 10"
+        );
+
+        // Every row of table_a matches exactly one row of table_b.
+        checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index 2e2b5cb..e072592 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -39,7 +39,7 @@ import java.util.*;
 /**
  *
  */
-public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSelfTest {
+public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTest {
     /** */
     private static final UUID ID_1 = new UUID(0, 1);
 
@@ -104,9 +104,9 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
      * @throws IgniteCheckedException If failed.
      */
     public void testIgfsOneBlockPerNode() throws IgniteCheckedException {
-        GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1);
-        GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2);
-        GridHadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3);
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2);
+        HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3);
 
         mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1));
         mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2));
@@ -164,9 +164,9 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
      * @throws IgniteCheckedException If failed.
      */
     public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException {
-        GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1);
-        GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2);
-        GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3);
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3);
 
         plan(1, split1);
         assert ensureMappers(ID_1, split1);
@@ -220,9 +220,9 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
      * @throws IgniteCheckedException If failed.
      */
     public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
-        GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2);
-        GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2);
-        GridHadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3);
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3);
 
         mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2));
         mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2));
@@ -266,9 +266,9 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
      * @throws IgniteCheckedException If failed.
      */
     public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
-        GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2);
-        GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2);
-        GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3);
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3);
 
         plan(1, split1);
         assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
@@ -308,8 +308,8 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
      * @throws IgniteCheckedException If failed.
      */
     public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException {
-        GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3);
-        GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3);
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3);
 
         mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3));
         mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3));
@@ -344,9 +344,9 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
      * @throws IgniteCheckedException If failed.
      */
     public void testNonIgfsOrphans() throws IgniteCheckedException {
-        GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2);
-        GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3);
-        GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3);
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3);
 
         plan(1, split1);
         assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
@@ -400,11 +400,11 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
      * @return Plan.
      * @throws IgniteCheckedException If failed.
      */
-    private static GridHadoopMapReducePlan plan(int reducers, GridHadoopInputSplit... splits) throws IgniteCheckedException {
+    private static GridHadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException {
         assert reducers > 0;
         assert splits != null && splits.length > 0;
 
-        Collection<GridHadoopInputSplit> splitList = new ArrayList<>(splits.length);
+        Collection<HadoopInputSplit> splitList = new ArrayList<>(splits.length);
 
         Collections.addAll(splitList, splits);
 
@@ -436,12 +436,12 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
      * @param expSplits Expected splits.
      * @return {@code True} if this assumption is valid.
      */
-    private static boolean ensureMappers(UUID nodeId, GridHadoopInputSplit... expSplits) {
-        Collection<GridHadoopInputSplit> expSplitsCol = new ArrayList<>();
+    private static boolean ensureMappers(UUID nodeId, HadoopInputSplit... expSplits) {
+        Collection<HadoopInputSplit> expSplitsCol = new ArrayList<>();
 
         Collections.addAll(expSplitsCol, expSplits);
 
-        Collection<GridHadoopInputSplit> splits = PLAN.get().mappers(nodeId);
+        Collection<HadoopInputSplit> splits = PLAN.get().mappers(nodeId);
 
         return F.eq(expSplitsCol, splits);
     }
@@ -479,10 +479,10 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
      * @param hosts Hosts.
      * @return Split.
      */
-    private static GridHadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) {
+    private static HadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) {
         URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file);
 
-        return new GridHadoopFileBlock(hosts, uri, start, len);
+        return new HadoopFileBlock(hosts, uri, start, len);
     }
 
     /**
@@ -586,12 +586,12 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
     /**
      * Mocked job.
      */
-    private static class MockJob implements GridHadoopJob {
+    private static class MockJob implements HadoopJob {
         /** Reducers count. */
         private final int reducers;
 
         /** */
-        private Collection<GridHadoopInputSplit> splitList;
+        private Collection<HadoopInputSplit> splitList;
 
         /**
          * Constructor.
@@ -599,7 +599,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
          * @param reducers Reducers count.
          * @param splitList Splits.
          */
-        private MockJob(int reducers, Collection<GridHadoopInputSplit> splitList) {
+        private MockJob(int reducers, Collection<HadoopInputSplit> splitList) {
             this.reducers = reducers;
             this.splitList = splitList;
         }
@@ -619,12 +619,12 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
         }
 
         /** {@inheritDoc} */
-        @Override public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException {
+        @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
             return splitList;
         }
 
         /** {@inheritDoc} */
-        @Override public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException {
+        @Override public HadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException {
             return null;
         }
 
@@ -936,7 +936,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel
         }
 
         /** {@inheritDoc} */
-        @Override public GridHadoop hadoop() {
+        @Override public Hadoop hadoop() {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
new file mode 100644
index 0000000..8cf31a2
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.testframework.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Test file systems for the working directory multi-threading support.
+ */
+public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
+    private static final int THREAD_COUNT = 3;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+
+    /**
+     * Verifies that every thread sees its own working directory for the file system with the given URI.
+     *
+     * @param uri Base URI of the file system (scheme and authority).
+     * @throws Exception If failed or if the worker threads do not finish in time.
+     */
+    private void testFileSystem(final URI uri) throws Exception {
+        final Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP,
+            new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString());
+
+        final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT);
+        final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT);
+        final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT);
+        final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT);
+
+        final Path[] newUserInitWorkDir = new Path[THREAD_COUNT];
+        final Path[] newWorkDir = new Path[THREAD_COUNT];
+        final Path[] newAbsWorkDir = new Path[THREAD_COUNT];
+        final Path[] newInstanceWorkDir = new Path[THREAD_COUNT];
+
+        final AtomicInteger threadNum = new AtomicInteger(0);
+
+        GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    int curThreadNum = threadNum.getAndIncrement();
+
+                    FileSystem fs = FileSystem.get(uri, cfg);
+
+                    HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum);
+
+                    if ("file".equals(uri.getScheme()))
+                        FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum));
+
+                    // Barrier: continue only after every thread has set its user (and local working directory).
+                    changeUserPhase.countDown();
+                    changeUserPhase.await();
+
+                    newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
+
+                    FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum));
+
+                    changeDirPhase.countDown();
+                    changeDirPhase.await();
+
+                    newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
+
+                    FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum));
+
+                    changeAbsDirPhase.countDown();
+                    changeAbsDirPhase.await();
+
+                    newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
+
+                    newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory();
+
+                    finishPhase.countDown();
+                }
+                catch (InterruptedException | IOException e) {
+                    error("Failed to execute test thread.", e);
+
+                    fail();
+                }
+            }
+        }, THREAD_COUNT, "filesystems-test");
+
+        // A failed worker never counts down, so bound the wait instead of blocking forever.
+        assertTrue("Test threads did not finish in time.", finishPhase.await(30, TimeUnit.SECONDS));
+
+        for (int i = 0; i < THREAD_COUNT; i++) {
+            cfg.set(MRJobConfig.USER_NAME, "user" + i);
+
+            Path workDir = new Path(new Path(uri), "user/user" + i);
+
+            cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString());
+
+            assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory());
+
+            assertEquals(workDir, newUserInitWorkDir[i]);
+
+            assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]);
+
+            assertEquals(new Path("/folder" + i), newAbsWorkDir[i]);
+
+            assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]);
+        }
+    }
+
+    /**
+     * Test IGFS multi-thread working directory.
+     *
+     * @throws Exception If fails.
+     */
+    public void testIgfs() throws Exception {
+        testFileSystem(URI.create(igfsScheme()));
+    }
+
+    /**
+     * Test HDFS multi-thread working directory.
+     *
+     * @throws Exception If fails.
+     */
+    public void testHdfs() throws Exception {
+        testFileSystem(URI.create("hdfs://localhost/"));
+    }
+
+    /**
+     * Test LocalFS multi-thread working directory.
+     *
+     * @throws Exception If fails.
+     */
+    public void testLocal() throws Exception {
+        testFileSystem(URI.create("file:///"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java
new file mode 100644
index 0000000..a6c29e9
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Grouping test.
+ */
+public class HadoopGroupingTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String PATH_OUTPUT = "/test-out";
+
+    /** IDs handed out by the record readers; each is removed again once a reducer or combiner receives it. */
+    private static final GridConcurrentHashSet<UUID> vals = GridHadoopSharedMap.map(HadoopGroupingTest.class)
+        .put("vals", new GridConcurrentHashSet<UUID>());
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    protected boolean igfsEnabled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
+        GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGroupingReducer() throws Exception {
+        doTestGrouping(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGroupingCombiner() throws Exception {
+        doTestGrouping(true);
+    }
+
+    /**
+     * @param combiner With combiner.
+     * @throws Exception If failed.
+     */
+    public void doTestGrouping(boolean combiner) throws Exception {
+        vals.clear();
+
+        Job job = Job.getInstance();
+
+        job.setInputFormatClass(InFormat.class);
+        job.setOutputFormatClass(OutFormat.class);
+
+        job.setOutputKeyClass(YearTemperature.class);
+        job.setOutputValueClass(Text.class);
+
+        job.setMapperClass(Mapper.class);
+
+        if (combiner) {
+            job.setCombinerClass(MyReducer.class);
+            job.setNumReduceTasks(0);
+            job.setCombinerKeyGroupingComparatorClass(YearComparator.class);
+        }
+        else {
+            job.setReducerClass(MyReducer.class);
+            job.setNumReduceTasks(4);
+            job.setGroupingComparatorClass(YearComparator.class);
+        }
+
+        grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2),
+            createJobInfo(job.getConfiguration())).get(30000);
+
+        assertTrue(vals.isEmpty());
+    }
+
+    public static class MyReducer extends Reducer<YearTemperature, Text, Text, Object> {
+        /** */
+        int lastYear;
+
+        @Override protected void reduce(YearTemperature key, Iterable<Text> vals0, Context context)
+            throws IOException, InterruptedException {
+            X.println("___ : " + context.getTaskAttemptID() + " --> " + key);
+
+            Set<UUID> ids = new HashSet<>();
+
+            for (Text val : vals0)
+                assertTrue(ids.add(UUID.fromString(val.toString())));
+
+            for (Text val : vals0)
+                assertTrue(ids.remove(UUID.fromString(val.toString())));
+
+            assertTrue(ids.isEmpty());
+
+            assertTrue(key.year > lastYear);
+
+            lastYear = key.year;
+
+            for (Text val : vals0)
+                assertTrue(vals.remove(UUID.fromString(val.toString())));
+        }
+    }
+
+    public static class YearComparator implements RawComparator<YearTemperature> { // Grouping comparator.
+        /** {@inheritDoc} */
+        @Override public int compare(YearTemperature o1, YearTemperature o2) {
+            return Integer.compare(o1.year, o2.year);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            throw new IllegalStateException();
+        }
+    }
+
+    public static class YearTemperature implements WritableComparable<YearTemperature>, Cloneable {
+        /** Year; also returned as the hash code. */
+        private int year;
+
+        /** Temperature; compared in descending order within the same year. */
+        private int temperature;
+
+        /** {@inheritDoc} */
+        @Override public void write(DataOutput out) throws IOException {
+            out.writeInt(year);
+            out.writeInt(temperature);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFields(DataInput in) throws IOException {
+            year = in.readInt();
+            temperature = in.readInt();
+        }
+
+        /** Not supported: always throws {@link IllegalStateException}. */
+        @Override public boolean equals(Object o) {
+            throw new IllegalStateException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() { // To be partitioned by year.
+            return year;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(YearTemperature o) {
+            int res = Integer.compare(year, o.year);
+
+            if (res != 0)
+                return res;
+
+            // Within the same year, order by descending temperature so that the maximum comes first.
+            return Integer.compare(o.temperature, temperature);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(YearTemperature.class, this);
+        }
+    }
+
+    public static class InFormat extends InputFormat<YearTemperature, Text> {
+        /** {@inheritDoc} */
+        @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+            ArrayList<InputSplit> list = new ArrayList<>();
+
+            for (int i = 0; i < 10; i++)
+                list.add(new HadoopSortingTest.FakeSplit(20));
+
+            return list;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RecordReader<YearTemperature, Text> createRecordReader(final InputSplit split,
+            TaskAttemptContext context) throws IOException, InterruptedException {
+            return new RecordReader<YearTemperature, Text>() {
+                /** */
+                int cnt;
+
+                /** */
+                Random rnd = new GridRandom();
+
+                /** */
+                YearTemperature key = new YearTemperature();
+
+                /** */
+                Text val = new Text();
+
+                @Override public void initialize(InputSplit split, TaskAttemptContext context) {
+                    // No-op.
+                }
+
+                @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+                    return cnt++ < split.getLength();
+                }
+
+                @Override public YearTemperature getCurrentKey() {
+                    key.year = 1990 + rnd.nextInt(10);
+                    key.temperature = 10 + rnd.nextInt(20);
+
+                    return key;
+                }
+
+                @Override public Text getCurrentValue() {
+                    UUID id = UUID.randomUUID();
+
+                    assertTrue(vals.add(id));
+
+                    val.set(id.toString());
+
+                    return val;
+                }
+
+                @Override public float getProgress() {
+                    return 0;
+                }
+
+                @Override public void close() {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     *
+     */
+    public static class OutFormat extends OutputFormat {
+        /** {@inheritDoc} */
+        @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java
new file mode 100644
index 0000000..ed6d0a0
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Job tracker self test.
+ */
+public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String PATH_OUTPUT = "/test-out";
+
+    /** Number of blocks (input splits) produced by the test input format. */
+    private static final int BLOCK_CNT = 10;
+
+    /** Shared map through which the counters and latches below are registered by name. */
+    private static GridHadoopSharedMap m = GridHadoopSharedMap.map(HadoopJobTrackerSelfTest.class);
+
+    /** Map task execution count. */
+    private static final AtomicInteger mapExecCnt = m.put("mapExecCnt", new AtomicInteger());
+
+    /** Reduce task execution count. */
+    private static final AtomicInteger reduceExecCnt = m.put("reduceExecCnt", new AtomicInteger());
+
+    /** Combine task execution count. */
+    private static final AtomicInteger combineExecCnt = m.put("combineExecCnt", new AtomicInteger());
+
+    /** Latches, keyed by name, that the test map, combine and reduce tasks await before counting their execution. */
+    private static final Map<String, CountDownLatch> latch = m.put("latch", new HashMap<String, CountDownLatch>());
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        latch.put("mapAwaitLatch", new CountDownLatch(1));
+        latch.put("reduceAwaitLatch", new CountDownLatch(1));
+        latch.put("combineAwaitLatch", new CountDownLatch(1));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        mapExecCnt.set(0);
+        combineExecCnt.set(0);
+        reduceExecCnt.set(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
+        GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        cfg.setMapReducePlanner(new GridHadoopTestRoundRobinMrPlanner());
+        cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * Submits a job without a combiner and checks that it completes only after both latches are released.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSimpleTaskSubmit() throws Exception {
+        try {
+            Job job = Job.getInstance();
+            setupFileSystems(job.getConfiguration());
+
+            job.setMapperClass(TestMapper.class);
+            job.setReducerClass(TestReducer.class);
+            job.setInputFormatClass(InFormat.class);
+
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1"));
+
+            GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1);
+
+            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+            checkStatus(jobId, false);
+
+            info("Releasing map latch.");
+
+            latch.get("mapAwaitLatch").countDown();
+
+            checkStatus(jobId, false);
+
+            info("Releasing reduce latch.");
+
+            latch.get("reduceAwaitLatch").countDown();
+
+            checkStatus(jobId, true);
+
+            assertEquals(BLOCK_CNT, mapExecCnt.get());
+            assertEquals(0, combineExecCnt.get());
+            assertEquals(1, reduceExecCnt.get());
+        }
+        finally {
+            // Safety: release every latch so that no task thread stays blocked if the test fails midway.
+            latch.get("mapAwaitLatch").countDown();
+            latch.get("combineAwaitLatch").countDown();
+            latch.get("reduceAwaitLatch").countDown();
+        }
+    }
+
+    /**
+     * Submits a job with a combiner and checks that no reducer runs before the combiner latch is released.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTaskWithCombinerPerMap() throws Exception {
+        try {
+            Job job = Job.getInstance();
+            setupFileSystems(job.getConfiguration());
+
+            job.setMapperClass(TestMapper.class);
+            job.setReducerClass(TestReducer.class);
+            job.setCombinerClass(TestCombiner.class);
+            job.setInputFormatClass(InFormat.class);
+
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2"));
+
+            GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1);
+
+            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+            checkStatus(jobId, false);
+
+            info("Releasing map latch.");
+
+            latch.get("mapAwaitLatch").countDown();
+
+            checkStatus(jobId, false);
+
+            // All maps are completed. We have a combiner, so no reducers should be executed
+            // before combiner latch is released.
+
+            U.sleep(50);
+
+            assertEquals(0, reduceExecCnt.get());
+
+            info("Releasing combiner latch.");
+
+            latch.get("combineAwaitLatch").countDown();
+
+            checkStatus(jobId, false);
+
+            info("Releasing reduce latch.");
+
+            latch.get("reduceAwaitLatch").countDown();
+
+            checkStatus(jobId, true);
+
+            assertEquals(BLOCK_CNT, mapExecCnt.get());
+            assertEquals(BLOCK_CNT, combineExecCnt.get());
+            assertEquals(1, reduceExecCnt.get());
+        }
+        finally {
+            // Safety: release every latch so that no task thread stays blocked if the test fails midway.
+            latch.get("mapAwaitLatch").countDown();
+            latch.get("combineAwaitLatch").countDown();
+            latch.get("reduceAwaitLatch").countDown();
+        }
+    }
+
+    /**
+     * Checks job execution status on every grid node.
+     *
+     * @param jobId Job ID.
+     * @param complete If {@code true}, waits for the finish future; otherwise asserts that it is not done yet.
+     * @throws Exception If failed.
+     */
+    private void checkStatus(GridHadoopJobId jobId, boolean complete) throws Exception {
+        for (int i = 0; i < gridCount(); i++) {
+            IgniteKernal kernal = (IgniteKernal)grid(i);
+
+            Hadoop hadoop = kernal.hadoop();
+
+            GridHadoopJobStatus stat = hadoop.status(jobId);
+
+            assertNotNull("Job status is not available on node " + i, stat);
+
+            IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
+
+            if (!complete)
+                assertFalse(fut.isDone());
+            else {
+                info("Waiting for status future completion on node [idx=" + i + ", nodeId=" +
+                    kernal.getLocalNodeId() + ']');
+
+                fut.get();
+            }
+        }
+    }
+
+    /**
+     * Test input format
+     */
+    public static class InFormat extends InputFormat {
+
+        @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
+            List<InputSplit> res = new ArrayList<>(BLOCK_CNT);
+
+            for (int i = 0; i < BLOCK_CNT; i++)
+                try {
+                    res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"}));
+                }
+                catch (URISyntaxException e) {
+                    throw new IOException(e);
+                }
+
+            return res;
+        }
+
+        @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException {
+            return new RecordReader() {
+                @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
+                }
+
+                @Override public boolean nextKeyValue() {
+                    return false;
+                }
+
+                @Override public Object getCurrentKey() {
+                    return null;
+                }
+
+                @Override public Object getCurrentValue() {
+                    return null;
+                }
+
+                @Override public float getProgress() {
+                    return 0;
+                }
+
+                @Override public void close() {
+
+                }
+            };
+        }
+    }
+
+    /**
+     * Test mapper.
+     */
+    private static class TestMapper extends Mapper {
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+            latch.get("mapAwaitLatch").await();
+
+            mapExecCnt.incrementAndGet();
+
+            System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
+        }
+    }
+
+    /**
+     * Test reducer.
+     */
+    private static class TestReducer extends Reducer {
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+            latch.get("reduceAwaitLatch").await();
+
+            reduceExecCnt.incrementAndGet();
+
+            System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
+        }
+    }
+
+    /**
+     * Test combiner.
+     */
+    private static class TestCombiner extends Reducer {
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+            latch.get("combineAwaitLatch").await();
+
+            combineExecCnt.incrementAndGet();
+
+            System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
new file mode 100644
index 0000000..f86c608
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.examples.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests map-reduce execution with embedded mode.
+ */
+public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
+    /** */
+    private static Map<String, Boolean> flags = GridHadoopSharedMap.map(HadoopMapReduceEmbeddedSelfTest.class)
+        .put("flags", new HashMap<String, Boolean>());
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
+        GridHadoopConfiguration embeddedCfg = super.hadoopConfiguration(gridName);
+
+        // External execution is switched off: this test targets the embedded mode.
+        embeddedCfg.setExternalExecution(false);
+        return embeddedCfg;
+    }
+
+    /**
+     * Runs the word-count job with three reducers once per API version (old, then new) and checks that the custom
+     * serialization, partitioner and I/O formats were configured and that each output part holds the expected keys.
+     * @throws Exception If fails.
+     */
+    public void testMultiReducerWholeMapReduceExecution() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input");
+
+        generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000,
+            "key6", 18000 );
+
+        for (int i = 0; i < 2; i++) {
+            boolean useNewAPI = i == 1;
+
+            igfs.delete(new IgfsPath(PATH_OUTPUT), true);
+
+            flags.put("serializationWasConfigured", false);
+            flags.put("partitionerWasConfigured", false);
+            flags.put("inputFormatWasConfigured", false);
+            flags.put("outputFormatWasConfigured", false);
+
+            JobConf jobConf = new JobConf();
+
+            jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
+
+            // Cap the split size so that the v2 API yields about 6-7 splits.
+            jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
+
+            // Same cap for the v1 API, expressed as the local block size.
+            jobConf.setInt("fs.local.block.size", 65000);
+
+            // File system coordinates.
+            setupFileSystems(jobConf);
+
+            GridHadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI);
+
+            if (!useNewAPI) {
+                jobConf.setPartitionerClass(CustomV1Partitioner.class);
+                jobConf.setInputFormat(CustomV1InputFormat.class);
+                jobConf.setOutputFormat(CustomV1OutputFormat.class);
+            }
+
+            Job job = Job.getInstance(jobConf);
+
+            GridHadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI);
+
+            if (useNewAPI) {
+                job.setPartitionerClass(CustomV2Partitioner.class);
+                job.setInputFormatClass(CustomV2InputFormat.class);
+                job.setOutputFormatClass(CustomV2OutputFormat.class);
+            }
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
+
+            FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+            job.setNumReduceTasks(3);
+
+            job.setJarByClass(GridHadoopWordCount2.class);
+
+            IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1),
+                    createJobInfo(job.getConfiguration()));
+
+            fut.get();
+
+            assertTrue("Serialization was configured (new API is " + useNewAPI + ")",
+                 flags.get("serializationWasConfigured"));
+
+            assertTrue("Partitioner was configured (new API is = " + useNewAPI + ")",
+                 flags.get("partitionerWasConfigured"));
+
+            assertTrue("Input format was configured (new API is = " + useNewAPI + ")",
+                 flags.get("inputFormatWasConfigured"));
+
+            assertTrue("Output format was configured (new API is = " + useNewAPI + ")",
+                 flags.get("outputFormatWasConfigured"));
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key3\t15000\n" +
+                "key6\t18000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000")
+            );
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key1\t10000\n" +
+                "key4\t7000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00001")
+            );
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key2\t20000\n" +
+                "key5\t12000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002")
+            );
+
+        }
+    }
+
+    /**
+     * {@link WritableSerialization} subclass that additionally raises the "serializationWasConfigured" flag.
+     */
+    protected static class CustomSerialization extends WritableSerialization {
+        /** {@inheritDoc} */
+        @Override public void setConf(Configuration conf) {
+            super.setConf(conf);
+            flags.put("serializationWasConfigured", Boolean.TRUE);
+        }
+    }
+
+    /**
+     * v1 API hash partitioner that raises the "partitionerWasConfigured" flag when it is configured.
+     */
+    private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner {
+        /** Raises the flag; the job configuration itself is not used. */
+        @Override public void configure(JobConf job) {
+            flags.put("partitionerWasConfigured", Boolean.TRUE);
+        }
+    }
+
+    /**
+     * v2 API hash partitioner that raises the "partitionerWasConfigured" flag when a configuration is set on it.
+     */
+    private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
+            implements Configurable {
+        /** Raises the flag; the configuration itself is not kept. */
+        @Override public void setConf(Configuration conf) {
+            flags.put("partitionerWasConfigured", Boolean.TRUE);
+        }
+
+        /** @return Always {@code null}, since no configuration is stored. */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * v2 API text input format that raises the "inputFormatWasConfigured" flag when a configuration is set on it.
+     */
+    private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable {
+        /** Raises the flag; the configuration itself is not kept. */
+        @Override public void setConf(Configuration conf) {
+            flags.put("inputFormatWasConfigured", Boolean.TRUE);
+        }
+
+        /** @return Always {@code null}, since no configuration is stored. */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * v2 API text output format that raises the "outputFormatWasConfigured" flag when a configuration is set on it.
+     */
+    private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable {
+        /** Raises the flag; the configuration itself is not kept. */
+        @Override public void setConf(Configuration conf) {
+            flags.put("outputFormatWasConfigured", Boolean.TRUE);
+        }
+
+        /** @return Always {@code null}, since no configuration is stored. */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * v1 API text input format that raises the "inputFormatWasConfigured" flag after the regular configuration.
+     */
+    private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            super.configure(job);
+
+            flags.put("inputFormatWasConfigured", Boolean.TRUE);
+        }
+    }
+
+    /**
+     * v1 API text output format that raises the "outputFormatWasConfigured" flag when it is configured.
+     */
+    private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable {
+        /** Raises the flag; the job configuration itself is not used. */
+        @Override public void configure(JobConf job) {
+            flags.put("outputFormatWasConfigured", Boolean.TRUE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
new file mode 100644
index 0000000..486b856
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Test of whole cycle of map-reduce processing via Job tracker.
+ */
+public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
+    /** @return Grid count for this test ({@code 3}). */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * Tests whole job execution with all phases in all 8 combinations of old and new API mapper, combiner and reducer.
+     * @throws Exception If fails.
+     */
+    public void testWholeMapReduceExecution() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input");
+
+        generateTestFile(inFile.toString(), "red", 100000, "blue", 200000, "green", 150000, "yellow", 70000 );
+
+        for (int i = 0; i < 8; i++) {
+            igfs.delete(new IgfsPath(PATH_OUTPUT), true);
+            // Bits 0, 1 and 2 of i select the mapper, combiner and reducer API; a cleared bit means the new API.
+            boolean useNewMapper = (i & 1) == 0;
+            boolean useNewCombiner = (i & 2) == 0;
+            boolean useNewReducer = (i & 4) == 0;
+
+            JobConf jobConf = new JobConf();
+
+            jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
+            jobConf.setUser("yyy");
+            jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
+
+            // Cap the split size so that the v2 API yields about 40 splits.
+            jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
+
+            // Same cap for the v1 API, expressed as the local block size.
+            jobConf.setInt("fs.local.block.size", 65000);
+
+            // File system coordinates.
+            setupFileSystems(jobConf);
+
+            GridHadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);
+
+            Job job = Job.getInstance(jobConf);
+
+            GridHadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer);
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
+
+            FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+            job.setJarByClass(GridHadoopWordCount2.class);
+
+            GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1);
+
+            IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+            fut.get();
+
+            checkJobStatistics(jobId);
+
+            assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
+                useNewReducer,
+                "blue\t200000\n" +
+                "green\t150000\n" +
+                "red\t100000\n" +
+                "yellow\t70000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000")
+            );
+        }
+    }
+
+    /**
+     * Checks per-task phase ordering and compares the event count with the check of the job statistics file.
+     * @param jobId Job id.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If the statistics file cannot be read.
+     */
+    private void checkJobStatistics(GridHadoopJobId jobId) throws IgniteCheckedException, IOException {
+        GridHadoopCounters cntrs = grid(0).hadoop().counters(jobId);
+
+        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
+
+        Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>();
+
+        Map<String, Integer> phaseOrders = new HashMap<>();
+        phaseOrders.put("submit", 0);
+        phaseOrders.put("prepare", 1);
+        phaseOrders.put("start", 2);
+        phaseOrders.put("Cstart", 3);
+        phaseOrders.put("finish", 4);
+
+        String prevTaskId = null;
+
+        long apiEvtCnt = 0;
+
+        for (T2<String, Long> evt : perfCntr.evts()) {
+            // Expected event name pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706
+            String[] parsedEvt = evt.get1().split(" ");
+
+            String taskId;
+            String taskPhase;
+            // JOB events form their own pseudo-task; COMBINE events join the MAP task of the same number.
+            if ("JOB".equals(parsedEvt[0])) {
+                taskId = parsedEvt[0];
+                taskPhase = parsedEvt[1];
+            }
+            else {
+                taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1];
+                taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2];
+            }
+            // A task id different from the previous event's starts a fresh phase map for that id.
+            if (!taskId.equals(prevTaskId))
+                tasks.put(taskId, new TreeMap<Integer,Long>());
+
+            Integer pos = phaseOrders.get(taskPhase);
+
+            assertNotNull("Invalid phase " + taskPhase, pos);
+
+            tasks.get(taskId).put(pos, evt.get2());
+
+            prevTaskId = taskId;
+
+            apiEvtCnt++;
+        }
+
+        for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) {
+            Map<Integer, Long> order = task.getValue();
+
+            long prev = 0;
+
+            for (Map.Entry<Integer, Long> phase : order.entrySet()) {
+                assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev);
+
+                prev = phase.getValue();
+            }
+        }
+
+        final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance");
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return igfs.exists(statPath);
+            }
+        }, 10000);
+
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) {
+            assertEquals(apiEvtCnt, GridHadoopTestUtils.simpleCheckJobStatFile(reader));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
index 116248f..5d5bb94 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
@@ -34,7 +34,7 @@ public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest {
      * @throws Exception If fails.
      */
     public void testIntWritableSerialization() throws Exception {
-        GridHadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);
+        HadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);
 
         ByteArrayOutputStream buf = new ByteArrayOutputStream();
 
@@ -56,7 +56,7 @@ public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest {
      * @throws Exception If fails.
      */
     public void testIntJavaSerialization() throws Exception {
-        GridHadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class);
+        HadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class);
 
         ByteArrayOutputStream buf = new ByteArrayOutputStream();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
new file mode 100644
index 0000000..76357c0
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Runs the sorting test of {@link HadoopSortingTest} with external execution switched on.
+ */
+public class HadoopSortingExternalTest extends HadoopSortingTest {
+    /** {@inheritDoc} */
+    @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
+        GridHadoopConfiguration externalCfg = super.hadoopConfiguration(gridName);
+
+        // The only difference from the parent test: external execution is enabled.
+        externalCfg.setExternalExecution(true);
+        return externalCfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
new file mode 100644
index 0000000..5d28a30
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests correct sorting.
+ */
+public class HadoopSortingTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String PATH_INPUT = "/test-in";
+
+    /** */
+    private static final String PATH_OUTPUT = "/test-out";
+
+    /** @return Number of grids started by {@link #beforeTest()}. */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * @return {@code True} if IGFS is enabled on Hadoop nodes; always so here, as the jobs read and write IGFS paths.
+     */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} Starts {@link #gridCount()} grids. */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} Stops all grids. */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
+        GridHadoopConfiguration embeddedCfg = super.hadoopConfiguration(gridName);
+
+        // External execution is switched off for this test.
+        embeddedCfg.setExternalExecution(false);
+        return embeddedCfg;
+    }
+
+    /**
+     * Generates random UUID lines with a map-only job, sorts them with a second job that uses
+     * {@link JavaSerializationComparator}, and checks that every non-empty output file is strictly ascending.
+     * @throws Exception If failed.
+     */
+    public void testSortSimple() throws Exception {
+        // Generate test data.
+        Job job = Job.getInstance();
+
+        job.setInputFormatClass(InFormat.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(NullWritable.class);
+
+        job.setMapperClass(Mapper.class);
+        job.setNumReduceTasks(0);
+
+        setupFileSystems(job.getConfiguration());
+
+        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT));
+
+        X.printerrln("Data generation started.");
+
+        grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1),
+            createJobInfo(job.getConfiguration())).get(180000);
+
+        X.printerrln("Data generation complete.");
+
+        // Run main map-reduce job.
+        job = Job.getInstance();
+
+        setupFileSystems(job.getConfiguration());
+
+        job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() +
+            "," + WritableSerialization.class.getName());
+
+        FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT));
+        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+        job.setSortComparatorClass(JavaSerializationComparator.class);
+
+        job.setMapperClass(MyMapper.class);
+        job.setReducerClass(MyReducer.class);
+
+        job.setNumReduceTasks(2);
+
+        job.setMapOutputKeyClass(UUID.class);
+        job.setMapOutputValueClass(NullWritable.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(NullWritable.class);
+
+        X.printerrln("Job started.");
+
+        grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2),
+            createJobInfo(job.getConfiguration())).get(180000);
+
+        X.printerrln("Job complete.");
+
+        // Check result.
+        Path outDir = new Path(igfsScheme() + PATH_OUTPUT);
+
+        AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration());
+
+        for (FileStatus file : fs.listStatus(outDir)) {
+            X.printerrln("__ file: " + file);
+
+            if (file.getLen() == 0)
+                continue;
+
+            // The scanner owns the stream: closing it also closes the underlying file input stream.
+            try (Scanner sc = new Scanner(fs.open(file.getPath()))) {
+                UUID prev = null;
+
+                while (sc.hasNextLine()) {
+                    UUID next = UUID.fromString(sc.nextLine());
+
+                    // Keys within one output file must be strictly ascending.
+                    if (prev != null)
+                        assertTrue(prev.compareTo(next) < 0);
+
+                    prev = next;
+                }
+            }
+        }
+    }
+
+    /** Input format that fabricates its data: 10 splits of 20 records each, every key being a random UUID string. */
+    public static class InFormat extends InputFormat<Text, NullWritable> {
+        /** {@inheritDoc} */
+        @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
+            List<InputSplit> res = new ArrayList<>();
+            FakeSplit split = new FakeSplit(20); // The same split instance is listed 10 times.
+
+            for (int i = 0; i < 10; i++)
+                res.add(split);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RecordReader<Text, NullWritable> createRecordReader(final InputSplit split,
+            TaskAttemptContext ctx) throws IOException, InterruptedException {
+            return new RecordReader<Text, NullWritable>() {
+                /** Number of {@link #nextKeyValue()} calls made so far. */
+                int cnt;
+
+                /** Current key; refreshed once per record. */
+                Text txt = new Text();
+
+                @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
+                    // No-op.
+                }
+
+                @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+                    if (++cnt > split.getLength())
+                        return false;
+
+                    txt.set(UUID.randomUUID().toString()); // One key per record, stable across getCurrentKey() calls.
+                    return true;
+                }
+
+                @Override public Text getCurrentKey() {
+                    return txt;
+                }
+
+                @Override public NullWritable getCurrentValue() {
+                    return NullWritable.get();
+                }
+
+                @Override public float getProgress() throws IOException, InterruptedException {
+                    return Math.min(1f, (float)cnt / split.getLength()); // cnt overshoots by one after the last record.
+                }
+
+                @Override public void close() {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    public static class MyMapper extends Mapper<LongWritable, Text, UUID, NullWritable> {
+        /** {@inheritDoc} */
+        @Override protected void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException {
+//            X.printerrln("___ map: " + val);
+
+            ctx.write(UUID.fromString(val.toString()), NullWritable.get());
+        }
+    }
+
+    /** Reducer that writes each key it receives once, as text, ignoring the values. */
+    public static class MyReducer extends Reducer<UUID, NullWritable, Text, NullWritable> {
+        /** Reused output key. */
+        private Text text = new Text();
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(UUID key, Iterable<NullWritable> vals, Context ctx)
+            throws IOException, InterruptedException {
+            String keyStr = key.toString();
+
+            text.set(keyStr);
+            ctx.write(text, NullWritable.get());
+        }
+    }
+
+    /** Input split that carries only a length and always reports a single fixed host as its location. */
+    public static class FakeSplit extends InputSplit implements Writable {
+        /** Locations reported for every split. */
+        private static final String[] HOSTS = {"127.0.0.1"};
+
+        /** Split length; written and read back as an int. */
+        private int len;
+
+        /**
+         * Creates a split of the given length.
+         * @param len Length returned by {@link #getLength()}.
+         */
+        public FakeSplit(int len) {
+            this.len = len;
+        }
+
+        /** No-arg constructor; presumably needed so the split can be re-created before {@link #readFields}. */
+        public FakeSplit() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getLength() throws IOException, InterruptedException {
+            return len;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String[] getLocations() throws IOException, InterruptedException {
+            return HOSTS;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(DataOutput out) throws IOException {
+            out.writeInt(len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFields(DataInput in) throws IOException {
+            len = in.readInt();
+        }
+    }
+}


Mime
View raw message