hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1227422 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/...
Date Thu, 05 Jan 2012 01:29:53 GMT
Author: acmurthy
Date: Thu Jan  5 01:29:52 2012
New Revision: 1227422

URL: http://svn.apache.org/viewvc?rev=1227422&view=rev
Log:
MAPREDUCE-3566. Fixed MR AM to construct CLC only once across all tasks. Contributed by Vinod
K V.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1227422&r1=1227421&r2=1227422&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Jan  5 01:29:52 2012
@@ -403,6 +403,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3595. Add missing TestCounters#testCounterValue test from branch
     1 to 0.23 (Tom White via sseth)
 
+    MAPREDUCE-3566. Fixed MR AM to construct CLC only once across all tasks.
+    (vinodkv via acmurthy) 
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1227422&r1=1227421&r2=1227422&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
Thu Jan  5 01:29:52 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.Applic
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.util.Apps;
 
+@SuppressWarnings("deprecation")
 public class MapReduceChildJVM {
 
   private static String getTaskLogFile(LogName filter) {
@@ -46,7 +47,7 @@ public class MapReduceChildJVM {
           jobConf.get(JobConf.MAPRED_TASK_ENV));
     }
     return jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV,
-        jobConf.get(jobConf.MAPRED_TASK_ENV));
+        jobConf.get(JobConf.MAPRED_TASK_ENV));
   }
 
   private static String getChildLogLevel(JobConf conf, boolean isMap) {
@@ -68,29 +69,9 @@ public class MapReduceChildJVM {
 
     JobConf conf = task.conf;
 
-    // Shell
-    environment.put(
-        Environment.SHELL.name(), 
-        conf.get(
-            MRJobConfig.MAPRED_ADMIN_USER_SHELL, 
-            MRJobConfig.DEFAULT_SHELL)
-            );
-    
-    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
-    Apps.addToEnvironment(
-        environment, 
-        Environment.LD_LIBRARY_PATH.name(), 
-        Environment.PWD.$());
-
-    // Add the env variables passed by the user & admin
+    // Add the env variables passed by the user
     String mapredChildEnv = getChildEnv(conf, task.isMapTask());
     Apps.setEnvFromInputString(environment, mapredChildEnv);
-    Apps.setEnvFromInputString(
-        environment, 
-        conf.get(
-            MRJobConfig.MAPRED_ADMIN_USER_ENV, 
-            MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
-        );
 
     // Set logging level in the environment.
     // This is so that, if the child forks another "bin/hadoop" (common in

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1227422&r1=1227421&r2=1227422&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
Thu Jan  5 01:29:52 2012
@@ -27,6 +27,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -154,6 +156,8 @@ public abstract class TaskAttemptImpl im
   private Token<JobTokenIdentifier> jobToken;
   private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
   private static String initialClasspath = null;
+  private static Object commonContainerSpecLock = new Object();
+  private static ContainerLaunchContext commonContainerSpec = null;
   private static final Object classpathLock = new Object();
   private long launchTime;
   private long finishTime;
@@ -497,29 +501,27 @@ public abstract class TaskAttemptImpl im
 
   /**
    * Create a {@link LocalResource} record with all the given parameters.
-   * TODO: This should pave way for Builder pattern.
    */
-  private static LocalResource createLocalResource(FileSystem fc,
-      RecordFactory recordFactory, Path file, LocalResourceType type,
-      LocalResourceVisibility visibility) throws IOException {
+  private static LocalResource createLocalResource(FileSystem fc, Path file,
+      LocalResourceType type, LocalResourceVisibility visibility)
+      throws IOException {
     FileStatus fstat = fc.getFileStatus(file);
-    LocalResource resource =
-        recordFactory.newRecordInstance(LocalResource.class);
-    resource.setResource(ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
-        .getPath())));
-    resource.setType(type);
-    resource.setVisibility(visibility);
-    resource.setSize(fstat.getLen());
-    resource.setTimestamp(fstat.getModificationTime());
-    return resource;
+    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
+        .getPath()));
+    long resourceSize = fstat.getLen();
+    long resourceModificationTime = fstat.getModificationTime();
+
+    return BuilderUtils.newLocalResource(resourceURL, type, visibility,
+        resourceSize, resourceModificationTime);
   }
 
   /**
    * Lock this on initialClasspath so that there is only one fork in the AM for
-   * getting the initial class-path. TODO: This should go away once we construct
-   * a parent CLC and use it for all the containers.
+   * getting the initial class-path. TODO: We already construct
+   * a parent CLC and use it for all the containers, so this should go away
+   * once the mr-generated-classpath stuff is gone.
    */
