tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-205. Create helper apis to construct container capability resource for tasks and local resources for input splits. (hitesh)
Date Wed, 12 Jun 2013 17:38:17 GMT
Updated Branches:
  refs/heads/master 2d8c9b6a9 -> 57735b057


TEZ-205. Create helper APIs to construct container capability resources for tasks and local
resources for input splits. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/57735b05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/57735b05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/57735b05

Branch: refs/heads/master
Commit: 57735b05775c51a9900b1e23825cc158b81d8876
Parents: 2d8c9b6
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed Jun 12 10:24:04 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed Jun 12 10:38:09 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  | 116 ++++++++++++++++++-
 .../tez/mapreduce/hadoop/TestMRHelpers.java     |  85 ++++++++++++--
 2 files changed, 188 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/57735b05/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 686d7d4..8d5acbd 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Vector;
 
 import org.apache.commons.logging.Log;
@@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -44,7 +46,12 @@ import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.ContainerLogAppender;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 
 import com.google.common.base.Preconditions;
@@ -53,6 +60,10 @@ public class MRHelpers {
 
   private static final Log LOG = LogFactory.getLog(MRHelpers.class);
 
+  static final String JOB_SPLIT_RESOURCE_NAME = "job.split"; // key + file name
+  static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = // key + file name
+      "job.splitmetainfo";
+
   /**
    * Comparator for org.apache.hadoop.mapreduce.InputSplit
    */
@@ -198,7 +209,7 @@ public class MRHelpers {
    *
    * @param inputSplitsDir Directory in which the splits file and meta info file
    * will be generated. job.split and job.splitmetainfo files in this directory
-   * will be overwritten.
+   * will be overwritten. Should be a fully-qualified path.
    *
    * @return InputSplitInfo containing the split files' information and the
    * number of splits generated to be used to determining parallelism of
@@ -269,6 +280,15 @@ public class MRHelpers {
     vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
   }
 
+  /**
+   * Generate JVM options to be used to launch map tasks
+   *
+   * Uses mapreduce.admin.map.child.java.opts, mapreduce.map.java.opts and
+   * mapreduce.map.log.level from config to generate the opts.
+   *
+   * @param conf Configuration to be used to extract JVM opts specific info
+   * @return JAVA_OPTS string to be used in launching the JVM
+   */
   @SuppressWarnings("deprecation")
   public static String getMapJavaOpts(Configuration conf) {
     String adminOpts = conf.get(
@@ -285,6 +305,15 @@ public class MRHelpers {
         + getLog4jCmdLineProperties(conf, true);
   }
 
+  /**
+   * Generate JVM options to be used to launch reduce tasks
+   *
+   * Uses mapreduce.admin.reduce.child.java.opts, mapreduce.reduce.java.opts
+   * and mapreduce.reduce.log.level from config to generate the opts.
+   *
+   * @param conf Configuration to be used to extract JVM opts specific info
+   * @return JAVA_OPTS string to be used in launching the JVM
+   */
   @SuppressWarnings("deprecation")
   public static String getReduceJavaOpts(Configuration conf) {
     String adminOpts = conf.get(
@@ -419,4 +448,87 @@ public class MRHelpers {
     conf.readFields(dib);
     return conf;
   }
-}
\ No newline at end of file
+
+  /**
+   * Add job.split and job.splitmetainfo FILE/APPLICATION local resources
+   * for the input split files; names are checked and files stat'ed first.
+   * @param fs Filesystem used to stat the split files (length, mtime)
+   * @param inputSplitInfo Information on location of split files
+   * @param localResources LocalResources collection to be updated in place
+   * @throws IOException if the status of either split file cannot be read
+   * @throws RuntimeException if either name is already in localResources
+   */
+  public static void updateLocalResourcesForInputSplits(
+      FileSystem fs,
+      InputSplitInfo inputSplitInfo,
+      Map<String, LocalResource> localResources) throws IOException {
+    if (localResources.containsKey(JOB_SPLIT_RESOURCE_NAME)) {
+      throw new RuntimeException("LocalResources already contains a"
+          + " resource named " + JOB_SPLIT_RESOURCE_NAME);
+    }
+    if (localResources.containsKey(JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
+      throw new RuntimeException("LocalResources already contains a"
+          + " resource named " + JOB_SPLIT_METAINFO_RESOURCE_NAME);
+    }
+
+    FileStatus splitFileStatus = // both stats happen before any put
+        fs.getFileStatus(inputSplitInfo.getSplitsFile());
+    FileStatus metaInfoFileStatus =
+        fs.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile());
+    localResources.put(JOB_SPLIT_RESOURCE_NAME,
+        LocalResource.newInstance(
+            ConverterUtils.getYarnUrlFromPath(inputSplitInfo.getSplitsFile()),
+            LocalResourceType.FILE,
+            LocalResourceVisibility.APPLICATION,
+            splitFileStatus.getLen(), splitFileStatus.getModificationTime()));
+    localResources.put(JOB_SPLIT_METAINFO_RESOURCE_NAME,
+        LocalResource.newInstance(
+            ConverterUtils.getYarnUrlFromPath(
+                inputSplitInfo.getSplitsMetaInfoFile()),
+            LocalResourceType.FILE,
+            LocalResourceVisibility.APPLICATION,
+            metaInfoFileStatus.getLen(),
+            metaInfoFileStatus.getModificationTime()));
+  }
+
+  /**
+   * Extract the map task's container resource requirements from the
+   * provided configuration.
+   *
+   * Reads mapreduce.map.memory.mb and mapreduce.map.cpu.vcores, falling
+   * back to MRJobConfig.DEFAULT_MAP_MEMORY_MB / DEFAULT_MAP_CPU_VCORES.
+   *
+   * @param conf Configuration with MR specific settings to read the map
+   * memory (MB) and virtual core requirements from
+   *
+   * @return Resource object used to define requirements for containers
+   * running Map tasks
+   */
+  public static Resource getMapResource(Configuration conf) {
+    return Resource.newInstance(conf.getInt(
+        MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB),
+        conf.getInt(MRJobConfig.MAP_CPU_VCORES,
+            MRJobConfig.DEFAULT_MAP_CPU_VCORES));
+  }
+
+  /**
+   * Extract the reduce task's container resource requirements from the
+   * provided configuration.
+   *
+   * Reads mapreduce.reduce.memory.mb and mapreduce.reduce.cpu.vcores, with
+   * MRJobConfig.DEFAULT_REDUCE_MEMORY_MB / DEFAULT_REDUCE_CPU_VCORES fallback.
+   *
+   * @param conf Configuration with MR specific settings to read the reduce
+   * memory (MB) and virtual core requirements from
+   *
+   * @return Resource object used to define requirements for containers
+   * running Reduce tasks
+   */
+  public static Resource getReduceResource(Configuration conf) {
+    return Resource.newInstance(conf.getInt(
+        MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB),
+        conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
+            MRJobConfig.DEFAULT_REDUCE_CPU_VCORES));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/57735b05/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
index b719c0a..2b34227 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
@@ -18,16 +18,17 @@
 
 package org.apache.tez.mapreduce.hadoop;
 
+import static org.junit.Assert.fail;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -39,6 +40,10 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.junit.Assert;
@@ -129,9 +134,11 @@ public class TestMRHelpers {
   public void testNewSplitsGen() throws Exception {
     InputSplitInfo info = generateNewSplits(newSplitsDir);
 
-    Assert.assertEquals(new Path(newSplitsDir, "job.split"),
+    Assert.assertEquals(new Path(newSplitsDir,
+        MRHelpers.JOB_SPLIT_RESOURCE_NAME),
         info.getSplitsFile());
-    Assert.assertEquals(new Path(newSplitsDir, "job.splitmetainfo"),
+    Assert.assertEquals(new Path(newSplitsDir,
+        MRHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME),
         info.getSplitsMetaInfoFile());
 
     RemoteIterator<LocatedFileStatus> files =
@@ -145,9 +152,9 @@ public class TestMRHelpers {
       LocatedFileStatus status = files.next();
       String fName = status.getPath().getName();
       totalFilesFound++;
-      if (fName.equals("job.split")) {
+      if (fName.equals(MRHelpers.JOB_SPLIT_RESOURCE_NAME)) {
         foundSplitsFile = true;
-      } else if (fName.equals("job.splitmetainfo")) {
+      } else if (fName.equals(MRHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
         foundMetaFile = true;
       } else {
         Assert.fail("Found invalid file in splits dir, filename=" + fName);
@@ -175,9 +182,11 @@ public class TestMRHelpers {
   @Test
   public void testOldSplitsGen() throws Exception {
     InputSplitInfo info = generateOldSplits(oldSplitsDir);
-    Assert.assertEquals(new Path(oldSplitsDir, "job.split"),
+    Assert.assertEquals(new Path(oldSplitsDir,
+        MRHelpers.JOB_SPLIT_RESOURCE_NAME),
         info.getSplitsFile());
-    Assert.assertEquals(new Path(oldSplitsDir, "job.splitmetainfo"),
+    Assert.assertEquals(new Path(oldSplitsDir,
+        MRHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME),
         info.getSplitsMetaInfoFile());
 
     RemoteIterator<LocatedFileStatus> files =
@@ -191,9 +200,9 @@ public class TestMRHelpers {
       LocatedFileStatus status = files.next();
       String fName = status.getPath().getName();
       totalFilesFound++;
-      if (fName.equals("job.split")) {
+      if (fName.equals(MRHelpers.JOB_SPLIT_RESOURCE_NAME)) {
         foundSplitsFile = true;
-      } else if (fName.equals("job.splitmetainfo")) {
+      } else if (fName.equals(MRHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
         foundMetaFile = true;
       } else {
         Assert.fail("Found invalid file in splits dir, filename=" + fName);
@@ -208,6 +217,32 @@ public class TestMRHelpers {
     verifyLocationHints(oldSplitsDir, info.getTaskLocationHints());
   }
 
+  @Test
+  public void testInputSplitLocalResourceCreation() throws Exception {
+    InputSplitInfo inputSplitInfo = generateOldSplits(oldSplitsDir);
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put("job.split", null); // same as JOB_SPLIT_RESOURCE_NAME
+
+    try { // only the job.split conflict path is exercised here
+      MRHelpers.updateLocalResourcesForInputSplits(remoteFs,
+          inputSplitInfo, localResources);
+      fail("Expected failure for job.split override in local resources map");
+    } catch (RuntimeException e) {
+      // Expected: an existing job.split entry must not be overwritten
+    }
+
+    localResources.clear(); // no conflicts now: both entries should be added
+    MRHelpers.updateLocalResourcesForInputSplits(remoteFs,
+        inputSplitInfo, localResources);
+
+    Assert.assertEquals(2, localResources.size());
+    Assert.assertTrue(localResources.containsKey(
+        MRHelpers.JOB_SPLIT_RESOURCE_NAME));
+    Assert.assertTrue(localResources.containsKey(
+        MRHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));
+  }
+
   private Configuration createConfForJavaOptsTest() {
     Configuration conf = new Configuration(false);
     conf.set(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, "fooMapAdminOpts");
@@ -249,4 +284,32 @@ public class TestMRHelpers {
     Assert.assertTrue(opts.contains(" -Dhadoop.root.logger=TRACE"));
   }
 
+  @Test
+  public void testContainerResourceConstruction() {
+    Configuration conf = new Configuration(); // assumes site files set no MR resource keys
+    Resource mapResource = MRHelpers.getMapResource(conf);
+    Resource reduceResource = MRHelpers.getReduceResource(conf);
+
+    Assert.assertEquals(MRJobConfig.DEFAULT_MAP_CPU_VCORES,
+        mapResource.getVirtualCores());
+    Assert.assertEquals(MRJobConfig.DEFAULT_MAP_MEMORY_MB,
+        mapResource.getMemory());
+    Assert.assertEquals(MRJobConfig.DEFAULT_REDUCE_CPU_VCORES,
+        reduceResource.getVirtualCores());
+    Assert.assertEquals(MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
+        reduceResource.getMemory());
+
+    conf.setInt(MRJobConfig.MAP_CPU_VCORES, 2); // explicit values override defaults
+    conf.setInt(MRJobConfig.MAP_MEMORY_MB, 123);
+    conf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 20);
+    conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 1234);
+
+    mapResource = MRHelpers.getMapResource(conf);
+    reduceResource = MRHelpers.getReduceResource(conf);
+
+    Assert.assertEquals(2, mapResource.getVirtualCores());
+    Assert.assertEquals(123, mapResource.getMemory());
+    Assert.assertEquals(20, reduceResource.getVirtualCores());
+    Assert.assertEquals(1234, reduceResource.getMemory());
+  }
 }


Mime
View raw message