tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-1826. Add option to disable split grouping and local mode option for tez-examples (zjffdu)
Date Thu, 22 Jan 2015 05:07:25 GMT
Repository: tez
Updated Branches:
  refs/heads/master 758d5a6b9 -> 7311d7d40


TEZ-1826. Add option to disable split grouping and local mode option for tez-examples (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7311d7d4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7311d7d4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7311d7d4

Branch: refs/heads/master
Commit: 7311d7d406039cfe03a2b2696c825a8a400adda9
Parents: 758d5a6
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Thu Jan 22 13:06:35 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Thu Jan 22 13:06:35 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/examples/HashJoinExample.java    |   5 +-
 .../org/apache/tez/examples/JoinValidate.java   |   5 +-
 .../apache/tez/examples/OrderedWordCount.java   |   7 +-
 .../tez/examples/SimpleSessionExample.java      |   6 +-
 .../tez/examples/SortMergeJoinExample.java      |   5 +-
 .../org/apache/tez/examples/TezExampleBase.java |  78 +++++++-
 .../java/org/apache/tez/examples/WordCount.java |   3 +-
 .../java/org/apache/tez/test/TestLocalMode.java |   2 +-
 .../java/org/apache/tez/test/TestTezJobs.java   | 177 +++++++++++++++++--
 10 files changed, 253 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 85287c6..b4e529c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1826. Add option to disable split grouping and local mode option for tez-examples.
   TEZ-1982. TezChild setupUgi should not be using environment.
   TEZ-1980. Suppress tez-dag findbugs warnings until addressed.
   TEZ-1855. Avoid scanning for previously written files within Inputs / Outputs.

http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
index fa99d4b..aa0e3ab 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
@@ -93,7 +93,6 @@ public class HashJoinExample extends TezExampleBase {
     System.err.println("Usage: "
         + "hashjoin <file1> <file2> <numPartitions> <outPath> ["
         + broadcastOption + "(default false)]");
-    ToolRunner.printGenericCommandUsage(System.err);
   }
 
   @Override
@@ -158,7 +157,7 @@ public class HashJoinExample extends TezExampleBase {
                 MRInput
                     .createConfigBuilder(new Configuration(tezConf),
                         TextInputFormat.class, hashPath.toUri().toString())
-                    .groupSplits(false).build());
+                    .groupSplits(!isDisableSplitGrouping()).build());
 
     /**
      * This vertex represents that side of the data that will be streamed and
@@ -174,7 +173,7 @@ public class HashJoinExample extends TezExampleBase {
                 MRInput
                     .createConfigBuilder(new Configuration(tezConf),
                         TextInputFormat.class, streamPath.toUri().toString())
-                    .groupSplits(false).build());
+                    .groupSplits(!isDisableSplitGrouping()).build());
 
     /**
      * This vertex represents the join operation. It writes the join output as

http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index fb9127e..a8d4ec9 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -69,7 +69,6 @@ public class JoinValidate extends TezExampleBase {
   @Override
   protected void printUsage() {
     System.err.println("Usage: " + "joinvalidate <path1> <path2>");
-    ToolRunner.printGenericCommandUsage(System.err);
   }
 
   @Override
@@ -147,13 +146,13 @@ public class JoinValidate extends TezExampleBase {
         ForwardingProcessor.class.getName())).addDataSource("lhs",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
-                lhs.toUri().toString()).groupSplits(false).build());
+                lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
 
     Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create(
         ForwardingProcessor.class.getName())).addDataSource("rhs",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
-                rhs.toUri().toString()).groupSplits(false).build());
+                rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
 
     Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create(
         JoinValidateProcessor.class.getName()), numPartitions);

http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index 9cf21d3..5cb8dc8 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -122,10 +122,10 @@ public class OrderedWordCount extends TezExampleBase {
   }
   
   public static DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath,
-      int numPartitions, String dagName) throws IOException {
+      int numPartitions, boolean disableSplitGrouping, String dagName) throws IOException {
 
     DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf),
-        TextInputFormat.class, inputPath).build();
+        TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping).build();
 
     DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf),
         TextOutputFormat.class, outputPath).build();
@@ -181,7 +181,6 @@ public class OrderedWordCount extends TezExampleBase {
   @Override
   protected void printUsage() {
     System.err.println("Usage: " + " orderedwordcount in out [numPartitions]");
-    ToolRunner.printGenericCommandUsage(System.err);
   }
 
 
@@ -197,7 +196,7 @@ public class OrderedWordCount extends TezExampleBase {
   protected int runJob(String[] args, TezConfiguration tezConf,
       TezClient tezClient) throws Exception {
     DAG dag = createDAG(tezConf, args[0], args[1],
-        args.length == 3 ? Integer.parseInt(args[2]) : 1,
+        args.length == 3 ? Integer.parseInt(args[2]) : 1, isDisableSplitGrouping(),
         "OrderedWordCount");
     LOG.info("Running OrderedWordCount");
     return runDag(dag, false, LOG);

http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
index ee81c6b..d79119c 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
@@ -42,8 +42,8 @@ public class SimpleSessionExample extends TezExampleBase {
 
   @Override
   protected void printUsage() {
-    System.err.println("Usage: " + " simplesessionexample <in1,in2> <out1, out2> [numPartitions]");
-    ToolRunner.printGenericCommandUsage(System.err);
+    System.err.println("Usage: " + " simplesessionexample"
+        + " <in1,in2> <out1, out2> [numPartitions]");
   }
 
   @Override
@@ -87,7 +87,7 @@ public class SimpleSessionExample extends TezExampleBase {
 
     for (int i = 0; i < inputPaths.length; ++i) {
       DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], numPartitions,
-          ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
+          isDisableSplitGrouping(), ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
 
       LOG.info("Running dag number " + i);
       if(runDag(dag, false, LOG) != 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
index b3336d2..87f569d 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
@@ -89,7 +89,6 @@ public class SortMergeJoinExample extends TezExampleBase {
   protected void printUsage() {
     System.err.println("Usage: "
         + "sortmergejoin <file1> <file2> <numPartitions> <outPath>");
-    ToolRunner.printGenericCommandUsage(System.err);
   }
 
   @Override
@@ -159,7 +158,7 @@ public class SortMergeJoinExample extends TezExampleBase {
                 MRInput
                     .createConfigBuilder(new Configuration(tezConf),
                         TextInputFormat.class, inputPath1.toUri().toString())
-                    .groupSplits(false).build());
+                    .groupSplits(!isDisableSplitGrouping()).build());
 
     /**
      * The other vertex represents the other side of the join. It reads text
@@ -174,7 +173,7 @@ public class SortMergeJoinExample extends TezExampleBase {
                 MRInput
                     .createConfigBuilder(new Configuration(tezConf),
                         TextInputFormat.class, inputPath2.toUri().toString())
-                    .groupSplits(false).build());
+                    .groupSplits(!isDisableSplitGrouping()).build());
 
     /**
      * This vertex represents the join operation. It writes the join output as

http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index 4bad009..e172ce9 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -19,17 +19,23 @@
 package org.apache.tez.examples;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+
+import org.apache.commons.cli.Options;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -37,16 +43,42 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
 @InterfaceAudience.Private
 public abstract class TezExampleBase extends Configured implements Tool {
 
+  private static final Log LOG = LogFactory.getLog(TezExampleBase.class);
+
   private TezClient tezClientInternal;
+  protected static final String DISABLE_SPLIT_GROUPING = "disableSplitGrouping";
+  protected static final String LOCAL_MODE = "local";
+
+  private boolean disableSplitGrouping = false;
+  private boolean isLocalMode = false;
+
+  protected boolean isDisableSplitGrouping() {
+    return disableSplitGrouping;
+  }
+
+  private Options getExtraOptions() {
+    Options options = new Options();
+    options.addOption(LOCAL_MODE, false, "run it as local mode");
+    options.addOption(DISABLE_SPLIT_GROUPING, false , "disable split grouping");
+    return options;
+  }
 
   @Override
   public final int run(String[] args) throws Exception {
     Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    GenericOptionsParser optionParser = new GenericOptionsParser(conf, getExtraOptions(), args);
+    String[] otherArgs = optionParser.getRemainingArgs();
+    if (optionParser.getCommandLine().hasOption(LOCAL_MODE)) {
+      isLocalMode = true;
+    }
+    if (optionParser.getCommandLine().hasOption(DISABLE_SPLIT_GROUPING)) {
+      disableSplitGrouping = true;
+    }
     return _execute(otherArgs, null, null);
   }
 
@@ -58,14 +90,25 @@ public abstract class TezExampleBase extends Configured implements Tool {
    * @param args      arguments to the example
   * @param tezClient an existing running {@link org.apache.tez.client.TezClient} instance if one
   *                  exists. If no TezClient is specified (null), one will be created based on the
-   *                  provided configuration
+   *                  provided configuration. If TezClient is specified, local mode option can not been
+   *                  specified in arguments, it takes no effect.
    * @return Zero indicates success, non-zero indicates failure
    * @throws Exception 
    */
   public int run(TezConfiguration conf, String[] args, @Nullable TezClient tezClient) throws
       Exception {
     setConf(conf);
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    GenericOptionsParser optionParser = new GenericOptionsParser(conf, getExtraOptions(), args);
+    if (optionParser.getCommandLine().hasOption(LOCAL_MODE)) {
+      isLocalMode = true;
+      if (tezClient != null) {
+        throw new RuntimeException("can't specify local mode when TezClient is created, it takes no effect");
+      }
+    }
+    if (optionParser.getCommandLine().hasOption(DISABLE_SPLIT_GROUPING)) {
+      disableSplitGrouping = true;
+    }
+    String[] otherArgs = optionParser.getRemainingArgs();
     return _execute(otherArgs, conf, tezClient);
   }
 