-  private String getInitialClasspath() throws IOException {
+  private static String getInitialClasspath() throws IOException {
     synchronized (classpathLock) {
       if (initialClasspathFlag.get()) {
         return initialClasspath;
@@ -534,11 +536,14 @@ public abstract class TaskAttemptImpl im
 
 
   /**
-   * Create the {@link ContainerLaunchContext} for this attempt.
+   * Create the common {@link ContainerLaunchContext} for all attempts.
    * @param applicationACLs 
    */
-  private ContainerLaunchContext createContainerLaunchContext(
-      Map<ApplicationAccessType, String> applicationACLs) {
+  private static ContainerLaunchContext createCommonContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
+      Token<JobTokenIdentifier> jobToken,
+      final org.apache.hadoop.mapred.JobID oldJobId,
+      Collection<Token<? extends TokenIdentifier>> fsTokens) {
 
     // Application resources
     Map<String, LocalResource> localResources = 
@@ -556,13 +561,13 @@ public abstract class TaskAttemptImpl im
       FileSystem remoteFS = FileSystem.get(conf);
 
       // //////////// Set up JobJar to be localized properly on the remote NM.
-      if (conf.get(MRJobConfig.JAR) != null) {
-        Path remoteJobJar = (new Path(remoteTask.getConf().get(
-              MRJobConfig.JAR))).makeQualified(remoteFS.getUri(), 
-                                               remoteFS.getWorkingDirectory());
+      String jobJar = conf.get(MRJobConfig.JAR);
+      if (jobJar != null) {
+        Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
+            .getUri(), remoteFS.getWorkingDirectory());
         localResources.put(
             MRJobConfig.JOB_JAR,
-            createLocalResource(remoteFS, recordFactory, remoteJobJar,
+            createLocalResource(remoteFS, remoteJobJar,
                 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
         LOG.info("The job-jar file on the remote FS is "
             + remoteJobJar.toUri().toASCIIString());
@@ -584,7 +589,7 @@ public abstract class TaskAttemptImpl im
           new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
       localResources.put(
           MRJobConfig.JOB_CONF_FILE,
-          createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
+          createLocalResource(remoteFS, remoteJobConfPath,
               LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
       LOG.info("The job-conf file on the remote FS is "
           + remoteJobConfPath.toUri().toASCIIString());
@@ -630,19 +635,81 @@ public abstract class TaskAttemptImpl im
       throw new YarnException(e);
     }
 
-    // Setup environment
-    MapReduceChildJVM.setVMEnv(environment, remoteTask);
+    // Shell
+    environment.put(
+        Environment.SHELL.name(), 
+        conf.get(
+            MRJobConfig.MAPRED_ADMIN_USER_SHELL, 
+            MRJobConfig.DEFAULT_SHELL)
+            );
+
+    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
+    Apps.addToEnvironment(
+        environment, 
+        Environment.LD_LIBRARY_PATH.name(), 
+        Environment.PWD.$());
+
+    // Add the env variables passed by the admin
+    Apps.setEnvFromInputString(
+        environment, 
+        conf.get(
+            MRJobConfig.MAPRED_ADMIN_USER_ENV, 
+            MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
+        );
+
+    // Construct the actual Container
+    // The null fields are per-container and will be constructed for each
+    // container separately.
+    ContainerLaunchContext container = BuilderUtils
+        .newContainerLaunchContext(null, conf
+            .get(MRJobConfig.USER_NAME), null, localResources,
+            environment, null, serviceData, tokens, applicationACLs);
+
+    return container;
+  }
+
+  static ContainerLaunchContext createContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs,
+      ContainerId containerID, Configuration conf,
+      Token<JobTokenIdentifier> jobToken, Task remoteTask,
+      final org.apache.hadoop.mapred.JobID oldJobId,
+      Resource assignedCapability, WrappedJvmID jvmID,
+      TaskAttemptListener taskAttemptListener,
+      Collection<Token<? extends TokenIdentifier>> fsTokens) {
+
+    synchronized (commonContainerSpecLock) {
+      if (commonContainerSpec == null) {
+        commonContainerSpec = createCommonContainerLaunchContext(
+            applicationACLs, conf, jobToken, oldJobId, fsTokens);
+      }
+    }
+
+    // Fill in the fields needed per-container that are missing in the common
+    // spec.
+
+    // Setup environment by cloning from common env.
+    Map<String, String> env = commonContainerSpec.getEnvironment();
+    Map<String, String> myEnv = new HashMap<String, String>(env.size());
+    myEnv.putAll(env);
+    MapReduceChildJVM.setVMEnv(myEnv, remoteTask);
 
     // Set up the launch command
     List<String> commands = MapReduceChildJVM.getVMCommand(
-        taskAttemptListener.getAddress(), remoteTask,
-        jvmID);
-    
+        taskAttemptListener.getAddress(), remoteTask, jvmID);
+
+    // Duplicate the ByteBuffers for access by multiple containers.
+    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+    for (Entry<String, ByteBuffer> entry : commonContainerSpec
+                .getServiceData().entrySet()) {
+      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+    }
+
     // Construct the actual Container
-    ContainerLaunchContext container = BuilderUtils
-        .newContainerLaunchContext(containerID, conf
-            .get(MRJobConfig.USER_NAME), assignedCapability, localResources,
-            environment, commands, serviceData, tokens, applicationACLs);
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        containerID, commonContainerSpec.getUser(), assignedCapability,
+        commonContainerSpec.getLocalResources(), myEnv, commands,
+        myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
+        applicationACLs);
 
     return container;
   }
@@ -1022,7 +1089,7 @@ public abstract class TaskAttemptImpl im
 
   private static class ContainerAssignedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-    @SuppressWarnings({ "unchecked", "deprecation" })
+    @SuppressWarnings({ "unchecked" })
     @Override
     public void transition(final TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
@@ -1046,20 +1113,16 @@ public abstract class TaskAttemptImpl im
       
       //launch the container
       //create the container object to be launched for a given Task attempt
-      taskAttempt.eventHandler.handle(
-          new ContainerRemoteLaunchEvent(taskAttempt.attemptId, 
-              taskAttempt.containerID, 
-              taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
-        @Override
-        public ContainerLaunchContext getContainer() {
-          return taskAttempt.createContainerLaunchContext(cEvent
-              .getApplicationACLs());
-        }
-        @Override
-        public Task getRemoteTask() {  // classic mapred Task, not YARN version
-          return taskAttempt.remoteTask;
-        }
-      });
+      ContainerLaunchContext launchContext = createContainerLaunchContext(
+          cEvent.getApplicationACLs(), taskAttempt.containerID,
+          taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
+          taskAttempt.oldJobId, taskAttempt.assignedCapability,
+          taskAttempt.jvmID, taskAttempt.taskAttemptListener,
+          taskAttempt.fsTokens);
+      taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
+          taskAttempt.attemptId, taskAttempt.containerID,
+          taskAttempt.containerMgrAddress, taskAttempt.containerToken,
+          launchContext, taskAttempt.remoteTask));
 
       // send event to speculator that our container needs are satisfied
       taskAttempt.eventHandler.handle
@@ -1197,7 +1260,6 @@ public abstract class TaskAttemptImpl im
     @Override
     public void transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
-      @SuppressWarnings("deprecation")
       TaskAttemptContext taskContext =
         new TaskAttemptContextImpl(taskAttempt.conf,
             TypeConverter.fromYarn(taskAttempt.attemptId));

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1227422&r1=1227421&r2=1227422&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
Thu Jan  5 01:29:52 2012
@@ -24,17 +24,31 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 
-public abstract class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
+public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
+
+  private final ContainerLaunchContext container;
+  private final Task task;
 
   public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
       ContainerId containerID, String containerMgrAddress,
-      ContainerToken containerToken) {
-    super(taskAttemptID, containerID, containerMgrAddress,
-        containerToken,
+      ContainerToken containerToken,
+      ContainerLaunchContext containerLaunchContext, Task remoteTask) {
+    super(taskAttemptID, containerID, containerMgrAddress, containerToken,
         ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
+    this.container = containerLaunchContext;
+    this.task = remoteTask;
   }
-  public abstract ContainerLaunchContext getContainer();
 
-  public abstract Task getRemoteTask();
+  public ContainerLaunchContext getContainer() {
+    return this.container;
+  }
 
+  public Task getRemoteTask() {
+    return this.task;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1227422&r1=1227421&r2=1227422&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
Thu Jan  5 01:29:52 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
@@ -65,7 +66,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.Clock;
@@ -173,7 +176,8 @@ public class MRApp extends MRAppMaster {
   }
 
   public Job submit(Configuration conf) throws Exception {
-    String user = conf.get(MRJobConfig.USER_NAME, "mapred");
+    String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
+      .getCurrentUser().getShortUserName());
     conf.set(MRJobConfig.USER_NAME, user);
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString());
     conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true);
@@ -187,6 +191,14 @@ public class MRApp extends MRAppMaster {
     start();
     DefaultMetricsSystem.shutdown();
     Job job = getContext().getAllJobs().values().iterator().next();
+
+    // Write job.xml
+    String jobFile = MRApps.getJobFile(conf, user,
+      TypeConverter.fromYarn(job.getID()));
+    LOG.info("Writing job conf to " + jobFile);
+    new File(jobFile).getParentFile().mkdirs();
+    conf.writeXml(new FileOutputStream(jobFile));
+
     return job;
   }
 
@@ -308,7 +320,7 @@ public class MRApp extends MRAppMaster {
     return new TaskAttemptListener(){
       @Override
       public InetSocketAddress getAddress() {
-        return null;
+        return NetUtils.createSocketAddr("localhost:54321");
       }
       @Override
       public void registerLaunchedTask(TaskAttemptId attemptID, 
@@ -337,11 +349,14 @@ public class MRApp extends MRAppMaster {
     return new MockContainerLauncher();
   }
 
-  class MockContainerLauncher implements ContainerLauncher {
+  protected class MockContainerLauncher implements ContainerLauncher {
 
     //We are running locally so set the shuffle port to -1 
     int shufflePort = -1;
 
+    public MockContainerLauncher() {
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void handle(ContainerLauncherEvent event) {
@@ -474,6 +489,7 @@ public class MRApp extends MRAppMaster {
     }
     @Override
     protected void setup(JobImpl job) throws IOException {
+      super.setup(job);
       job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
       job.remoteJobConfFile = new Path("test");
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1227422&r1=1227421&r2=1227422&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
Thu Jan  5 01:29:52 2012
@@ -40,6 +40,7 @@ import org.junit.Test;
 /**
  * Tests the state machine of MR App.
  */
+@SuppressWarnings("unchecked")
 public class TestMRApp {
 
   @Test

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java?rev=1227422&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
(added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
Thu Jan  5 01:29:52 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.junit.Test;
+
+public class TestMapReduceChildJVM {
+
+  private static final Log LOG = LogFactory.getLog(TestMapReduceChildJVM.class);
+
+  @Test
+  public void testCommandLine() throws Exception {
+
+    MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    Assert.assertEquals(
+      "[exec $JAVA_HOME/bin/java" +
+      " -Djava.net.preferIPv4Stack=true" +
+      " -Dhadoop.metrics.log.level=WARN" +
+      "  -Xmx200m -Djava.io.tmpdir=$PWD/tmp" +
+      " -Dlog4j.configuration=container-log4j.properties" +
+      " -Dyarn.app.mapreduce.container.log.dir=<LOG_DIR>" +
+      " -Dyarn.app.mapreduce.container.log.filesize=0" +
+      " -Dhadoop.root.logger=INFO,CLA" +
+      " org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
+      " 54321" +
+      " attempt_0_0000_m_000000_0" +
+      " 0" +
+      " 1><LOG_DIR>/stdout" +
+      " 2><LOG_DIR>/stderr ]", app.myCommandLine);
+  }
+
+  private static final class MyMRApp extends MRApp {
+
+    private String myCommandLine;
+
+    public MyMRApp(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      return new MockContainerLauncher() {
+        @Override
+        public void handle(ContainerLauncherEvent event) {
+          if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
+            ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event;
+            ContainerLaunchContext launchContext = launchEvent.getContainer();
+            String cmdString = launchContext.getCommands().toString();
+            LOG.info("launchContext " + cmdString);
+            myCommandLine = cmdString;
+          }
+          super.handle(event);
+        }
+      };
+    }
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1227422&r1=1227421&r2=1227422&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
Thu Jan  5 01:29:52 2012
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
+@SuppressWarnings("unchecked")
 public class TestTaskAttempt{
 
   @Test



Mime
View raw message