tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [33/50] [abbrv] git commit: TEZ-118. Create a MiniTezCluster for allowing of unit tests.
Date Tue, 04 Jun 2013 05:33:35 GMT
TEZ-118. Create a MiniTezCluster for allowing of unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2340884c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2340884c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2340884c

Branch: refs/heads/master
Commit: 2340884c3ce85364ee1892909a58b82f532560db
Parents: 82ee45e
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed May 29 13:32:47 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed May 29 13:32:47 2013 -0700

----------------------------------------------------------------------
 pom.xml                                            |   26 +
 .../main/java/org/apache/tez/dag/api/Vertex.java   |   41 +-
 tez-dag/pom.xml                                    |   22 -
 tez-dag/src/test/resources/krb5.conf               |   28 -
 .../tez/mapreduce/examples/ExampleDriver.java      |   16 +-
 .../apache/tez/mapreduce/examples/MRRSleepJob.java |  460 +++++++++++++++
 tez-mapreduce-tests/pom.xml                        |  100 ++++
 .../apache/tez/mapreduce/MiniMRRTezCluster.java    |  151 +++++
 .../java/org/apache/tez/mapreduce/TestMRRJobs.java |  344 +++++++++++
 tez-mapreduce/pom.xml                              |   12 +
 .../java/org/apache/tez/mapreduce/YARNRunner.java  |  108 ++--
 11 files changed, 1172 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7fa336a..0e4f8e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,6 +106,16 @@
       </dependency>
       <dependency>
         <groupId>org.apache.tez</groupId>
+        <artifactId>tez-mapreduce-examples</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tez</groupId>
+        <artifactId>tez-mapreduce-tests</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tez</groupId>
         <artifactId>tez-dag</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -191,6 +201,19 @@
         <version>${hadoop.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs</artifactId>
+        <scope>test</scope>
+        <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs</artifactId>
+        <scope>test</scope>
+        <type>test-jar</type>
+        <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.mockito</groupId>
         <artifactId>mockito-all</artifactId>
         <version>1.9.5</version>
@@ -264,6 +287,7 @@
     <module>tez-yarn-client</module>
     <module>tez-mapreduce</module>
     <module>tez-mapreduce-examples</module>
+    <module>tez-mapreduce-tests</module>
     <module>tez-dag</module>
     <module>tez-dist</module>
   </modules>
@@ -299,7 +323,9 @@
             <forkMode>always</forkMode>
             <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
             <argLine>-Xmx1024m -XX:+HeapDumpOnOutOfMemoryError</argLine>
+            <redirectTestOutputToFile>true</redirectTestOutputToFile>
             <environmentVariables>
+              <JAVA_HOME>${java.home}</JAVA_HOME>
               <MALLOC_ARENA_MAX>4</MALLOC_ARENA_MAX>
             </environmentVariables>
             <systemPropertyVariables>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 6bea664..cbcac16 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -30,30 +30,30 @@ public class Vertex { // FIXME rename to Task
 
   private final String vertexName;
   private final String processorName;
-  
+
   private final int parallelism;
   private VertexLocationHint taskLocationsHint;
   private Resource taskResource;
   private Map<String, LocalResource> taskLocalResources;
   private Map<String, String> taskEnvironment;
-  
+
   private final List<Vertex> inputVertices = new ArrayList<Vertex>();
   private final List<Vertex> outputVertices = new ArrayList<Vertex>();
   private final List<String> inputEdgeIds = new ArrayList<String>();
   private final List<String> outputEdgeIds = new ArrayList<String>();
   private String javaOpts = "";
-  
-  
+
+
   public Vertex(String vertexName, String processorName, int parallelism) {
     this.vertexName = vertexName;
     this.processorName = processorName;
     this.parallelism = parallelism;
   }
-  
+
   public String getVertexName() { // FIXME rename to getName()
     return vertexName;
   }
-  
+
   public String getProcessorName() {
     return processorName;
   }
@@ -61,16 +61,19 @@ public class Vertex { // FIXME rename to Task
   public int getParallelism() {
     return parallelism;
   }
-  
+
   public void setTaskResource(Resource resource) {
     this.taskResource = resource;
   }
-  
+
   public Resource getTaskResource() {
     return taskResource;
   }
-  
+
   public void setTaskLocationsHint(TaskLocationHint[] locations) {
+    if (locations == null) {
+      return;
+    }
     assert locations.length == parallelism;
     taskLocationsHint = new VertexLocationHint(parallelism, locations);
   }
@@ -79,31 +82,31 @@ public class Vertex { // FIXME rename to Task
   VertexLocationHint getTaskLocationsHint() {
     return taskLocationsHint;
   }
-  
+
   public void setTaskLocalResources(Map<String, LocalResource> localResources) {
     this.taskLocalResources = localResources;
   }
-  
+
   public Map<String, LocalResource> getTaskLocalResources() {
     return taskLocalResources;
   }
-  
+
   public void setTaskEnvironment(Map<String, String> environment) {
     this.taskEnvironment = environment;
   }
-  
+
   public Map<String, String> getTaskEnvironment() {
     return taskEnvironment;
   }
-  
+
   public void setJavaOpts(String javaOpts){
      this. javaOpts = javaOpts;
   }
-  
+
   public String getJavaOpts(){
 	  return javaOpts;
   }
-  
+
   @Override
   public String toString() {
     return "[" + vertexName + " : " + processorName + "]";
@@ -118,15 +121,15 @@ public class Vertex { // FIXME rename to Task
     outputVertices.add(outputVertex);
     outputEdgeIds.add(edgeId);
   }
-  
+
   List<Vertex> getInputVertices() {
     return inputVertices;
   }
-  
+
   List<Vertex> getOutputVertices() {
     return outputVertices;
   }
-  
+
   List<String> getInputEdgeIds() {
     return inputEdgeIds;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index fc03bea..34194db 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -22,8 +22,6 @@
     <version>0.2.0-SNAPSHOT</version>
   </parent>
   <properties>
-    <maven.test.redirectTestOutputToFile>true
-    </maven.test.redirectTestOutputToFile>
     <tez.component>tez-dag</tez.component>
   </properties>
   <artifactId>tez-dag</artifactId>
@@ -102,26 +100,6 @@
   </dependencies>
 
   <build>
-    <pluginManagement>
-      <plugins>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-surefire-plugin</artifactId>
-          <configuration>
-            <forkMode>always</forkMode>
-            <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
-            <argLine>-Xmx1024m -XX:+HeapDumpOnOutOfMemoryError</argLine>
-            <environmentVariables>
-              <MALLOC_ARENA_MAX>4</MALLOC_ARENA_MAX>
-            </environmentVariables>
-            <systemPropertyVariables>
-              <java.security.krb5.conf>${basedir}/src/test/resources/krb5.conf
-              </java.security.krb5.conf>
-            </systemPropertyVariables>
-          </configuration>
-        </plugin>
-      </plugins>
-    </pluginManagement>
     <plugins>
       <plugin>
         <groupId>org.apache.rat</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/tez-dag/src/test/resources/krb5.conf
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/resources/krb5.conf b/tez-dag/src/test/resources/krb5.conf
deleted file mode 100644
index 121ac6d..0000000
--- a/tez-dag/src/test/resources/krb5.conf
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# 
-[libdefaults]
-	default_realm = APACHE.ORG
-	udp_preference_limit = 1
-	extra_addresses = 127.0.0.1
-[realms]
-	APACHE.ORG = {
-		admin_server = localhost:88
-		kdc = localhost:88
-	}
-[domain_realm]
-	localhost = APACHE.ORG

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 94ba79f..468e0ac 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -24,23 +24,23 @@ import org.apache.tez.mapreduce.examples.terasort.TeraSort;
 import org.apache.tez.mapreduce.examples.terasort.TeraValidate;
 
 /**
- * A description of an example program based on its class and a 
+ * A description of an example program based on its class and a
  * human-readable description.
  */
 public class ExampleDriver {
-  
+
   public static void main(String argv[]){
     int exitCode = -1;
     ProgramDriver pgd = new ProgramDriver();
     try {
-      pgd.addClass("wordcount", WordCount.class, 
+      pgd.addClass("wordcount", WordCount.class,
           "A map/reduce program that counts the words in the input files.");
-      pgd.addClass("wordcountmrrtest", WordCountMRRTest.class, 
+      pgd.addClass("wordcountmrrtest", WordCountMRRTest.class,
           "A map/reduce program that counts the words in the input files."
           + " Map splits on spaces. First reduce splits on \".\"");
-      pgd.addClass("randomwriter", RandomWriter.class, 
+      pgd.addClass("randomwriter", RandomWriter.class,
           "A map/reduce program that writes 10GB of random data per node.");
-      pgd.addClass("randomtextwriter", RandomTextWriter.class, 
+      pgd.addClass("randomtextwriter", RandomTextWriter.class,
       "A map/reduce program that writes 10GB of random textual data per node.");
       pgd.addClass("sort", Sort.class,
           "A map/reduce program that sorts the data written by the random"
@@ -61,12 +61,14 @@ public class ExampleDriver {
           + " containing employee_name department name per line of input"
           + " and generates count of employees per department and"
           + " sorted on employee count");
+      pgd.addClass("mrrsleep", MRRSleepJob.class,
+          "MRR Sleep Job");
       exitCode = pgd.run(argv);
     }
     catch(Throwable e){
       e.printStackTrace();
     }
-    
+
     System.exit(exitCode);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
new file mode 100644
index 0000000..6115c6f
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -0,0 +1,460 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+
+/**
+ * Dummy class for testing MR framework. Sleeps for a defined period
+ * of time in mapper and reducer. Generates fake input for map / reduce
+ * jobs. Note that generated number of input pairs is in the order
+ * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
+ * some disk space.
+ */
+public class MRRSleepJob extends Configured implements Tool {
+
+  public static Log LOG = LogFactory.getLog(MRRSleepJob.class);
+
+  public static String MAP_SLEEP_COUNT = "mrr.sleepjob.map.sleep.count";
+  public static String REDUCE_SLEEP_COUNT =
+    "mrr.sleepjob.reduce.sleep.count";
+  public static String MAP_SLEEP_TIME = "mrr.sleepjob.map.sleep.time";
+  public static String REDUCE_SLEEP_TIME =
+    "mrr.sleepjob.reduce.sleep.time";
+  public static String IREDUCE_SLEEP_COUNT =
+      "mrr.sleepjob.ireduce.sleep.count";
+  public static String IREDUCE_SLEEP_TIME =
+      "mrr.sleepjob.ireduce.sleep.time";
+  public static String IREDUCE_STAGES_COUNT =
+      "mrr.sleepjob.ireduces.stages.count";
+
+  // Flags to inject failures
+  public static String MAP_THROW_ERROR = "mrr.sleepjob.map.throw.error";
+  public static String MAP_FATAL_ERROR = "mrr.sleepjob.map.fatal.error";
+  public static String MAP_ERROR_TASK_IDS = "mrr.sleepjob.map.error.task.ids";
+
+  public static class MRRSleepJobPartitioner extends
+      Partitioner<IntWritable, IntWritable> {
+    public int getPartition(IntWritable k, IntWritable v, int numPartitions) {
+      return k.get() % numPartitions;
+    }
+  }
+
+  public static class EmptySplit extends InputSplit implements Writable {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public static class SleepInputFormat
+      extends InputFormat<IntWritable,IntWritable> {
+
+    public List<InputSplit> getSplits(JobContext jobContext) {
+      List<InputSplit> ret = new ArrayList<InputSplit>();
+      int numSplits = jobContext.getConfiguration().
+                        getInt(MRJobConfig.NUM_MAPS, 1);
+      for (int i = 0; i < numSplits; ++i) {
+        ret.add(new EmptySplit());
+      }
+      return ret;
+    }
+
+    public RecordReader<IntWritable,IntWritable> createRecordReader(
+        InputSplit ignored, TaskAttemptContext taskContext)
+        throws IOException {
+      Configuration conf = taskContext.getConfiguration();
+      String vertexName = conf.get(
+          org.apache.tez.mapreduce.hadoop.MRJobConfig.VERTEX_NAME);
+      boolean isIntermediateReduce =
+          MultiStageMRConfigUtil.getIntermediateStageNum(vertexName) != -1;
+
+      final int count =
+          (isIntermediateReduce)?
+              conf.getInt(IREDUCE_SLEEP_COUNT, 1) :
+              conf.getInt(MAP_SLEEP_COUNT, 1);
+      if (count < 0) throw new IOException("Invalid map count: " + count);
+
+      int totalIReduces = MultiStageMRConfigUtil.getNumIntermediateStages(conf);
+      boolean finalIReduce = totalIReduces ==
+          (MultiStageMRConfigUtil.getIntermediateStageNum(vertexName) + 1);
+
+      final int redcount = finalIReduce?
+          conf.getInt(REDUCE_SLEEP_COUNT, 1) :
+            conf.getInt(IREDUCE_SLEEP_COUNT, 1);
+      if (redcount < 0)
+        throw new IOException("Invalid reduce count: " + redcount);
+
+      final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+
+      return new RecordReader<IntWritable,IntWritable>() {
+        private int records = 0;
+        private int emitCount = 0;
+        private IntWritable key = null;
+        private IntWritable value = null;
+
+        public void initialize(InputSplit split, TaskAttemptContext context) {
+        }
+
+        public boolean nextKeyValue()
+            throws IOException {
+          if (count == 0) {
+            return false;
+          }
+          key = new IntWritable();
+          key.set(emitCount);
+          int emit = emitPerMapTask / count;
+          if ((emitPerMapTask) % count > records) {
+            ++emit;
+          }
+          emitCount += emit;
+          value = new IntWritable();
+          value.set(emit);
+          return records++ < count;
+        }
+        public IntWritable getCurrentKey() { return key; }
+        public IntWritable getCurrentValue() { return value; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException {
+          return count == 0 ? 100 : records / ((float)count);
+        }
+      };
+    }
+  }
+
+  public static class SleepMapper
+      extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
+    private long mapSleepDuration = 100;
+    private int mapSleepCount = 1;
+    private int count = 0;
+    private String vertexName;
+    private boolean throwError = false;
+    private boolean throwFatal = false;
+    private boolean finalAttempt = false;
+
+    protected void setup(Context context)
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.mapSleepCount =
+        conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
+      this.mapSleepDuration = mapSleepCount == 0 ? 0 :
+        conf.getLong(MAP_SLEEP_TIME , 100) / mapSleepCount;
+      vertexName = conf.get(
+          org.apache.tez.mapreduce.hadoop.MRJobConfig.VERTEX_NAME);
+
+      TaskAttemptID taId = context.getTaskAttemptID();
+      String[] taskIds = conf.getStrings(MAP_ERROR_TASK_IDS);
+      if (taId.getId()+1 >= context.getMaxMapAttempts()) {
+        finalAttempt = true;
+      }
+      boolean found = false;
+      if (taskIds != null) {
+        if (taskIds.length == 1 && taskIds[0].equals("*")) {
+          found = true;
+        }
+        if (!found) {
+          for (String taskId : taskIds) {
+            if (Integer.valueOf(taskId).intValue() ==
+                taId.getTaskID().getId()) {
+              found = true;
+              break;
+            }
+          }
+        }
+      }
+      if (found) {
+        if (!finalAttempt) {
+          throwError = conf.getBoolean(MAP_THROW_ERROR, false);
+        }
+        throwFatal = conf.getBoolean(MAP_FATAL_ERROR, false);
+      }
+    }
+
+    public void map(IntWritable key, IntWritable value, Context context
+               ) throws IOException, InterruptedException {
+      //it is expected that every map processes mapSleepCount number of records.
+      try {
+        LOG.info("Sleeping in InitialMap"
+            + ", vertexName=" + vertexName
+            + ", taskAttemptId=" + context.getTaskAttemptID()
+            + ", mapSleepDuration=" + mapSleepDuration
+            + ", mapSleepCount=" + mapSleepCount
+            + ", sleepLeft="
+            + (mapSleepDuration * (mapSleepCount - count)));
+        context.setStatus("Sleeping... (" +
+          (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+        Thread.sleep(mapSleepDuration);
+        if (throwError || throwFatal) {
+          throw new IOException("Throwing a simulated error from map");
+        }
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+            "Interrupted while sleeping").initCause(ex);
+      }
+      ++count;
+      // output reduceSleepCount * numReduce number of random values, so that
+      // each reducer will get reduceSleepCount number of keys.
+      int k = key.get();
+      for (int i = 0; i < value.get(); ++i) {
+        context.write(new IntWritable(k + i), new IntWritable(1));
+      }
+    }
+  }
+
+  public static class ISleepReducer
+  extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+    private long iReduceSleepDuration = 100;
+    private int iReduceSleepCount = 1;
+    private int count = 0;
+    private String vertexName;
+
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.iReduceSleepCount =
+          conf.getInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
+      this.iReduceSleepDuration = iReduceSleepCount == 0 ? 0 :
+        conf.getLong(IREDUCE_SLEEP_TIME , 100) / iReduceSleepCount;
+      vertexName = conf.get(
+          org.apache.tez.mapreduce.hadoop.MRJobConfig.VERTEX_NAME);
+    }
+
+    public void reduce(IntWritable key, Iterable<IntWritable> values,
+        Context context)
+            throws IOException, InterruptedException {
+      try {
+        LOG.info("Sleeping in IntermediateReduce"
+            + ", vertexName=" + vertexName
+            + ", taskAttemptId=" + context.getTaskAttemptID()
+            + ", iReduceSleepDuration=" + iReduceSleepDuration
+            + ", iReduceSleepCount=" + iReduceSleepCount
+            + ", sleepLeft="
+            + (iReduceSleepDuration * (iReduceSleepCount - count)));
+        context.setStatus("Sleeping... (" +
+          (iReduceSleepDuration * (iReduceSleepCount - count)) + ") ms left");
+        Thread.sleep(iReduceSleepDuration);
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+            "Interrupted while sleeping").initCause(ex);
+      }
+      ++count;
+      // output reduceSleepCount * numReduce number of random values, so that
+      // each reducer will get reduceSleepCount number of keys.
+      int k = key.get();
+      for (IntWritable value : values) {
+        for (int i = 0; i < value.get(); ++i) {
+          context.write(new IntWritable(k + i), new IntWritable(1));
+        }
+      }
+    }
+  }
+
+  public static class SleepReducer
+      extends Reducer<IntWritable, IntWritable, NullWritable, NullWritable> {
+    private long reduceSleepDuration = 100;
+    private int reduceSleepCount = 1;
+    private int count = 0;
+    private String vertexName;
+
+    protected void setup(Context context)
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.reduceSleepCount =
+        conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+      this.reduceSleepDuration = reduceSleepCount == 0 ? 0 :
+        conf.getLong(REDUCE_SLEEP_TIME , 100) / reduceSleepCount;
+      vertexName = conf.get(
+          org.apache.tez.mapreduce.hadoop.MRJobConfig.VERTEX_NAME);
+    }
+
+    public void reduce(IntWritable key, Iterable<IntWritable> values,
+                       Context context)
+      throws IOException {
+      try {
+        LOG.info("Sleeping in FinalReduce"
+            + ", vertexName=" + vertexName
+            + ", taskAttemptId=" + context.getTaskAttemptID()
+            + ", reduceSleepDuration=" + reduceSleepDuration
+            + ", reduceSleepCount=" + reduceSleepCount
+            + ", sleepLeft="
+            + (reduceSleepDuration * (reduceSleepCount - count)));
+        context.setStatus("Sleeping... (" +
+            (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+        Thread.sleep(reduceSleepDuration);
+
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+          "Interrupted while sleeping").initCause(ex);
+      }
+      count++;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new MRRSleepJob(), args);
+    System.exit(res);
+  }
+
+  public Job createJob(int numMapper, int numReducer, int iReduceStagesCount,
+      int numIReducer, long mapSleepTime, int mapSleepCount,
+      long reduceSleepTime, int reduceSleepCount,
+      long iReduceSleepTime, int iReduceSleepCount)
+      throws IOException {
+    Configuration conf = getConf();
+    conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
+    conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
+    conf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
+    conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
+    conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+    conf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
+    conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
+    conf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
+
+    // Configure intermediate reduces
+    conf.setInt(
+        org.apache.tez.mapreduce.hadoop.MRJobConfig.MRR_INTERMEDIATE_STAGES,
+        iReduceStagesCount);
+    LOG.info("Running MRR with " + iReduceStagesCount + " IR stages");
+
+    for (int i = 0; i < iReduceStagesCount; ++i) {
+      // Set reducer class for intermediate reduce
+      conf.setClass(
+          MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
+          "mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
+      // Set reducer output key class
+      conf.setClass(
+          MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
+          "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
+      // Set reducer output value class
+      conf.setClass(
+          MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
+          "mapreduce.map.output.value.class"), IntWritable.class, Object.class);
+      conf.setInt(
+          MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
+          "mapreduce.job.reduces"), numIReducer);
+    }
+
+    Job job = Job.getInstance(conf, "sleep");
+    job.setNumReduceTasks(numReducer);
+    job.setJarByClass(MRRSleepJob.class);
+    job.setNumReduceTasks(numReducer);
+    job.setMapperClass(SleepMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setReducerClass(SleepReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setInputFormatClass(SleepInputFormat.class);
+    job.setPartitionerClass(MRRSleepJobPartitioner.class);
+    job.setSpeculativeExecution(false);
+    job.setJobName("Sleep job");
+
+
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    return job;
+  }
+
+  public int run(String[] args) throws Exception {
+
+    if(args.length < 1) {
+      System.err.println("MRRSleepJob [-m numMapper] [-r numReducer]" +
+          " [-ir numIntermediateReducer]" +
+          " [-irs numIntermediateReducerStages]" +
+          " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
+          " [-irt intermediateReduceSleepTime]" +
+          " [-recordt recordSleepTime (msec)]");
+      ToolRunner.printGenericCommandUsage(System.err);
+      return 2;
+    }
+
+    int numMapper = 1, numReducer = 1, numIReducer = 1;
+    long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100,
+        iReduceSleepTime=1;
+    int mapSleepCount = 1, reduceSleepCount = 1, iReduceSleepCount = 1;
+    int iReduceStagesCount = 1;
+
+    for(int i=0; i < args.length; i++ ) {
+      if(args[i].equals("-m")) {
+        numMapper = Integer.parseInt(args[++i]);
+      }
+      else if(args[i].equals("-r")) {
+        numReducer = Integer.parseInt(args[++i]);
+      }
+      else if(args[i].equals("-ir")) {
+        numIReducer = Integer.parseInt(args[++i]);
+      }
+      else if(args[i].equals("-mt")) {
+        mapSleepTime = Long.parseLong(args[++i]);
+      }
+      else if(args[i].equals("-rt")) {
+        reduceSleepTime = Long.parseLong(args[++i]);
+      }
+      else if(args[i].equals("-irt")) {
+        iReduceSleepTime = Long.parseLong(args[++i]);
+      }
+      else if(args[i].equals("-irs")) {
+        iReduceStagesCount = Integer.parseInt(args[++i]);
+      }
+      else if (args[i].equals("-recordt")) {
+        recSleepTime = Long.parseLong(args[++i]);
+      }
+    }
+
+    // sleep for *SleepTime duration in Task by recSleepTime per record
+    mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
+    reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
+    iReduceSleepCount = (int)Math.ceil(iReduceSleepTime / ((double)recSleepTime));
+    Job job = createJob(numMapper, numReducer, iReduceStagesCount, numIReducer,
+        mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
+        iReduceSleepTime, iReduceSleepCount);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/tez-mapreduce-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/pom.xml b/tez-mapreduce-tests/pom.xml
new file mode 100644
index 0000000..0fb1024
--- /dev/null
+++ b/tez-mapreduce-tests/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-mapreduce-tests</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-client</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce-examples</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <!-- Empty: no module-specific RAT settings are declared here. -->
+        <configuration>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java
new file mode 100644
index 0000000..41a1cf3
--- /dev/null
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java
@@ -0,0 +1,151 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Configures and starts the Tez-specific components in the YARN cluster.
+ *
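+ * When using this mini cluster, the user is expected to call
+ * {@link #init(Configuration)} and then {@link #start()}; the configuration
+ * file written during start-up is available from {@link #getConfigFilePath()}.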
+ */
+public class MiniMRRTezCluster extends MiniYARNCluster {
+
+  public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class);
+
+  private static final Log LOG = LogFactory.getLog(MiniMRRTezCluster.class);
+
+  private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml";
+
+  private Path confFilePath;
+
+  /**
+   * Creates a mini cluster with a single NodeManager.
+   *
+   * @param testName name handed on to the MiniYARNCluster constructor
+   */
+  public MiniMRRTezCluster(String testName) {
+    this(testName, 1);
+  }
+
+  /**
+   * Creates a mini cluster with the given number of NodeManagers, passing 4
+   * as both the local-dir and the log-dir count to MiniYARNCluster.
+   *
+   * @param testName name handed on to the MiniYARNCluster constructor
+   * @param noOfNMs number of NodeManagers
+   */
+  public MiniMRRTezCluster(String testName, int noOfNMs) {
+    super(testName, noOfNMs, 4, 4);
+  }
+
+  /**
+   * Creates a mini cluster, forwarding all arguments unchanged to
+   * MiniYARNCluster.
+   *
+   * @param testName name handed on to the MiniYARNCluster constructor
+   * @param noOfNMs number of NodeManagers
+   * @param numLocalDirs number of local dirs
+   * @param numLogDirs number of log dirs
+   */
+  public MiniMRRTezCluster(String testName, int noOfNMs,
+      int numLocalDirs, int numLogDirs)  {
+    super(testName, noOfNMs, numLocalDirs, numLogDirs);
+  }
+
+  /**
+   * Populates the given configuration for running Tez on the mini YARN
+   * cluster and then initializes the parent cluster with it.
+   *
+   * The passed-in configuration is modified in place. This method selects the
+   * YARN-Tez framework, defaults the AM staging directory to a folder under
+   * the test work dir when none is set, disables NodeManager memory checks,
+   * sets the umask to "000", recreates the staging directory (deleting any
+   * existing one), creates the history-server done directory, registers the
+   * ShuffleHandler as a NodeManager auxiliary service, selects the
+   * DefaultContainerExecutor and disables uber tasks.
+   *
+   * @param conf configuration to populate and to initialize the cluster with
+   * @throws YarnException if an IOException occurs while preparing the
+   *         staging or done directories
+   */
+  @Override
+  public void init(Configuration conf) {
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
+    }
+
+    // Both PMEM and VMEM monitoring are disabled.
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
+
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
+
+    try {
+      // Qualify the staging dir against the default file system, then obtain
+      // a FileContext for whichever file system that path lives on.
+      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+      FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
+      // Start from an empty staging directory on every init.
+      if (fc.util().exists(stagingPath)) {
+        LOG.info(stagingPath + " exists! deleting...");
+        fc.delete(stagingPath, true);
+      }
+      LOG.info("mkdir: " + stagingPath);
+      fc.mkdir(stagingPath, null, true);
+
+      //mkdir done directory as well
+      String doneDir =
+          JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+      Path doneDirPath = fc.makeQualified(new Path(doneDir));
+      fc.mkdir(doneDirPath, null, true);
+    } catch (IOException e) {
+      throw new YarnException("Could not create staging directory. ", e);
+    }
+    conf.set(MRConfig.MASTER_ADDRESS, "test");
+
+    //configure the shuffle service in NM
+    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+        Service.class);
+
+    // Non-standard shuffle port
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+
+    conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+        DefaultContainerExecutor.class, ContainerExecutor.class);
+
+    // TestMRJobs is for testing non-uberized operation only; see TestUberAM
+    // for corresponding uberized tests.
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+    super.init(conf);
+  }
+
+  /**
+   * Starts the underlying mini YARN cluster and writes its configuration
+   * as XML to a "yarn-site.xml" file in the test work directory. The path of
+   * that file is afterwards available via {@link #getConfigFilePath()}, and
+   * the file is registered for deletion on JVM exit.
+   *
+   * @throws RuntimeException if the configuration file cannot be written
+   */
+  @Override
+  public void start() {
+    super.start();
+    File workDir = super.getTestWorkDir();
+    Configuration conf = super.getConfig();
+
+    confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG);
+    File confFile = new File(confFilePath.toString());
+    try {
+      // FileOutputStream creates (or truncates) the file itself; close it in
+      // a finally block so the descriptor is released and the XML is fully
+      // written before anyone reads the file.
+      FileOutputStream out = new FileOutputStream(confFile);
+      try {
+        conf.writeXml(out);
+      } finally {
+        out.close();
+      }
+      confFile.deleteOnExit();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    confFilePath = new Path(confFile.getAbsolutePath());
+  }
+
+  /**
+   * Returns the path of the configuration file written by {@link #start()}.
+   *
+   * @return the configuration file path, or {@code null} if {@link #start()}
+   *         has not been called yet
+   */
+  public Path getConfigFilePath() {
+    return confFilePath;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
new file mode 100644
index 0000000..520a1ac
--- /dev/null
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -0,0 +1,344 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.RandomTextWriterJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.mapreduce.examples.MRRSleepJob;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRRJobs {
+
+  private static final Log LOG = LogFactory.getLog(TestMRRJobs.class);
+
+  protected static MiniMRRTezCluster mrrTezCluster;
+  protected static MiniDFSCluster dfsCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  private static FileSystem remoteFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static String TEST_ROOT_DIR = "target"
+      + Path.SEPARATOR + TestMRRJobs.class.getName() + "-tmpDir";
+
+  private static Path TEST_ROOT_DIR_PATH =
+      localFs.makeQualified(new Path(TEST_ROOT_DIR));
+  static Path APP_JAR = new Path(TEST_ROOT_DIR_PATH, "MRAppJar.jar");
+  static Path YARN_SITE_XML = new Path(TEST_ROOT_DIR_PATH, "yarn-site.xml");
+  private static final String OUTPUT_ROOT_DIR = "/tmp" + Path.SEPARATOR +
+      TestMRRJobs.class.getSimpleName();
+
+  /**
+   * Brings up the shared test fixtures: a two-datanode MiniDFSCluster rooted
+   * at TEST_ROOT_DIR and, if the app-master jar is present, a
+   * MiniMRRTezCluster that uses that DFS as its default file system.
+   * The app jar and the cluster's generated configuration file are then
+   * copied to TEST_ROOT_DIR_PATH so tests can add them to the job classpath.
+   *
+   * @throws IOException if copying the jar or configuration file fails
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    // Without the app-master jar the Tez cluster is not started; each test
+    // performs the same check and returns early.
+    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniMRRTezCluster(TestMRRJobs.class.getName(), 1,
+          1, 1);
+      // NOTE: this local deliberately-fresh Configuration shadows the static
+      // 'conf' field used for the DFS cluster above.
+      Configuration conf = new Configuration();
+      conf.set("fs.defaultFS", remoteFs.getUri().toString());   // use HDFS
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
+      mrrTezCluster.init(conf);
+      mrrTezCluster.start();
+    }
+
+    localFs.copyFromLocalFile(new Path(MiniMRRTezCluster.APPJAR), APP_JAR);
+    localFs.setPermission(APP_JAR, new FsPermission("700"));
+    localFs.copyFromLocalFile(mrrTezCluster.getConfigFilePath(), YARN_SITE_XML);
+  }
+
+  /**
+   * Stops the Tez mini cluster and then shuts down the DFS mini cluster,
+   * skipping whichever of the two was never started, and clears both
+   * static references.
+   */
+  @AfterClass
+  public static void tearDown() {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  /**
+   * Submits an MRRSleepJob with every count and sleep-time argument set to 1
+   * and verifies that it succeeds and that the tracking URL ends with the
+   * numeric suffix of the job id. The test is a no-op when the app-master
+   * jar is missing.
+   */
+  @Test (timeout = 300000)
+  public void testMRRSleepJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    LOG.info("\n\n\nStarting testMRRSleepJob().");
+
+    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    // Copy the cluster configuration so job settings stay local to this test.
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.addFileToClassPath(YARN_SITE_XML);
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    // The URL must end with the part of the job id after its last '_',
+    // followed by a slash.
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO use dag client to test counters and task progress?
+    // what about completed jobs?
+
+  }
+
+  /**
+   * Runs a RandomTextWriterJob configured for 3072 total bytes at 1024 bytes
+   * per map and verifies that it succeeds, that the tracking URL ends with
+   * the numeric suffix of the job id, and that the output directory holds
+   * exactly three files besides the success marker. The test is a no-op when
+   * the app-master jar is missing.
+   */
+  @Test (timeout = 60000)
+  public void testRandomWriter() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testRandomWriter().");
+    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    // Work on a copy, as the other tests do, so these job-specific settings
+    // do not leak into the cluster configuration shared by all tests.
+    Configuration randomWriterConf =
+        new Configuration(mrrTezCluster.getConfig());
+    randomWriterConf.set(RandomTextWriterJob.TOTAL_BYTES, "3072");
+    randomWriterConf.set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
+
+    RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
+    Job job = randomWriterJob.createJob(randomWriterConf);
+    Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setSpeculativeExecution(false);
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.addFileToClassPath(YARN_SITE_XML);
+    job.setJarByClass(RandomTextWriterJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+
+    // Make sure there are three files in the output-dir
+
+    RemoteIterator<FileStatus> iterator =
+        FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
+            outputDir);
+    int count = 0;
+    while (iterator.hasNext()) {
+      FileStatus file = iterator.next();
+      // The success marker is not an output part file.
+      if (!file.getPath().getName()
+          .equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+        count++;
+      }
+    }
+    Assert.assertEquals("Number of part files is wrong!", 3, count);
+
+  }
+
+
+  /**
+   * Submits an MRRSleepJob with MAP_FATAL_ERROR enabled and
+   * MAP_ERROR_TASK_IDS set to "*" (presumably: every map task fails fatally;
+   * the flags are interpreted by MRRSleepJob) and verifies that the job ends
+   * in the FAILED state. The test is a no-op when the app-master jar is
+   * missing.
+   */
+  @Test (timeout = 60000)
+  public void testFailingJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testFailingJob().");
+
+    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.addFileToClassPath(YARN_SITE_XML);
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.getConfiguration().setBoolean(MRRSleepJob.MAP_FATAL_ERROR, true);
+    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "*");
+
+    job.submit();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertFalse(succeeded);
+    Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO verify failed task diagnostics
+  }
+
+  /**
+   * Submits an MRRSleepJob with MAP_THROW_ERROR enabled for task id "0" and
+   * up to three map attempts, and verifies that the job still succeeds
+   * (presumably because a later attempt of the failing task completes; the
+   * flags are interpreted by MRRSleepJob). The test is a no-op when the
+   * app-master jar is missing.
+   */
+  @Test (timeout = 300000)
+  public void testFailingAttempt() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testFailingAttempt().");
+
+    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.addFileToClassPath(YARN_SITE_XML);
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(3); // allow the failing map attempt to be retried
+    job.getConfiguration().setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
+    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");
+
+    job.submit();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO verify failed task diagnostics
+  }
+
+
+  /*
+  //@Test (timeout = 60000)
+  public void testMRRSleepJobWithSecurityOn() throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testMRRSleepJobWithSecurityOn().");
+
+    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
+      return;
+    }
+
+    mrrTezCluster.getConfig().set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    mrrTezCluster.getConfig().set(YarnConfiguration.RM_KEYTAB, "/etc/krb5.keytab");
+    mrrTezCluster.getConfig().set(YarnConfiguration.NM_KEYTAB, "/etc/krb5.keytab");
+    mrrTezCluster.getConfig().set(YarnConfiguration.RM_PRINCIPAL,
+        "rm/sightbusy-lx@LOCALHOST");
+    mrrTezCluster.getConfig().set(YarnConfiguration.NM_PRINCIPAL,
+        "nm/sightbusy-lx@LOCALHOST");
+
+    UserGroupInformation.setConfiguration(mrrTezCluster.getConfig());
+
+    // Keep it in here instead of after RM/NM as multiple user logins happen in
+    // the same JVM.
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+    LOG.info("User name is " + user.getUserName());
+    for (Token<? extends TokenIdentifier> str : user.getTokens()) {
+      LOG.info("Token is " + str.encodeToUrlString());
+    }
+    user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        MRRSleepJob sleepJob = new MRRSleepJob();
+        sleepJob.setConf(mrrTezCluster.getConfig());
+        Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
+        // //Job with reduces
+        // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
+        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+        job.submit();
+        String trackingUrl = job.getTrackingURL();
+        String jobId = job.getJobID().toString();
+        job.waitForCompletion(true);
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+        Assert.assertTrue("Tracking URL was " + trackingUrl +
+                          " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+        return null;
+      }
+    });
+
+    // TODO later:  add explicit "isUber()" checks of some sort
+  }
+  */
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/tez-mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce/pom.xml b/tez-mapreduce/pom.xml
index e785f6b..e32b098 100644
--- a/tez-mapreduce/pom.xml
+++ b/tez-mapreduce/pom.xml
@@ -54,6 +54,18 @@
       <artifactId>hadoop-mapreduce-client-common</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2340884c/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index a79a4f9..801ce8a 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -126,6 +126,7 @@ public class YARNRunner implements ClientProtocol {
 
   final public static FsPermission DAG_FILE_PERMISSION =
       FsPermission.createImmutable((short) 0644);
+  final public static int UTF8_CHUNK_SIZE = 16 * 1024;
 
   /**
    * Yarn runner incapsulates the client interface of
@@ -395,7 +396,7 @@ public class YARNRunner implements ClientProtocol {
     }
   }
 
-  
+
   private void setupMapReduceEnv(Configuration jobConf,
       Map<String, String> environment, boolean isMap) throws IOException {
 
@@ -470,7 +471,7 @@ public class YARNRunner implements ClientProtocol {
           + "=" + entry.getValue());
     }
   }
-  
+
   private Vertex configureIntermediateReduceStage(FileSystem fs, JobID jobId,
       Configuration jobConf, String jobSubmitDir, Credentials ts,
       Map<String, LocalResource> jobLocalResources, int iReduceIndex)
@@ -482,7 +483,7 @@ public class YARNRunner implements ClientProtocol {
     Vertex vertex = new Vertex(
         MultiStageMRConfigUtil.getIntermediateStageVertexName(stageNum),
         "org.apache.tez.mapreduce.processor.reduce.ReduceProcessor", numTasks);
-    
+
     Map<String, String> reduceEnv = new HashMap<String, String>();
     setupMapReduceEnv(conf, reduceEnv, false);
 
@@ -501,7 +502,7 @@ public class YARNRunner implements ClientProtocol {
     vertex.setTaskResource(reduceResource);
 
     vertex.setJavaOpts(getReduceJavaOpts(conf));
-    
+
     return vertex;
   }
 
@@ -518,7 +519,7 @@ public class YARNRunner implements ClientProtocol {
       vertices[i] = configureIntermediateReduceStage(fs, jobId, jobConf, jobSubmitDir, ts,
           jobLocalResources, i);
       dag.addVertex(vertices[i]);
-      
+
       LOG.info("XXXX Adding intermediate vertex to DAG"
           + ", vertexName=" + vertices[i].getVertexName()
           + ", processor=" + vertices[i].getProcessorName()
@@ -550,7 +551,7 @@ public class YARNRunner implements ClientProtocol {
     String mapProcessor = "org.apache.tez.mapreduce.processor.map.MapProcessor";
     Vertex mapVertex = new Vertex(
         MultiStageMRConfigUtil.getInitialMapVertexName(),
-        mapProcessor, numMaps); 
+        mapProcessor, numMaps);
 
     // FIXME set up map environment
     Map<String, String> mapEnv = new HashMap<String, String>();
@@ -575,7 +576,7 @@ public class YARNRunner implements ClientProtocol {
     mapVertex.setTaskResource(mapResource);
 
     mapVertex.setJavaOpts(getMapJavaOpts(jobConf));
-    
+
     LOG.info("XXXX Adding map vertex to DAG"
         + ", vertexName=" + mapVertex.getVertexName()
         + ", processor=" + mapVertex.getProcessorName()
@@ -618,7 +619,7 @@ public class YARNRunner implements ClientProtocol {
       reduceVertex.setTaskResource(reduceResource);
 
       reduceVertex.setJavaOpts(getReduceJavaOpts(jobConf));
-      
+
       LOG.info("XXXX Adding reduce vertex to DAG"
           + ", vertexName=" + reduceVertex.getVertexName()
           + ", processor=" + reduceVertex.getProcessorName()
@@ -677,11 +678,11 @@ public class YARNRunner implements ClientProtocol {
         dag.addConfiguration(entry.getValue(), mrConf.get(entry.getKey()));
       }
     }
-    
+
     String jobName = mrConf.get(MRJobConfig.JOB_NAME);
     if(jobName != null) {
       dag.setName(jobName);
-    }    
+    }
   }
 
   private ApplicationSubmissionContext createApplicationSubmissionContext(
@@ -712,13 +713,13 @@ public class YARNRunner implements ClientProtocol {
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 
 //[Debug AppMaster] Current simplest way to attach debugger to AppMaster
-// Uncomment the following, then launch a regular job, eg 
-// >hadoop jar {path}\hadoop-mapreduce-examples-3.0.0-SNAPSHOT.jar pi 2 2                  
+// Uncomment the following, then launch a regular job, eg
+// >hadoop jar {path}\hadoop-mapreduce-examples-3.0.0-SNAPSHOT.jar pi 2 2
 //     LOG.error(" !!!!!!!!!");
 //     LOG.error(" !!!!!!!!! Launching AppMaster in debug/suspend mode.  Attach to port 8002");
 //     LOG.error(" !!!!!!!!!");
 //     vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8002,server=y,suspend=y");
-    
+
     // TODO -Dtez.root.logger??
     String amLogLevel = jobConf.get(MRJobConfig.MR_AM_LOG_LEVEL,
         MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
@@ -782,8 +783,10 @@ public class YARNRunner implements ClientProtocol {
     DAGPlan dagPB = dag.createDag();
     FSDataOutputStream dagPBOutBinaryStream = null;
     FSDataOutputStream dagPBOutTextStream = null;
-    Path binaryPath =  new Path(jobSubmitDir, TezConfiguration.DAG_AM_PLAN_PB_BINARY);
-    Path textPath =  new Path(jobSubmitDir, TezConfiguration.DAG_AM_PLAN_PB_TEXT);
+    Path binaryPath =  new Path(jobSubmitDir,
+        TezConfiguration.DAG_AM_PLAN_PB_BINARY);
+    Path textPath =  new Path(jobSubmitDir,
+        TezConfiguration.DAG_AM_PLAN_PB_TEXT);
     try {
       //binary output
       dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
@@ -793,7 +796,22 @@ public class YARNRunner implements ClientProtocol {
       // text / human-readable output
       dagPBOutTextStream = FileSystem.create(fs, textPath,
           new FsPermission(DAG_FILE_PERMISSION));
-      dagPBOutTextStream.writeUTF(dagPB.toString());
+      String dagPBStr = dagPB.toString();
+      int dagPBStrLen = dagPBStr.length();
+      if (dagPBStrLen <= UTF8_CHUNK_SIZE) {
+        dagPBOutTextStream.writeUTF(dagPBStr);
+      } else {
+        int startIndex = 0;
+        while (startIndex < dagPBStrLen) {
+          int endIndex = startIndex + UTF8_CHUNK_SIZE;
+          if (endIndex > dagPBStrLen) {
+            endIndex = dagPBStrLen;
+          }
+          dagPBOutTextStream.writeUTF(
+              dagPBStr.substring(startIndex, endIndex));
+          startIndex += UTF8_CHUNK_SIZE;
+        }
+      }
     } finally {
       if(dagPBOutBinaryStream != null){
         dagPBOutBinaryStream.close();
@@ -823,7 +841,7 @@ public class YARNRunner implements ClientProtocol {
     // Set up the ApplicationSubmissionContext
     ApplicationSubmissionContext appContext = Records
         .newRecord(ApplicationSubmissionContext.class);
-    
+
     appContext.setApplicationId(applicationId);                // ApplicationId
     appContext.setResource(capability);                        // resource
     appContext.setQueue(                                       // Queue name
@@ -870,12 +888,12 @@ public class YARNRunner implements ClientProtocol {
     Configuration tezJobConf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
 
     // This will replace job.xml in the staging dir.
-    writeTezConf(jobSubmitDir, fs, tezJobConf);    
+    writeTezConf(jobSubmitDir, fs, tezJobConf);
 
     // FIXME set up job resources
     Map<String, LocalResource> jobLocalResources =
         createJobLocalResources(tezJobConf, jobSubmitDir);
-    
+
     // FIXME createDAG should take the tezConf as a parameter, instead of using
     // MR keys.
     DAG dag = createDAG(fs, jobId, jobConf, jobSubmitDir, ts,
@@ -898,36 +916,6 @@ public class YARNRunner implements ClientProtocol {
           || appMasterReport.getYarnApplicationState() == YarnApplicationState.KILLED) {
         throw new IOException("Failed to run job : " + diagnostics);
       }
-
-      if (LOG.isDebugEnabled()) {
-        while (true) {
-          appMasterReport = resMgrDelegate.getApplicationReport(applicationId);
-          diagnostics = (appMasterReport == null ? "application report is null"
-              : appMasterReport.getDiagnostics());
-          if (appMasterReport == null) {
-            throw new IOException("Failed to run job : " + diagnostics);
-          }
-          YarnApplicationState state = appMasterReport
-              .getYarnApplicationState();
-          if (state.equals(YarnApplicationState.FAILED)
-              || state.equals(YarnApplicationState.FINISHED)
-              || state.equals(YarnApplicationState.KILLED)) {
-            LOG.info("Job completed" + ", finalStatus="
-                + appMasterReport.getFinalApplicationStatus() + ", finalState="
-                + appMasterReport.getYarnApplicationState() + ", diagnostics="
-                + diagnostics);
-            break;
-          } else {
-            LOG.info("Job in progress" + ", finalStatus="
-                + appMasterReport.getFinalApplicationStatus() + ", finalState="
-                + appMasterReport.getYarnApplicationState());
-          }
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException e) {
-          }
-        }
-      }
     } catch (YarnRemoteException e) {
       throw new IOException(e);
     }
@@ -1089,7 +1077,7 @@ public class YARNRunner implements ClientProtocol {
                envConf + " config settings.");
     }
   }
-  
+
   @SuppressWarnings("deprecation")
   private String getMapJavaOpts(Configuration jobConf) {
     // follows pattern from YARN MapReduceChildJVM.java
@@ -1097,9 +1085,9 @@ public class YARNRunner implements ClientProtocol {
     adminOpts = jobConf.get(
         MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
         MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
-    
+
     String userOpts = "";
-    userOpts = 
+    userOpts =
         jobConf.get(
             MRJobConfig.MAP_JAVA_OPTS, // same as JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS
                 jobConf.get(
@@ -1109,23 +1097,23 @@ public class YARNRunner implements ClientProtocol {
     return adminOpts.trim() + " " + userOpts.trim() + " "
     + getLog4jCmdLineProperties(jobConf, true);
   }
-  
+
   @SuppressWarnings("deprecation")
   private String getReduceJavaOpts(Configuration jobConf) {
-    // follows pattern from YARN MapReduceChildJVM.java 
+    // follows pattern from YARN MapReduceChildJVM.java
     String adminOpts = "";
     adminOpts = jobConf.get(
         MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
         MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
-    
+
     String userOpts = "";
-    userOpts = 
+    userOpts =
         jobConf.get(
-            MRJobConfig.REDUCE_JAVA_OPTS, // same as JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS 
+            MRJobConfig.REDUCE_JAVA_OPTS, // same as JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS
                 jobConf.get(
                     JobConf.MAPRED_TASK_JAVA_OPTS,
                     JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
-    
+
     return adminOpts.trim() + " " + userOpts.trim() + " "
         + getLog4jCmdLineProperties(jobConf, false);
   }
@@ -1144,7 +1132,7 @@ public class YARNRunner implements ClientProtocol {
   /**
    * Add the JVM system properties necessary to configure
    * {@link ContainerLogAppender}.
-   * 
+   *
    * @param logLevel
    *          the desired log level (eg INFO/WARN/DEBUG)
    * @param vargs
@@ -1163,4 +1151,4 @@ public class YARNRunner implements ClientProtocol {
     vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
   }
 
-}
\ No newline at end of file
+}


Mime
View raw message