Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1A23517E85 for ; Thu, 22 Jan 2015 05:07:26 +0000 (UTC) Received: (qmail 19628 invoked by uid 500); 22 Jan 2015 05:07:26 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 19590 invoked by uid 500); 22 Jan 2015 05:07:26 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 19581 invoked by uid 99); 22 Jan 2015 05:07:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Jan 2015 05:07:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E9904E03AB; Thu, 22 Jan 2015 05:07:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjffdu@apache.org To: commits@tez.apache.org Message-Id: <0fd460a1a3234f8f8707fde17e66dd47@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-1826. Add option to disable split grouping and local mode option for tez-examples (zjffdu) Date: Thu, 22 Jan 2015 05:07:25 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 758d5a6b9 -> 7311d7d40 TEZ-1826. Add option to disable split grouping and local mode option for tez-examples (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7311d7d4 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7311d7d4 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7311d7d4 Branch: refs/heads/master Commit: 7311d7d406039cfe03a2b2696c825a8a400adda9 Parents: 758d5a6 Author: Jeff Zhang Authored: Thu Jan 22 13:06:35 2015 +0800 Committer: Jeff Zhang Committed: Thu Jan 22 13:06:35 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/examples/HashJoinExample.java | 5 +- .../org/apache/tez/examples/JoinValidate.java | 5 +- .../apache/tez/examples/OrderedWordCount.java | 7 +- .../tez/examples/SimpleSessionExample.java | 6 +- .../tez/examples/SortMergeJoinExample.java | 5 +- .../org/apache/tez/examples/TezExampleBase.java | 78 +++++++- .../java/org/apache/tez/examples/WordCount.java | 3 +- .../java/org/apache/tez/test/TestLocalMode.java | 2 +- .../java/org/apache/tez/test/TestTezJobs.java | 177 +++++++++++++++++-- 10 files changed, 253 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 85287c6..b4e529c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1826. Add option to disable split grouping and local mode option for tez-examples. TEZ-1982. TezChild setupUgi should not be using environment. TEZ-1980. Suppress tez-dag findbugs warnings until addressed. TEZ-1855. Avoid scanning for previously written files within Inputs / Outputs. http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java index fa99d4b..aa0e3ab 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java @@ -93,7 +93,6 @@ public class HashJoinExample extends TezExampleBase { System.err.println("Usage: " + "hashjoin [" + broadcastOption + "(default false)]"); - ToolRunner.printGenericCommandUsage(System.err); } @Override @@ -158,7 +157,7 @@ public class HashJoinExample extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, hashPath.toUri().toString()) - .groupSplits(false).build()); + .groupSplits(!isDisableSplitGrouping()).build()); /** * This vertex represents that side of the data that will be streamed and @@ -174,7 +173,7 @@ public class HashJoinExample extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, streamPath.toUri().toString()) - .groupSplits(false).build()); + .groupSplits(!isDisableSplitGrouping()).build()); /** * This vertex represents the join operation. It writes the join output as http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java index fb9127e..a8d4ec9 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java @@ -69,7 +69,6 @@ public class JoinValidate extends TezExampleBase { @Override protected void printUsage() { System.err.println("Usage: " + "joinvalidate "); - ToolRunner.printGenericCommandUsage(System.err); } @Override @@ -147,13 +146,13 @@ public class JoinValidate extends TezExampleBase { ForwardingProcessor.class.getName())).addDataSource("lhs", MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, - lhs.toUri().toString()).groupSplits(false).build()); + lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build()); Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create( ForwardingProcessor.class.getName())).addDataSource("rhs", MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, - rhs.toUri().toString()).groupSplits(false).build()); + rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build()); Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create( JoinValidateProcessor.class.getName()), numPartitions); http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java index 9cf21d3..5cb8dc8 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java @@ -122,10 +122,10 @@ public class OrderedWordCount extends TezExampleBase { } public static DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath, - int numPartitions, String dagName) throws IOException { + int numPartitions, boolean disableSplitGrouping, String dagName) throws IOException { DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf), - TextInputFormat.class, inputPath).build(); + TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping).build(); DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outputPath).build(); @@ -181,7 +181,6 @@ public class OrderedWordCount extends TezExampleBase { @Override protected void printUsage() { System.err.println("Usage: " + " orderedwordcount in out [numPartitions]"); - ToolRunner.printGenericCommandUsage(System.err); } @@ -197,7 +196,7 @@ public class OrderedWordCount extends TezExampleBase { protected int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient) throws Exception { DAG dag = createDAG(tezConf, args[0], args[1], - args.length == 3 ? Integer.parseInt(args[2]) : 1, + args.length == 3 ? Integer.parseInt(args[2]) : 1, isDisableSplitGrouping(), "OrderedWordCount"); LOG.info("Running OrderedWordCount"); return runDag(dag, false, LOG); http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java index ee81c6b..d79119c 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java @@ -42,8 +42,8 @@ public class SimpleSessionExample extends TezExampleBase { @Override protected void printUsage() { - System.err.println("Usage: " + " simplesessionexample [numPartitions]"); - ToolRunner.printGenericCommandUsage(System.err); + System.err.println("Usage: " + " simplesessionexample" + + " [numPartitions]"); } @Override @@ -87,7 +87,7 @@ public class SimpleSessionExample extends TezExampleBase { for (int i = 0; i < inputPaths.length; ++i) { DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], numPartitions, - ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session + isDisableSplitGrouping(), ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session LOG.info("Running dag number " + i); if(runDag(dag, false, LOG) != 0) { http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java index b3336d2..87f569d 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java @@ -89,7 +89,6 @@ public class SortMergeJoinExample extends TezExampleBase { protected void printUsage() { System.err.println("Usage: " + "sortmergejoin "); - ToolRunner.printGenericCommandUsage(System.err); } @Override @@ -159,7 +158,7 @@ public class SortMergeJoinExample extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath1.toUri().toString()) - .groupSplits(false).build()); + .groupSplits(!isDisableSplitGrouping()).build()); /** * The other vertex represents the other side of the join. It reads text @@ -174,7 +173,7 @@ public class SortMergeJoinExample extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath2.toUri().toString()) - .groupSplits(false).build()); + .groupSplits(!isDisableSplitGrouping()).build()); /** * This vertex represents the join operation. It writes the join output as http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java index 4bad009..e172ce9 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java @@ -19,17 +19,23 @@ package org.apache.tez.examples; import javax.annotation.Nullable; + import java.io.IOException; +import java.io.PrintStream; import java.util.Set; import com.google.common.collect.Sets; + +import org.apache.commons.cli.Options; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezConfiguration; @@ -37,16 +43,42 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; @InterfaceAudience.Private public abstract class TezExampleBase extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(TezExampleBase.class); + private TezClient tezClientInternal; + protected static final String DISABLE_SPLIT_GROUPING = "disableSplitGrouping"; + protected static final String LOCAL_MODE = "local"; + + private boolean disableSplitGrouping = false; + private boolean isLocalMode = false; + + protected boolean isDisableSplitGrouping() { + return disableSplitGrouping; + } + + private Options getExtraOptions() { + Options options = new Options(); + options.addOption(LOCAL_MODE, false, "run it as local mode"); + options.addOption(DISABLE_SPLIT_GROUPING, false , "disable split grouping"); + return options; + } @Override public final int run(String[] args) throws Exception { Configuration conf = getConf(); - String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); + GenericOptionsParser optionParser = new GenericOptionsParser(conf, getExtraOptions(), args); + String[] otherArgs = optionParser.getRemainingArgs(); + if (optionParser.getCommandLine().hasOption(LOCAL_MODE)) { + isLocalMode = true; + } + if (optionParser.getCommandLine().hasOption(DISABLE_SPLIT_GROUPING)) { + disableSplitGrouping = true; + } return _execute(otherArgs, null, null); } @@ -58,14 +90,25 @@ public abstract class TezExampleBase extends Configured implements Tool { * @param args arguments to the example * @param tezClient an existing running {@link org.apache.tez.client.TezClient} instance if one * exists. If no TezClient is specified (null), one will be created based on the - * provided configuration + * provided configuration. If TezClient is specified, local mode option can not been + * specified in arguments, it takes no effect. * @return Zero indicates success, non-zero indicates failure * @throws Exception */ public int run(TezConfiguration conf, String[] args, @Nullable TezClient tezClient) throws Exception { setConf(conf); - String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); + GenericOptionsParser optionParser = new GenericOptionsParser(conf, getExtraOptions(), args); + if (optionParser.getCommandLine().hasOption(LOCAL_MODE)) { + isLocalMode = true; + if (tezClient != null) { + throw new RuntimeException("can't specify local mode when TezClient is created, it takes no effect"); + } + } + if (optionParser.getCommandLine().hasOption(DISABLE_SPLIT_GROUPING)) { + disableSplitGrouping = true; + } + String[] otherArgs = optionParser.getRemainingArgs(); return _execute(otherArgs, conf, tezClient); } @@ -100,7 +143,7 @@ public abstract class TezExampleBase extends Configured implements Tool { private int _validateArgs(String[] args) { int res = validateArgs(args); if (res != 0) { - printUsage(); + _printUsage(); return res; } return 0; @@ -117,6 +160,13 @@ public abstract class TezExampleBase extends Configured implements Tool { if (tezConf == null) { tezConf = new TezConfiguration(getConf()); } + if (isLocalMode) { + LOG.info("Running in local mode..."); + tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + tezConf.set("fs.defaultFS", "file:///"); + tezConf.setBoolean( + TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + } UserGroupInformation.setConfiguration(tezConf); boolean ownTezClient = false; if (tezClient == null) { @@ -140,11 +190,31 @@ public abstract class TezExampleBase extends Configured implements Tool { return tezClient; } + private void _printUsage() { + printUsage(); + System.err.println(); + printExtraOptionsUsage(System.err); + System.err.println(); + ToolRunner.printGenericCommandUsage(System.err); + } + /** * Print usage instructions for this example */ protected abstract void printUsage(); + protected void printExtraOptionsUsage(PrintStream ps) { + ps.println("Tez example extra options supported are"); + // TODO TEZ-1348 make it able to access dfs in tez local mode + ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode," + + " run it in distributed mode without this option"); + ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput," + + " enable split grouping without this option."); + ps.println(); + ps.println("The Tez example extra options usage syntax is "); + ps.println("example_name [extra_options] [example_parameters]"); + } + /** * Validate the arguments * http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java index e263ae0..358a7f7 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java @@ -143,7 +143,7 @@ public class WordCount extends TezExampleBase { // Create the descriptor that describes the input data to Tez. Using MRInput to read text // data from the given input path. The TextInputFormat is used to read the text data. DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf), - TextInputFormat.class, inputPath).build(); + TextInputFormat.class, inputPath).groupSplits(!isDisableSplitGrouping()).build(); // Create a descriptor that describes the output data to Tez. Using MROoutput to write text // data to the given output path. The TextOutputFormat is used to write the text data. @@ -197,7 +197,6 @@ public class WordCount extends TezExampleBase { @Override protected void printUsage() { System.err.println("Usage: " + " wordcount in out [numPartitions]"); - ToolRunner.printGenericCommandUsage(System.err); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java index a0328bb..3a03739 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java @@ -230,7 +230,7 @@ public class TestLocalMode { try { for (int i=0; i expectedResult = new HashSet(); + + FSDataOutputStream out1 = localFs.create(new Path(inPath1, "file")); + FSDataOutputStream out2 = localFs.create(new Path(inPath2, "file")); + BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1)); + BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2)); + for (int i = 0; i < 20; i++) { + String term = "term" + i; + writer1.write(term); + writer1.newLine(); + if (i % 2 == 0) { + writer2.write(term); + writer2.newLine(); + expectedResult.add(term); + } + } + writer1.close(); + writer2.close(); + out1.close(); + out2.close(); + + String[] args = new String[] { + "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(), + "-local", "-disableSplitGrouping", + inPath1.toString(), inPath2.toString(), "1", outPath.toString() }; + assertEquals(0, hashJoinExample.run(args)); + + FileStatus[] statuses = localFs.listStatus(outPath, new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); + assertEquals(1, statuses.length); + FSDataInputStream inStream = localFs.open(statuses[0].getPath()); + BufferedReader reader = new BufferedReader(new InputStreamReader(inStream)); + String line; + while ((line = reader.readLine()) != null) { + assertTrue(expectedResult.remove(line)); + } + reader.close(); + inStream.close(); + assertEquals(0, expectedResult.size()); + } + + @Test(timeout = 60000) public void testSortMergeJoinExample() throws Exception { SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample(); sortMergeJoinExample.setConf(new Configuration(mrrTezCluster.getConfig())); @@ -248,6 +308,64 @@ public class TestTezJobs { assertEquals(0, expectedResult.size()); } + @Test(timeout = 60000) + public void testSortMergeJoinExampleDisableSplitGrouping() throws Exception { + SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample(); + sortMergeJoinExample.setConf(conf); + Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir"); + Path inPath1 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath1"); + Path inPath2 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath2"); + Path outPath = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/outPath"); + localFs.delete(outPath, true); + localFs.mkdirs(inPath1); + localFs.mkdirs(inPath2); + localFs.mkdirs(stagingDirPath); + + Set expectedResult = new HashSet(); + + FSDataOutputStream out1 = localFs.create(new Path(inPath1, "file")); + FSDataOutputStream out2 = localFs.create(new Path(inPath2, "file")); + BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1)); + BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2)); + for (int i = 0; i < 20; i++) { + String term = "term" + i; + writer1.write(term); + writer1.newLine(); + if (i % 2 == 0) { + writer2.write(term); + writer2.newLine(); + expectedResult.add(term); + } + } + writer1.close(); + writer2.close(); + out1.close(); + out2.close(); + + String[] args = new String[] { + "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(), + "-local","-disableSplitGrouping", + inPath1.toString(), inPath2.toString(), "1", outPath.toString() }; + assertEquals(0, sortMergeJoinExample.run(args)); + + FileStatus[] statuses = localFs.listStatus(outPath, new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); + assertEquals(1, statuses.length); + FSDataInputStream inStream = localFs.open(statuses[0].getPath()); + BufferedReader reader = new BufferedReader(new InputStreamReader(inStream)); + String line; + while ((line = reader.readLine()) != null) { + assertTrue(expectedResult.remove(line)); + } + reader.close(); + inStream.close(); + assertEquals(0, expectedResult.size()); + } + /** * test whole {@link HashJoinExample} pipeline as following:
* {@link JoinDataGen} -> {@link HashJoinExample} -> {@link JoinValidate} @@ -344,15 +462,15 @@ public class TestTezJobs { } } - private void generateOrderedWordCountInput(Path inputDir) throws IOException { + private void generateOrderedWordCountInput(Path inputDir, FileSystem fs) throws IOException { Path dataPath1 = new Path(inputDir, "inPath1"); Path dataPath2 = new Path(inputDir, "inPath2"); FSDataOutputStream f1 = null; FSDataOutputStream f2 = null; try { - f1 = remoteFs.create(dataPath1); - f2 = remoteFs.create(dataPath2); + f1 = fs.create(dataPath1); + f2 = fs.create(dataPath2); final String prefix = "a"; for (int i = 1; i <= 10; ++i) { @@ -377,8 +495,8 @@ public class TestTezJobs { } } - private void verifyOrderedWordCountOutput(Path resultFile) throws IOException { - FSDataInputStream inputStream = remoteFs.open(resultFile); + private void verifyOrderedWordCountOutput(Path resultFile, FileSystem fs) throws IOException { + FSDataInputStream inputStream = fs.open(resultFile); final String prefix = "a"; int currentCounter = 10; @@ -402,8 +520,8 @@ public class TestTezJobs { Assert.assertEquals(0, currentCounter); } - private void verifyOutput(Path outputDir) throws IOException { - FileStatus[] fileStatuses = remoteFs.listStatus(outputDir); + private void verifyOutput(Path outputDir, FileSystem fs) throws IOException { + FileStatus[] fileStatuses = fs.listStatus(outputDir); Path resultFile = null; boolean foundResult = false; boolean foundSuccessFile = false; @@ -428,7 +546,7 @@ public class TestTezJobs { assertTrue(foundResult); assertTrue(resultFile != null); assertTrue(foundSuccessFile); - verifyOrderedWordCountOutput(resultFile); + verifyOrderedWordCountOutput(resultFile, fs); } @Test(timeout = 60000) @@ -438,7 +556,7 @@ public class TestTezJobs { Path stagingDirPath = new Path("/tmp/owc-staging-dir"); remoteFs.mkdirs(inputDir); remoteFs.mkdirs(stagingDirPath); - generateOrderedWordCountInput(inputDir); + generateOrderedWordCountInput(inputDir, remoteFs); String outputDirStr = "/tmp/owc-output/"; Path outputDir = new Path(outputDirStr); @@ -451,7 +569,7 @@ public class TestTezJobs { OrderedWordCount job = new OrderedWordCount(); Assert.assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{inputDirStr, outputDirStr, "2"}, null)==0); - verifyOutput(outputDir); + verifyOutput(outputDir, remoteFs); } finally { remoteFs.delete(stagingDirPath, true); @@ -463,6 +581,39 @@ public class TestTezJobs { } @Test(timeout = 60000) + public void testOrderedWordCountDisableSplitGrouping() throws Exception { + String inputDirStr = TEST_ROOT_DIR + "/tmp/owc-input/"; + Path inputDir = new Path(inputDirStr); + Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/owc-staging-dir"); + localFs.mkdirs(inputDir); + localFs.mkdirs(stagingDirPath); + generateOrderedWordCountInput(inputDir, localFs); + + String outputDirStr = TEST_ROOT_DIR + "/tmp/owc-output/"; + localFs.delete(new Path(outputDirStr), true); + Path outputDir = new Path(outputDirStr); + + TezConfiguration tezConf = new TezConfiguration(conf); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); + TezClient tezSession = null; + + try { + + OrderedWordCount job = new OrderedWordCount(); + Assert.assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{"-local", "-disableSplitGrouping", + inputDirStr, outputDirStr, "2"}, null)==0); + verifyOutput(outputDir, localFs); + + } finally { + localFs.delete(stagingDirPath, true); + if (tezSession != null) { + tezSession.stop(); + } + } + + } + + @Test(timeout = 60000) public void testSimpleSessionExample() throws Exception { Path stagingDirPath = new Path("/tmp/owc-staging-dir"); remoteFs.mkdirs(stagingDirPath); @@ -476,7 +627,7 @@ public class TestTezJobs { inputPaths[i] = inputDirStr; Path inputDir = new Path(inputDirStr); remoteFs.mkdirs(inputDir); - generateOrderedWordCountInput(inputDir); + generateOrderedWordCountInput(inputDir, remoteFs); String outputDirStr = "/tmp/owc-output-" + i + "/"; outputPaths[i] = outputDirStr; Path outputDir = new Path(outputDirStr); @@ -504,7 +655,7 @@ public class TestTezJobs { StringUtils.join(",", outputPaths), "2" }, null) == 0); for (int i=0; i