Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3D451200BDB for ; Mon, 12 Dec 2016 14:53:19 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3BCD9160B22; Mon, 12 Dec 2016 13:53:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DDC1E160B0D for ; Mon, 12 Dec 2016 14:53:17 +0100 (CET) Received: (qmail 29900 invoked by uid 500); 12 Dec 2016 13:53:17 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 29891 invoked by uid 99); 12 Dec 2016 13:53:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Dec 2016 13:53:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 01B96E9411; Mon, 12 Dec 2016 13:53:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Message-Id: <44c388684ab04213958e1831f1cade8b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-4341: Hadoop: added Terasort to unit tests. This closes #1302. This closes #1321. Date: Mon, 12 Dec 2016 13:53:17 +0000 (UTC) archived-at: Mon, 12 Dec 2016 13:53:19 -0000 Repository: ignite Updated Branches: refs/heads/master 9762643f8 -> f621f7f76 IGNITE-4341: Hadoop: added Terasort to unit tests. This closes #1302. This closes #1321. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f621f7f7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f621f7f7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f621f7f7 Branch: refs/heads/master Commit: f621f7f76fc8c97c6f5c98d89cff6c9a1b70c24b Parents: 9762643 Author: iveselovskiy Authored: Mon Dec 12 16:52:47 2016 +0300 Committer: devozerov Committed: Mon Dec 12 16:52:47 2016 +0300 ---------------------------------------------------------------------- modules/hadoop/pom.xml | 7 + .../hadoop/impl/fs/HadoopFileSystemsUtils.java | 11 + .../impl/v2/HadoopV2JobResourceManager.java | 25 +- .../hadoop/impl/HadoopAbstractSelfTest.java | 13 +- .../impl/HadoopAbstractWordCountTest.java | 6 +- .../hadoop/impl/HadoopFileSystemsTest.java | 9 + .../hadoop/impl/HadoopJobTrackerSelfTest.java | 4 +- .../impl/HadoopTaskExecutionSelfTest.java | 4 +- .../hadoop/impl/HadoopTeraSortTest.java | 376 +++++++++++++++++++ .../client/HadoopClientProtocolSelfTest.java | 4 +- .../collections/HadoopSkipListSelfTest.java | 14 +- .../HadoopExternalTaskExecutionSelfTest.java | 2 + .../testsuites/IgniteHadoopTestSuite.java | 3 + 13 files changed, 450 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/modules/hadoop/pom.xml b/modules/hadoop/pom.xml index b97030b..e5373ab 100644 --- a/modules/hadoop/pom.xml +++ b/modules/hadoop/pom.xml @@ -95,6 +95,13 @@ + org.apache.hadoop + hadoop-mapreduce-examples + ${hadoop.version} + test + + + org.gridgain ignite-shmem test http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java 
---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java index 5115cb4..37902f0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.processors.hadoop.impl.fs; +import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsConstants; import org.jetbrains.annotations.Nullable; @@ -48,4 +50,13 @@ public class HadoopFileSystemsUtils { public static String disableFsCachePropertyName(@Nullable String scheme) { return String.format("fs.%s.impl.disable.cache", scheme); } + + /** + * Clears Hadoop {@link FileSystem} cache. + * + * @throws IOException On error. 
+ */ + public static void clearFileSystemCache() throws IOException { + FileSystem.closeAll(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java index 3984f83..52e394b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java @@ -123,7 +123,9 @@ class HadoopV2JobResourceManager { JobConf cfg = ctx.getJobConf(); - String mrDir = cfg.get("mapreduce.job.dir"); + Collection clsPathUrls = new ArrayList<>(); + + String mrDir = cfg.get(MRJobConfig.MAPREDUCE_JOB_DIR); if (mrDir != null) { stagingDir = new Path(new URI(mrDir)); @@ -144,28 +146,23 @@ class HadoopV2JobResourceManager { File jarJobFile = new File(jobLocDir, "job.jar"); - Collection clsPathUrls = new ArrayList<>(); - clsPathUrls.add(jarJobFile.toURI().toURL()); rsrcSet.add(jarJobFile); rsrcSet.add(new File(jobLocDir, "job.xml")); - - processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES); - processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES); - processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null); - processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null); - - if (!clsPathUrls.isEmpty()) { - clsPath = new URL[clsPathUrls.size()]; - - clsPathUrls.toArray(clsPath); - } } else if (!jobLocDir.mkdirs()) throw new 
IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath()); + processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES); + processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES); + processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null); + processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null); + + if (!clsPathUrls.isEmpty()) + clsPath = clsPathUrls.toArray(new URL[clsPathUrls.size()]); + setLocalFSWorkingDirectory(jobLocDir); } catch (URISyntaxException | IOException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java index 12351c6..5666cbc 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java @@ -83,7 +83,9 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { private static String initCp; /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { + @Override protected final void beforeTestsStarted() throws Exception { + HadoopFileSystemsUtils.clearFileSystemCache(); + // Add surefire classpath to regular classpath. 
initCp = System.getProperty("java.class.path"); @@ -93,6 +95,15 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp); super.beforeTestsStarted(); + + beforeTestsStarted0(); + } + + /** + * Performs additional initialization in the beginning of test class execution. + */ + protected void beforeTestsStarted0() throws Exception { + // noop } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java index 3cb8f91..84e6aee 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java @@ -49,9 +49,7 @@ public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest protected IgfsEx igfs; /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - + @Override protected void beforeTestsStarted0() throws Exception { Configuration cfg = new Configuration(); setupFileSystems(cfg); @@ -62,6 +60,8 @@ public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { + super.beforeTest(); + igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java index 252d6cb..7680690 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils; +import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLocalFileSystemV1; import org.apache.ignite.testframework.GridTestUtils; /** @@ -37,11 +38,15 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { + super.beforeTest(); + startGrids(gridCount()); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { + super.afterTest(); + stopAllGrids(true); } @@ -70,6 +75,10 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString()); + FileSystem fs = FileSystem.get(uri, cfg); + + assertTrue(fs instanceof HadoopLocalFileSystemV1); + final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT); final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT); final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java index a3bf49c..91ad5ec 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java @@ -65,9 +65,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - + @Override protected void beforeTestsStarted0() throws Exception { startGrids(gridCount()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java index 027f921..9d45b03 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java @@ -83,9 +83,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); 
- + @Override protected void beforeTestsStarted0() throws Exception { startGrids(gridCount()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java new file mode 100644 index 0000000..0cc9564 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.URI; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.terasort.TeraGen; +import org.apache.hadoop.examples.terasort.TeraInputFormat; +import org.apache.hadoop.examples.terasort.TeraOutputFormat; +import org.apache.hadoop.examples.terasort.TeraSort; +import org.apache.hadoop.examples.terasort.TeraValidate; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; + +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo; + +/** + * Implements TeraSort Hadoop sample as a unit test. + */ +public class HadoopTeraSortTest extends HadoopAbstractSelfTest { + /** Copy of Hadoop constant of package-private visibility. */ + public static final String PARTITION_FILENAME = getPartitionFileNameConstant(); + + /** Out destination dir. */ + protected final String generateOutDir = getFsBase() + "/tera-generated"; + + /** Sort destination dir. */ + protected final String sortOutDir = getFsBase() + "/tera-sorted"; + + /** Validation destination dir. 
*/ protected final String validateOutDir = getFsBase() + "/tera-validated"; + + /** + * Extracts value of Hadoop package-private constant. + * + * @return TeraInputFormat.PARTITION_FILENAME. + */ + private static String getPartitionFileNameConstant() { + try { + Field f = TeraInputFormat.class.getDeclaredField("PARTITION_FILENAME"); + + f.setAccessible(true); + + return (String)f.get(null); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** + * Gets base directory. + * Note that this directory will be completely deleted at the end of the test. + * @return The base directory. + */ + protected String getFsBase() { + return "file:///tmp/" + getUser() + "/hadoop-terasort-test"; + } + + /** + * @return Full input data size, in bytes. + */ + protected long dataSizeBytes() { + return 100_000_000; + } + + /** + * Desired number of maps in TeraSort job. + * @return The number of maps. + */ + protected int numMaps() { + return gridCount() * 10; + } + + /** + * Desired number of reduces in TeraSort job. + * @return The number of reduces. + */ + protected int numReduces() { + return gridCount() * 8; + } + + /** + * The user to run Hadoop job on behalf of. + * @return The user to run Hadoop job on behalf of.
+ */ + protected String getUser() { + return System.getProperty("user.name"); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + + // Delete files used: + getFileSystem().delete(new Path(getFsBase()), true); + } + + /** {@inheritDoc} */ + @Override protected final boolean igfsEnabled() { + return false; + } + + /** + * Does actual test TeraSort job Through Ignite API + */ + protected final void teraSort() throws Exception { + System.out.println("TeraSort ==============================================================="); + + getFileSystem().delete(new Path(sortOutDir), true); + + final JobConf jobConf = new JobConf(); + + jobConf.setUser(getUser()); + + jobConf.set("fs.defaultFS", getFsBase()); + + log().info("Desired number of reduces: " + numReduces()); + + jobConf.set("mapreduce.job.reduces", String.valueOf(numReduces())); + + log().info("Desired number of maps: " + numMaps()); + + final long splitSize = dataSizeBytes() / numMaps(); + + log().info("Desired split size: " + splitSize); + + // Force the split to be of the desired size: + jobConf.set("mapred.min.split.size", String.valueOf(splitSize)); + jobConf.set("mapred.max.split.size", String.valueOf(splitSize)); + + Job job = setupConfig(jobConf); + + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); + + IgniteInternalFuture fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + fut.get(); + } + + /** + * Gets the file system we work upon. + * @return The file system. + * @throws Exception + */ + FileSystem getFileSystem() throws Exception{ + return FileSystem.get(new URI(getFsBase()), new Configuration()); + } + + /** + * Represents the data generation stage. 
+ * @throws Exception + */ + private void teraGenerate() throws Exception { + System.out.println("TeraGenerate ==============================================================="); + + getFileSystem().delete(new Path(generateOutDir), true); + + final long numLines = dataSizeBytes() / 100; // TeraGen makes 100 bytes ber line + + if (numLines < 1) + throw new IllegalStateException("Data size is too small: " + dataSizeBytes()); + + // Generate input data: + int res = ToolRunner.run(new Configuration(), new TeraGen(), new String[] {"-Dmapreduce.framework.name=local", + String.valueOf(numLines), generateOutDir}); + + assertEquals(0, res); + + FileStatus[] fileStatuses = getFileSystem().listStatus(new Path(generateOutDir)); + + long sumLen = 0; + + for (FileStatus fs: fileStatuses) + sumLen += fs.getLen(); + + assertEquals(dataSizeBytes(), sumLen); // Ensure correct size data is generated. + } + + /** + * Creates Job instance and sets up necessary properties for it. + * @param conf The Job config. + * @return The job. + * @throws Exception On error. 
+ */ + private Job setupConfig(JobConf conf) throws Exception { + Job job = Job.getInstance(conf); + + Path inputDir = new Path(generateOutDir); + Path outputDir = new Path(sortOutDir); + + boolean useSimplePartitioner = TeraSort.getUseSimplePartitioner(job); + + TeraInputFormat.setInputPaths(job, inputDir); + FileOutputFormat.setOutputPath(job, outputDir); + + job.setJobName("TeraSort"); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + job.setInputFormatClass(TeraInputFormat.class); + job.setOutputFormatClass(TeraOutputFormat.class); + + if (useSimplePartitioner) + job.setPartitionerClass(TeraSort.SimplePartitioner.class); + else { + long start = System.currentTimeMillis(); + + Path partFile = new Path(outputDir, PARTITION_FILENAME); + + URI partUri = new URI(partFile.toString() + "#" + PARTITION_FILENAME); + + try { + TeraInputFormat.writePartitionFile(job, partFile); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + job.addCacheFile(partUri); + + long end = System.currentTimeMillis(); + + System.out.println("Spent " + (end - start) + "ms computing partitions. " + + "Partition file added to distributed cache: " + partUri); + + job.setPartitionerClass(getTeraSortTotalOrderPartitioner()/*TeraSort.TotalOrderPartitioner.class*/); + } + + job.getConfiguration().setInt("dfs.replication", TeraSort.getOutputReplication(job)); + + /* TeraOutputFormat.setFinalSync(job, true); */ + Method m = TeraOutputFormat.class.getDeclaredMethod("setFinalSync", JobContext.class, boolean.class); + m.setAccessible(true); + m.invoke(null, job, true); + + return job; + } + + /** + * Extracts package-private TeraSort total order partitioner class. + * + * @return The class. 
+ */ + @SuppressWarnings("unchecked") + private Class getTeraSortTotalOrderPartitioner() { + Class[] classes = TeraSort.class.getDeclaredClasses(); + + Class totalOrderPartitionerCls = null; + + for (Class x: classes) { + if ("TotalOrderPartitioner".equals(x.getSimpleName())) { + totalOrderPartitionerCls = (Class)x; + + break; + } + } + + if (totalOrderPartitionerCls == null) + throw new IllegalStateException("Failed to find TeraSort total order partitioner class."); + + return totalOrderPartitionerCls; + } + + /** + * Implements validation phase of the sample. + * @throws Exception + */ + private void teraValidate() throws Exception { + System.out.println("TeraValidate ==============================================================="); + + getFileSystem().delete(new Path(validateOutDir), true); + + // Generate input data: + int res = ToolRunner.run(new Configuration(), new TeraValidate(), + new String[] {"-Dmapreduce.framework.name=local", sortOutDir, validateOutDir}); + + assertEquals(0, res); + + FileStatus[] fileStatuses = getFileSystem().listStatus(new Path(validateOutDir), new PathFilter() { + @Override public boolean accept(Path path) { + // Typically name is "part-r-00000": + return path.getName().startsWith("part-r-"); + } + }); + + // TeraValidate has only 1 reduce, so should be only 1 result file: + assertEquals(1, fileStatuses.length); + + // The result file must contain only 1 line with the checksum, like this: + // "checksum 7a27e2d0d55de", + // typically it has length of 23 bytes. + // If sorting was not correct, the result contains list of K-V pairs that are not ordered correctly. + // In such case the size of the output will be much larger. 
+ long len = fileStatuses[0].getLen(); + + assertTrue("TeraValidate length: " + len, len >= 16 && len <= 32); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + getFileSystem().delete(new Path(getFsBase()), true); + + startGrids(gridCount()); + } + + /** + * Runs generate/sort/validate phases of the terasort sample. + * @throws Exception + */ + public void testTeraSort() throws Exception { + teraGenerate(); + + teraSort(); + + teraValidate(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration igc = super.getConfiguration(gridName); + + HadoopConfiguration hc = createHadoopConfiguration(); + + igc.setHadoopConfiguration(hc); + + return igc; + } + + /** + * Creates Hadoop configuration for the test. + * @return The {@link HadoopConfiguration}. + */ + protected HadoopConfiguration createHadoopConfiguration() { + HadoopConfiguration hadoopCfg = new HadoopConfiguration(); + + // See org.apache.ignite.configuration.HadoopConfiguration.DFLT_MAX_TASK_QUEUE_SIZE + hadoopCfg.setMaxTaskQueueSize(30_000); + + return hadoopCfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java index 7156a3d..44fc46e 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java @@ -99,9 +99,7 
@@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - + @Override protected void beforeTestsStarted0() throws Exception { startGrids(gridCount()); setupLockFile.delete(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java index 111ea78..1138803 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java @@ -85,6 +85,9 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest { } } + /** + * @throws Exception On error. + */ public void testMapSimple() throws Exception { GridUnsafeMemory mem = new GridUnsafeMemory(0); @@ -139,7 +142,16 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest { assertEquals(0, mem.allocatedSize()); } - private void check(HadoopMultimap m, Multimap mm, final Multimap vis, HadoopTaskContext taskCtx) + /** + * Check. + * @param m The multimap. + * @param mm The multimap storing expectations. + * @param vis The multimap to store visitor results. + * @param taskCtx The task context. + * @throws Exception On error. 
+ */ + private void check(HadoopMultimap m, Multimap mm, final Multimap vis, + HadoopTaskContext taskCtx) throws Exception { final HadoopTaskInput in = m.input(taskCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java index 7c43500..5f64ce7 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java @@ -57,6 +57,8 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest @Override protected void beforeTest() throws Exception { fail("https://issues.apache.org/jira/browse/IGNITE-404"); + super.beforeTest(); + startGrids(gridCount()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f621f7f7/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 959bc59..6046cc1 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -23,6 +23,7 @@ import 
org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.processors.hadoop.HadoopTestClassLoader; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopTeraSortTest; import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolEmbeddedSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolMultipleServersSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolSelfTest; @@ -123,6 +124,8 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(KerberosHadoopFileSystemFactorySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopTeraSortTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName())));