ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [26/50] [abbrv] ignite git commit: IGNITE-4341: Hadoop: added Terasort to unit tests. This closes #1302. This closes #1321.
Date Thu, 22 Dec 2016 15:16:06 GMT
IGNITE-4341: Hadoop: added Terasort to unit tests. This closes #1302. This closes #1321.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b44baf1e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b44baf1e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b44baf1e

Branch: refs/heads/master
Commit: b44baf1e8c42c57fa4e241d5943593fa4ae42f12
Parents: ffe53eb
Author: iveselovskiy <iveselovskiy@gridgain.com>
Authored: Mon Dec 12 16:52:47 2016 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Thu Dec 15 13:46:29 2016 +0300

----------------------------------------------------------------------
 modules/hadoop/pom.xml                          |   7 +
 .../hadoop/impl/fs/HadoopFileSystemsUtils.java  |  11 +
 .../impl/v2/HadoopV2JobResourceManager.java     |  25 +-
 .../hadoop/impl/HadoopAbstractSelfTest.java     |  13 +-
 .../impl/HadoopAbstractWordCountTest.java       |   6 +-
 .../hadoop/impl/HadoopFileSystemsTest.java      |   9 +
 .../hadoop/impl/HadoopJobTrackerSelfTest.java   |   4 +-
 .../impl/HadoopTaskExecutionSelfTest.java       |   4 +-
 .../hadoop/impl/HadoopTeraSortTest.java         | 376 +++++++++++++++++++
 .../client/HadoopClientProtocolSelfTest.java    |   4 +-
 .../collections/HadoopSkipListSelfTest.java     |  14 +-
 .../HadoopExternalTaskExecutionSelfTest.java    |   2 +
 .../testsuites/IgniteHadoopTestSuite.java       |   3 +
 13 files changed, 450 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/modules/hadoop/pom.xml b/modules/hadoop/pom.xml
index d0b0481..db302d7 100644
--- a/modules/hadoop/pom.xml
+++ b/modules/hadoop/pom.xml
@@ -95,6 +95,13 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-examples</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.gridgain</groupId>
             <artifactId>ignite-shmem</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
index 5115cb4..37902f0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.processors.hadoop.impl.fs;
 
+import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsConstants;
 import org.jetbrains.annotations.Nullable;
 
@@ -48,4 +50,13 @@ public class HadoopFileSystemsUtils {
     public static String disableFsCachePropertyName(@Nullable String scheme) {
         return String.format("fs.%s.impl.disable.cache", scheme);
     }
+
+    /**
+     * Clears the Hadoop {@link FileSystem} cache by delegating to {@link FileSystem#closeAll()}.
+     *
+     * @throws IOException If {@link FileSystem#closeAll()} fails.
+     */
+    public static void clearFileSystemCache() throws IOException {
+        FileSystem.closeAll();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
index 3984f83..52e394b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
@@ -123,7 +123,9 @@ class HadoopV2JobResourceManager {
 
             JobConf cfg = ctx.getJobConf();
 
-            String mrDir = cfg.get("mapreduce.job.dir");
+            Collection<URL> clsPathUrls = new ArrayList<>();
+
+            String mrDir = cfg.get(MRJobConfig.MAPREDUCE_JOB_DIR);
 
             if (mrDir != null) {
                 stagingDir = new Path(new URI(mrDir));
@@ -144,28 +146,23 @@ class HadoopV2JobResourceManager {
 
                 File jarJobFile = new File(jobLocDir, "job.jar");
 
-                Collection<URL> clsPathUrls = new ArrayList<>();
-
                 clsPathUrls.add(jarJobFile.toURI().toURL());
 
                 rsrcSet.add(jarJobFile);
                 rsrcSet.add(new File(jobLocDir, "job.xml"));
-
-                processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES);
-                processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES);
-                processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls,
null);
-                processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls,
null);
-
-                if (!clsPathUrls.isEmpty()) {
-                    clsPath = new URL[clsPathUrls.size()];
-
-                    clsPathUrls.toArray(clsPath);
-                }
             }
             else if (!jobLocDir.mkdirs())
                 throw new IgniteCheckedException("Failed to create local job directory: "
                     + jobLocDir.getAbsolutePath());
 