@@ -100,7 +143,7 @@ public abstract class TezExampleBase extends Configured implements Tool {
   private int _validateArgs(String[] args) {
     int res = validateArgs(args);
     if (res != 0) {
-      printUsage();
+      _printUsage();
       return res;
     }
     return 0;
@@ -117,6 +160,13 @@ public abstract class TezExampleBase extends Configured implements Tool {
     if (tezConf == null) {
       tezConf = new TezConfiguration(getConf());
     }
+    if (isLocalMode) {
+      LOG.info("Running in local mode...");
+      tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+      tezConf.set("fs.defaultFS", "file:///");
+      tezConf.setBoolean(
+          TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    }
     UserGroupInformation.setConfiguration(tezConf);
     boolean ownTezClient = false;
     if (tezClient == null) {
@@ -140,11 +190,31 @@ public abstract class TezExampleBase extends Configured implements Tool {
     return tezClient;
   }
 
+  private void _printUsage() {
+    printUsage();
+    System.err.println();
+    printExtraOptionsUsage(System.err);
+    System.err.println();
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
   /**
    * Print usage instructions for this example
    */
   protected abstract void printUsage();
 
+  protected void printExtraOptionsUsage(PrintStream ps) {
+    ps.println("Tez example extra options supported are");
+    // TODO TEZ-1348 make it able to access dfs in tez local mode
+    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode,"
+        + " run it in distributed mode without this option");
+    ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
+        + " enable split grouping without this option.");
+    ps.println();
+    ps.println("The Tez example extra options usage syntax is ");
+    ps.println("example_name [extra_options] [example_parameters]");
+  }
+
   /**
    * Validate the arguments
    *

http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
index e263ae0..358a7f7 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
@@ -143,7 +143,7 @@ public class WordCount extends TezExampleBase {
     // Create the descriptor that describes the input data to Tez. Using MRInput to read text 
     // data from the given input path. The TextInputFormat is used to read the text data.
     DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf),
-        TextInputFormat.class, inputPath).build();
+        TextInputFormat.class, inputPath).groupSplits(!isDisableSplitGrouping()).build();
 
     // Create a descriptor that describes the output data to Tez. Using MROoutput to write text
     // data to the given output path. The TextOutputFormat is used to write the text data.
@@ -197,7 +197,6 @@ public class WordCount extends TezExampleBase {
   @Override
   protected void printUsage() {
     System.err.println("Usage: " + " wordcount in out [numPartitions]");
-    ToolRunner.printGenericCommandUsage(System.err);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index a0328bb..3a03739 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -230,7 +230,7 @@ public class TestLocalMode {
     try {
       for (int i=0; i<inputPaths.length; ++i) {
         DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], 1,
-            ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
+            false, ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
 
         tezClient.waitTillReady();
         System.out.println("Running dag number " + i);

http://git-wip-us.apache.org/repos/asf/tez/blob/7311d7d4/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index d28b9e3..adf27e8 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -98,12 +98,14 @@ public class TestTezJobs {
 
   private static Configuration conf = new Configuration();
   private static FileSystem remoteFs;
+  private static FileSystem localFs;
 
   private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestTezJobs.class.getName()
       + "-tmpDir";
 
   @BeforeClass
   public static void setup() throws IOException {
+    localFs = FileSystem.getLocal(conf);
     try {
       conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
       dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
@@ -139,7 +141,7 @@ public class TestTezJobs {
   @Test(timeout = 60000)
   public void testHashJoinExample() throws Exception {
     HashJoinExample hashJoinExample = new HashJoinExample();
-    hashJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
+    hashJoinExample.setConf(mrrTezCluster.getConfig());
     Path stagingDirPath = new Path("/tmp/tez-staging-dir");
     Path inPath1 = new Path("/tmp/hashJoin/inPath1");
     Path inPath2 = new Path("/tmp/hashJoin/inPath2");
@@ -193,6 +195,64 @@ public class TestTezJobs {
   }
 
   @Test(timeout = 60000)
+  public void testHashJoinExampleDisableSplitGrouping() throws Exception {
+    HashJoinExample hashJoinExample = new HashJoinExample();
+    hashJoinExample.setConf(conf);
+    Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir");
+    Path inPath1 = new Path(TEST_ROOT_DIR + "/tmp/hashJoin/inPath1");
+    Path inPath2 = new Path(TEST_ROOT_DIR + "/tmp/hashJoin/inPath2");
+    Path outPath = new Path(TEST_ROOT_DIR + "/tmp/hashJoin/outPath");
+    localFs.delete(outPath, true);
+    localFs.mkdirs(inPath1);
+    localFs.mkdirs(inPath2);
+    localFs.mkdirs(stagingDirPath);
+
+    Set<String> expectedResult = new HashSet<String>();
+
+    FSDataOutputStream out1 = localFs.create(new Path(inPath1, "file"));
+    FSDataOutputStream out2 = localFs.create(new Path(inPath2, "file"));
+    BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1));
+    BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2));
+    for (int i = 0; i < 20; i++) {
+      String term = "term" + i;
+      writer1.write(term);
+      writer1.newLine();
+      if (i % 2 == 0) {
+        writer2.write(term);
+        writer2.newLine();
+        expectedResult.add(term);
+      }
+    }
+    writer1.close();
+    writer2.close();
+    out1.close();
+    out2.close();
+
+    String[] args = new String[] {
+        "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(),
+        "-local", "-disableSplitGrouping",
+        inPath1.toString(), inPath2.toString(), "1", outPath.toString() };
+    assertEquals(0, hashJoinExample.run(args));
+
+    FileStatus[] statuses = localFs.listStatus(outPath, new PathFilter() {
+      public boolean accept(Path p) {
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    });
+    assertEquals(1, statuses.length);
+    FSDataInputStream inStream = localFs.open(statuses[0].getPath());
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inStream));
+    String line;
+    while ((line = reader.readLine()) != null) {
+      assertTrue(expectedResult.remove(line));
+    }
+    reader.close();
+    inStream.close();
+    assertEquals(0, expectedResult.size());
+  }
+
+  @Test(timeout = 60000)
   public void testSortMergeJoinExample() throws Exception {
     SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
     sortMergeJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
@@ -248,6 +308,64 @@ public class TestTezJobs {
     assertEquals(0, expectedResult.size());
   }
 
+  @Test(timeout = 60000)
+  public void testSortMergeJoinExampleDisableSplitGrouping() throws Exception {
+    SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
+    sortMergeJoinExample.setConf(conf);
+    Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir");
+    Path inPath1 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath1");
+    Path inPath2 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath2");
+    Path outPath = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/outPath");
+    localFs.delete(outPath, true);
+    localFs.mkdirs(inPath1);
+    localFs.mkdirs(inPath2);
+    localFs.mkdirs(stagingDirPath);
+
+    Set<String> expectedResult = new HashSet<String>();
+
+    FSDataOutputStream out1 = localFs.create(new Path(inPath1, "file"));
+    FSDataOutputStream out2 = localFs.create(new Path(inPath2, "file"));
+    BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1));
+    BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2));
+    for (int i = 0; i < 20; i++) {
+      String term = "term" + i;
+      writer1.write(term);
+      writer1.newLine();
+      if (i % 2 == 0) {
+        writer2.write(term);
+        writer2.newLine();
+        expectedResult.add(term);
+      }
+    }
+    writer1.close();
+    writer2.close();
+    out1.close();
+    out2.close();
+
+    String[] args = new String[] {
+        "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(),
+        "-local","-disableSplitGrouping",
+        inPath1.toString(), inPath2.toString(), "1", outPath.toString() };
+    assertEquals(0, sortMergeJoinExample.run(args));
+
+    FileStatus[] statuses = localFs.listStatus(outPath, new PathFilter() {
+      public boolean accept(Path p) {
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    });
+    assertEquals(1, statuses.length);
+    FSDataInputStream inStream = localFs.open(statuses[0].getPath());
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inStream));
+    String line;
+    while ((line = reader.readLine()) != null) {
+      assertTrue(expectedResult.remove(line));
+    }
+    reader.close();
+    inStream.close();
+    assertEquals(0, expectedResult.size());
+  }
+
   /**
    * test whole {@link HashJoinExample} pipeline as following: <br>
    * {@link JoinDataGen} -> {@link HashJoinExample} -> {@link JoinValidate}
@@ -344,15 +462,15 @@ public class TestTezJobs {
     }
   }
 
-  private void generateOrderedWordCountInput(Path inputDir) throws IOException {
+  private void generateOrderedWordCountInput(Path inputDir, FileSystem fs) throws IOException {
     Path dataPath1 = new Path(inputDir, "inPath1");
     Path dataPath2 = new Path(inputDir, "inPath2");
 
     FSDataOutputStream f1 = null;
     FSDataOutputStream f2 = null;
     try {
-      f1 = remoteFs.create(dataPath1);
-      f2 = remoteFs.create(dataPath2);
+      f1 = fs.create(dataPath1);
+      f2 = fs.create(dataPath2);
 
       final String prefix = "a";
       for (int i = 1; i <= 10; ++i) {
@@ -377,8 +495,8 @@ public class TestTezJobs {
     }
   }
 
-  private void verifyOrderedWordCountOutput(Path resultFile) throws IOException {
-    FSDataInputStream inputStream = remoteFs.open(resultFile);
+  private void verifyOrderedWordCountOutput(Path resultFile, FileSystem fs) throws IOException {
+    FSDataInputStream inputStream = fs.open(resultFile);
     final String prefix = "a";
     int currentCounter = 10;
 
@@ -402,8 +520,8 @@ public class TestTezJobs {
     Assert.assertEquals(0, currentCounter);
   }
   
-  private void verifyOutput(Path outputDir) throws IOException {
-    FileStatus[] fileStatuses = remoteFs.listStatus(outputDir);
+  private void verifyOutput(Path outputDir, FileSystem fs) throws IOException {
+    FileStatus[] fileStatuses = fs.listStatus(outputDir);
     Path resultFile = null;
     boolean foundResult = false;
     boolean foundSuccessFile = false;
@@ -428,7 +546,7 @@ public class TestTezJobs {
     assertTrue(foundResult);
     assertTrue(resultFile != null);
     assertTrue(foundSuccessFile);
-    verifyOrderedWordCountOutput(resultFile);
+    verifyOrderedWordCountOutput(resultFile, fs);
   }
   
   @Test(timeout = 60000)
@@ -438,7 +556,7 @@ public class TestTezJobs {
     Path stagingDirPath = new Path("/tmp/owc-staging-dir");
     remoteFs.mkdirs(inputDir);
     remoteFs.mkdirs(stagingDirPath);
-    generateOrderedWordCountInput(inputDir);
+    generateOrderedWordCountInput(inputDir, remoteFs);
 
     String outputDirStr = "/tmp/owc-output/";
     Path outputDir = new Path(outputDirStr);
@@ -451,7 +569,7 @@ public class TestTezJobs {
 
       OrderedWordCount job = new OrderedWordCount();
       Assert.assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{inputDirStr, outputDirStr, "2"}, null)==0);
-      verifyOutput(outputDir);
+      verifyOutput(outputDir, remoteFs);
 
     } finally {
       remoteFs.delete(stagingDirPath, true);
@@ -463,6 +581,39 @@ public class TestTezJobs {
   }
   
   @Test(timeout = 60000)
+  public void testOrderedWordCountDisableSplitGrouping() throws Exception {
+    String inputDirStr = TEST_ROOT_DIR + "/tmp/owc-input/";
+    Path inputDir = new Path(inputDirStr);
+    Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/owc-staging-dir");
+    localFs.mkdirs(inputDir);
+    localFs.mkdirs(stagingDirPath);
+    generateOrderedWordCountInput(inputDir, localFs);
+
+    String outputDirStr = TEST_ROOT_DIR + "/tmp/owc-output/";
+    localFs.delete(new Path(outputDirStr), true);
+    Path outputDir = new Path(outputDirStr);
+
+    TezConfiguration tezConf = new TezConfiguration(conf);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    TezClient tezSession = null;
+
+    try {
+
+      OrderedWordCount job = new OrderedWordCount();
+      Assert.assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{"-local", "-disableSplitGrouping",
+          inputDirStr, outputDirStr, "2"}, null)==0);
+      verifyOutput(outputDir, localFs);
+
+    } finally {
+      localFs.delete(stagingDirPath, true);
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+
+  }
+
+  @Test(timeout = 60000)
   public void testSimpleSessionExample() throws Exception {
     Path stagingDirPath = new Path("/tmp/owc-staging-dir");
     remoteFs.mkdirs(stagingDirPath);
@@ -476,7 +627,7 @@ public class TestTezJobs {
       inputPaths[i] = inputDirStr;
       Path inputDir = new Path(inputDirStr);
       remoteFs.mkdirs(inputDir);
-      generateOrderedWordCountInput(inputDir);
+      generateOrderedWordCountInput(inputDir, remoteFs);
       String outputDirStr = "/tmp/owc-output-" + i + "/"; 
       outputPaths[i] = outputDirStr;
       Path outputDir = new Path(outputDirStr);
@@ -504,7 +655,7 @@ public class TestTezJobs {
               StringUtils.join(",", outputPaths), "2" }, null) == 0);
 
       for (int i=0; i<numIterations; ++i) {
-        verifyOutput(outputDirs[i]);
+        verifyOutput(outputDirs[i], remoteFs);
       }
       
       apps = yarnClient.getApplications();


Mime
View raw message