+            processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES);
+            processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES);
+            processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls,
null);
+            processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls,
null);
+
+            if (!clsPathUrls.isEmpty())
+                clsPath = clsPathUrls.toArray(new URL[clsPathUrls.size()]);
+
             setLocalFSWorkingDirectory(jobLocDir);
         }
         catch (URISyntaxException | IOException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
index 12351c6..5666cbc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
@@ -83,7 +83,9 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
     private static String initCp;
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
+    @Override protected final void beforeTestsStarted() throws Exception {
+        HadoopFileSystemsUtils.clearFileSystemCache();
+
         // Add surefire classpath to regular classpath.
         initCp = System.getProperty("java.class.path");
 
@@ -93,6 +95,15 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
             System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp);
 
         super.beforeTestsStarted();
+
+        beforeTestsStarted0();
+    }
+
+    /**
+     * Subclass initialization hook, invoked at the end of the final {@link #beforeTestsStarted()}.
+     */
+    protected void beforeTestsStarted0() throws Exception {
+        // No-op by default; subclasses override this instead of the final beforeTestsStarted().
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
index 3cb8f91..84e6aee 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
@@ -49,9 +49,7 @@ public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest
     protected IgfsEx igfs;
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
+    @Override protected void beforeTestsStarted0() throws Exception {
         Configuration cfg = new Configuration();
 
         setupFileSystems(cfg);
@@ -62,6 +60,8 @@ public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
         igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
index 252d6cb..7680690 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLocalFileSystemV1;
 import org.apache.ignite.testframework.GridTestUtils;
 
 /**
@@ -37,11 +38,15 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
         startGrids(gridCount());
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
         stopAllGrids(true);
     }
 
@@ -70,6 +75,10 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
         cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP,
             new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString());
 
+        FileSystem fs = FileSystem.get(uri, cfg);
+
+        assertTrue(fs instanceof HadoopLocalFileSystemV1);
+
         final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT);
         final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT);
         final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
index a3bf49c..91ad5ec 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
@@ -65,9 +65,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
+    @Override protected void beforeTestsStarted0() throws Exception {
         startGrids(gridCount());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
index 027f921..9d45b03 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
@@ -83,9 +83,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
+    @Override protected void beforeTestsStarted0() throws Exception {
         startGrids(gridCount());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
new file mode 100644
index 0000000..0cc9564
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraInputFormat;
+import org.apache.hadoop.examples.terasort.TeraOutputFormat;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraValidate;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Implements TeraSort Hadoop sample as a unit test.
+ */
+public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
+    /** Copy of Hadoop constant of package-private visibility. */
+    public static final String PARTITION_FILENAME = getPartitionFileNameConstant();
+
+    /** TeraGen output dir, also used as the TeraSort input dir. */
+    protected final String generateOutDir = getFsBase() + "/tera-generated";
+
+    /** TeraSort output dir, also used as the TeraValidate input dir. */
+    protected final String sortOutDir = getFsBase() + "/tera-sorted";
+
+    /** TeraValidate output dir. */
+    protected final String validateOutDir = getFsBase() + "/tera-validated";
+
+    /**
+     * Reads the package-private {@code TeraInputFormat.PARTITION_FILENAME} constant via reflection.
+     *
+     * @return Value of {@code TeraInputFormat.PARTITION_FILENAME}.
+     */
+    private static String getPartitionFileNameConstant() {
+        try {
+            Field f = TeraInputFormat.class.getDeclaredField("PARTITION_FILENAME");
+
+            f.setAccessible(true);
+
+            return (String)f.get(null);
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Gets the base directory for all test data.
+     * Note that this directory is deleted recursively before and after each test.
+     * @return The base directory.
+     */
+    protected String getFsBase() {
+        return "file:///tmp/" + getUser() + "/hadoop-terasort-test";
+    }
+
+    /**
+     * @return Full input data size, in bytes; effectively rounded down to whole 100-byte TeraGen rows.
+     */
+    protected long dataSizeBytes() {
+        return 100_000_000;
+    }
+
+    /**
+     * Desired number of maps in TeraSort job.
+     * @return The number of maps.
+     */
+    protected int numMaps() {
+        return gridCount() * 10;
+    }
+
+    /**
+     * Desired number of reduces in TeraSort job.
+     * @return The number of reduces.
+     */
+    protected int numReduces() {
+        return gridCount() * 8;
+    }
+
+    /**
+     * The user to run Hadoop job on behalf of.
+     * @return The user to run Hadoop job on behalf of.
+     */
+    protected String getUser() {
+        return System.getProperty("user.name");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+
+        // Delete all test data:
+        getFileSystem().delete(new Path(getFsBase()), true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected final boolean igfsEnabled() {
+        return false;
+    }
+
+    /**
+     * Runs the TeraSort job through the Ignite Hadoop API and waits for its completion.
+     */
+    protected final void teraSort() throws Exception {
+        log().info("TeraSort ===============================================================");
+
+        getFileSystem().delete(new Path(sortOutDir), true);
+
+        final JobConf jobConf = new JobConf();
+
+        jobConf.setUser(getUser());
+
+        jobConf.set("fs.defaultFS", getFsBase());
+
+        log().info("Desired number of reduces: " + numReduces());
+
+        jobConf.set("mapreduce.job.reduces", String.valueOf(numReduces()));
+
+        log().info("Desired number of maps: " + numMaps());
+
+        final long splitSize = dataSizeBytes() / numMaps();
+
+        log().info("Desired split size: " + splitSize);
+
+        // Force the split to be of the desired size:
+        jobConf.set("mapred.min.split.size", String.valueOf(splitSize));
+        jobConf.set("mapred.max.split.size", String.valueOf(splitSize));
+
+        Job job = setupConfig(jobConf);
+
+        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+        fut.get();
+    }
+
+    /**
+     * Gets the file system that hosts {@link #getFsBase()}.
+     * @return The file system.
+     * @throws Exception On error.
+     */
+    FileSystem getFileSystem() throws Exception {
+        return FileSystem.get(new URI(getFsBase()), new Configuration());
+    }
+
+    /**
+     * Runs the TeraGen data generation stage and checks the total size of the generated data.
+     * @throws Exception On error.
+     */
+    private void teraGenerate() throws Exception {
+        log().info("TeraGenerate ===============================================================");
+
+        getFileSystem().delete(new Path(generateOutDir), true);
+
+        final long numLines = dataSizeBytes() / 100; // TeraGen makes 100 bytes per line
+
+        if (numLines < 1)
+            throw new IllegalStateException("Data size is too small: " + dataSizeBytes());
+
+        // Generate input data:
+        int res = ToolRunner.run(new Configuration(), new TeraGen(), new String[] {"-Dmapreduce.framework.name=local",
+            String.valueOf(numLines), generateOutDir});
+
+        assertEquals(0, res);
+
+        FileStatus[] fileStatuses = getFileSystem().listStatus(new Path(generateOutDir));
+
+        long sumLen = 0;
+
+        for (FileStatus fs: fileStatuses)
+            sumLen += fs.getLen();
+
+        assertEquals(numLines * 100, sumLen); // Ensure correct size data is generated (whole rows only).
+    }
+
+    /**
+     * Creates Job instance and sets up necessary properties for it.
+     * @param conf The Job config.
+     * @return The job.
+     * @throws Exception On error.
+     */
+    private Job setupConfig(JobConf conf) throws Exception {
+        Job job = Job.getInstance(conf);
+
+        Path inputDir = new Path(generateOutDir);
+        Path outputDir = new Path(sortOutDir);
+
+        boolean useSimplePartitioner = TeraSort.getUseSimplePartitioner(job);
+
+        TeraInputFormat.setInputPaths(job, inputDir);
+        FileOutputFormat.setOutputPath(job, outputDir);
+
+        job.setJobName("TeraSort");
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+
+        job.setInputFormatClass(TeraInputFormat.class);
+        job.setOutputFormatClass(TeraOutputFormat.class);
+
+        if (useSimplePartitioner)
+            job.setPartitionerClass(TeraSort.SimplePartitioner.class);
+        else {
+            long start = System.currentTimeMillis();
+
+            Path partFile = new Path(outputDir, PARTITION_FILENAME);
+
+            URI partUri = new URI(partFile.toString() + "#" + PARTITION_FILENAME);
+
+            try {
+                TeraInputFormat.writePartitionFile(job, partFile);
+            } catch (Throwable e) {
+                throw new IgniteException(e);
+            }
+
+            job.addCacheFile(partUri);
+
+            long end = System.currentTimeMillis();
+
+            log().info("Spent " + (end - start) + "ms computing partitions. " +
+                "Partition file added to distributed cache: " + partUri);
+
+            job.setPartitionerClass(getTeraSortTotalOrderPartitioner()); // TeraSort.TotalOrderPartitioner is package-private.
+        }
+
+        job.getConfiguration().setInt("dfs.replication", TeraSort.getOutputReplication(job));
+
+        // Reflective equivalent of TeraOutputFormat.setFinalSync(job, true), which is not accessible here:
+        Method m = TeraOutputFormat.class.getDeclaredMethod("setFinalSync", JobContext.class, boolean.class);
+        m.setAccessible(true);
+        m.invoke(null, job, true);
+
+        return job;
+    }
+
+    /**
+     * Extracts package-private TeraSort total order partitioner class.
+     *
+     * @return The class.
+     */
+    @SuppressWarnings("unchecked")
+    private Class<? extends Partitioner> getTeraSortTotalOrderPartitioner() {
+        Class[] classes = TeraSort.class.getDeclaredClasses();
+
+        Class<? extends Partitioner> totalOrderPartitionerCls = null;
+
+        for (Class<?> x: classes) {
+            if ("TotalOrderPartitioner".equals(x.getSimpleName())) {
+                totalOrderPartitionerCls = (Class<? extends Partitioner>)x;
+
+                break;
+            }
+        }
+
+        if (totalOrderPartitionerCls == null)
+            throw new IllegalStateException("Failed to find TeraSort total order partitioner class.");
+
+        return totalOrderPartitionerCls;
+    }
+
+    /**
+     * Runs the TeraValidate stage over the sorted output and checks that no ordering errors are reported.
+     * @throws Exception On error.
+     */
+    private void teraValidate() throws Exception {
+        log().info("TeraValidate ===============================================================");
+
+        getFileSystem().delete(new Path(validateOutDir), true);
+
+        // Validate the sorted data:
+        int res = ToolRunner.run(new Configuration(), new TeraValidate(),
+            new String[] {"-Dmapreduce.framework.name=local", sortOutDir, validateOutDir});
+
+        assertEquals(0, res);
+
+        FileStatus[] fileStatuses = getFileSystem().listStatus(new Path(validateOutDir), new PathFilter() {
+            @Override public boolean accept(Path path) {
+                // Typically name is "part-r-00000":
+                return path.getName().startsWith("part-r-");
+            }
+        });
+
+        // TeraValidate has only 1 reduce, so should be only 1 result file:
+        assertEquals(1, fileStatuses.length);
+
+        // The result file must contain only 1 line with the checksum, like this:
+        // "checksum        7a27e2d0d55de",
+        // typically it has length of 23 bytes.
+        // If sorting was not correct, the result contains list of K-V pairs that are not ordered correctly.
+        // In such case the size of the output will be much larger.
+        long len = fileStatuses[0].getLen();
+
+        assertTrue("TeraValidate length: " + len, len >= 16 && len <= 32);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        getFileSystem().delete(new Path(getFsBase()), true);
+
+        startGrids(gridCount());
+    }
+
+    /**
+     * Runs generate/sort/validate phases of the terasort sample.
+     * @throws Exception On error.
+     */
+    public void testTeraSort() throws Exception {
+        teraGenerate();
+
+        teraSort();
+
+        teraValidate();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igc = super.getConfiguration(gridName);
+
+        HadoopConfiguration hc = createHadoopConfiguration();
+
+        igc.setHadoopConfiguration(hc);
+
+        return igc;
+    }
+
+    /**
+     * Creates Hadoop configuration for the test.
+     * @return The {@link HadoopConfiguration}.
+     */
+    protected HadoopConfiguration createHadoopConfiguration() {
+        HadoopConfiguration hadoopCfg = new HadoopConfiguration();
+
+        // See org.apache.ignite.configuration.HadoopConfiguration.DFLT_MAX_TASK_QUEUE_SIZE
+        hadoopCfg.setMaxTaskQueueSize(30_000);
+
+        return hadoopCfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
index 7156a3d..44fc46e 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
@@ -99,9 +99,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
+    @Override protected void beforeTestsStarted0() throws Exception {
         startGrids(gridCount());
 
         setupLockFile.delete();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
index 111ea78..1138803 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
@@ -85,6 +85,9 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
         }
     }
 
+    /**
+     * @throws Exception On error.
+     */
     public void testMapSimple() throws Exception {
         GridUnsafeMemory mem = new GridUnsafeMemory(0);
 
@@ -139,7 +142,16 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
         assertEquals(0, mem.allocatedSize());
     }
 
-    private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer,
Integer> vis, HadoopTaskContext taskCtx)
+    /**
+     * Check.
+     * @param m The multimap.
+     * @param mm The multimap storing expectations.
+     * @param vis The multimap to store visitor results.
+     * @param taskCtx The task context.
+     * @throws Exception On error.
+     */
+    private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer,
Integer> vis,
+        HadoopTaskContext taskCtx)
         throws Exception {
         final HadoopTaskInput in = m.input(taskCtx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
index 7c43500..5f64ce7 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
@@ -57,6 +57,8 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
     @Override protected void beforeTest() throws Exception {
         fail("https://issues.apache.org/jira/browse/IGNITE-404");
 
+        super.beforeTest();
+
         startGrids(gridCount());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 959bc59..6046cc1 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.processors.hadoop.HadoopTestClassLoader;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopTeraSortTest;
 import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolEmbeddedSelfTest;
 import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolMultipleServersSelfTest;
 import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolSelfTest;
@@ -123,6 +124,8 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(ldr.loadClass(KerberosHadoopFileSystemFactorySelfTest.class.getName())));
 
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopTeraSortTest.class.getName())));
+
         suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName())));
 


Mime
View raw message