hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1463203 [2/8] - in /hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org...
Date Mon, 01 Apr 2013 16:47:34 GMT
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Mon Apr  1 16:47:16 2013
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -64,12 +63,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.AMRMClient;
 import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -148,8 +147,8 @@ public class ApplicationMaster {
   private YarnRPC rpc;
 
   // Handle to communicate with the Resource Manager
-  private AMRMClient resourceManager;
-
+  private AMRMClientAsync resourceManager;
+  
   // Application Attempt Id ( combination of attemptId and fail count )
   private ApplicationAttemptId appAttemptID;
 
@@ -170,8 +169,6 @@ public class ApplicationMaster {
   // Priority of the request
   private int requestPriority;
 
-  // Simple flag to denote whether all works is done
-  private boolean appDone = false;
   // Counter for completed containers ( complete denotes successful or failed )
   private AtomicInteger numCompletedContainers = new AtomicInteger();
   // Allocated container count so that we know how many containers has the RM
@@ -202,6 +199,9 @@ public class ApplicationMaster {
   // Hardcoded path to shell script in launch container's local env
   private final String ExecShellStringPath = "ExecShellScript.sh";
 
+  private volatile boolean done;
+  private volatile boolean success;
+  
   // Launch threads
   private List<Thread> launchThreads = new ArrayList<Thread>();
 
@@ -319,10 +319,7 @@ public class ApplicationMaster {
 
     Map<String, String> envs = System.getenv();
 
-    if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
-      appAttemptID = ConverterUtils.toApplicationAttemptId(envs
-          .get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
-    } else if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+    if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
       if (cliParser.hasOption("app_attempt_id")) {
         String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
         appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
@@ -336,6 +333,23 @@ public class ApplicationMaster {
       appAttemptID = containerId.getApplicationAttemptId();
     }
 
+    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
+      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
+          + " not set in the environment");
+    }
+    if (!envs.containsKey(ApplicationConstants.NM_HOST_ENV)) {
+      throw new RuntimeException(ApplicationConstants.NM_HOST_ENV
+          + " not set in the environment");
+    }
+    if (!envs.containsKey(ApplicationConstants.NM_HTTP_PORT_ENV)) {
+      throw new RuntimeException(ApplicationConstants.NM_HTTP_PORT_ENV
+          + " not set in the environment");
+    }
+    if (!envs.containsKey(ApplicationConstants.NM_PORT_ENV)) {
+      throw new RuntimeException(ApplicationConstants.NM_PORT_ENV
+          + " not set in the environment");
+    }
+
     LOG.info("Application master for app" + ", appId="
         + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
         + appAttemptID.getApplicationId().getClusterTimestamp()
@@ -394,6 +408,10 @@ public class ApplicationMaster {
         "container_memory", "10"));
     numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
         "num_containers", "1"));
+    if (numTotalContainers == 0) {
+      throw new IllegalArgumentException(
+          "Cannot run distributed shell with no containers");
+    }
     requestPriority = Integer.parseInt(cliParser
         .getOptionValue("priority", "0"));
 
@@ -417,225 +435,202 @@ public class ApplicationMaster {
   public boolean run() throws YarnRemoteException {
     LOG.info("Starting ApplicationMaster");
 
-    // Connect to ResourceManager
-    resourceManager = new AMRMClientImpl(appAttemptID);
+    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+    
+    resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
     resourceManager.init(conf);
     resourceManager.start();
 
-    try {
-      // Setup local RPC Server to accept status requests directly from clients
-      // TODO need to setup a protocol for client to be able to communicate to
-      // the RPC server
-      // TODO use the rpc port info to register with the RM for the client to
-      // send requests to this app master
-
-      // Register self with ResourceManager
-      RegisterApplicationMasterResponse response = resourceManager
-          .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
-              appMasterTrackingUrl);
-      // Dump out information about cluster capability as seen by the
-      // resource manager
-      int minMem = response.getMinimumResourceCapability().getMemory();
-      int maxMem = response.getMaximumResourceCapability().getMemory();
-      LOG.info("Min mem capabililty of resources in this cluster " + minMem);
-      LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
-      // A resource ask has to be atleast the minimum of the capability of the
-      // cluster, the value has to be a multiple of the min value and cannot
-      // exceed the max.
-      // If it is not an exact multiple of min, the RM will allocate to the
-      // nearest multiple of min
-      if (containerMemory < minMem) {
-        LOG.info("Container memory specified below min threshold of cluster."
-            + " Using min value." + ", specified=" + containerMemory + ", min="
-            + minMem);
-        containerMemory = minMem;
-      } else if (containerMemory > maxMem) {
-        LOG.info("Container memory specified above max threshold of cluster."
-            + " Using max value." + ", specified=" + containerMemory + ", max="
-            + maxMem);
-        containerMemory = maxMem;
-      }
-
-      // Setup heartbeat emitter
-      // TODO poll RM every now and then with an empty request to let RM know
-      // that we are alive
-      // The heartbeat interval after which an AM is timed out by the RM is
-      // defined by a config setting:
-      // RM_AM_EXPIRY_INTERVAL_MS with default defined by
-      // DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
-      // The allocate calls to the RM count as heartbeats so, for now,
-      // this additional heartbeat emitter is not required.
-
-      // Setup ask for containers from RM
-      // Send request for containers to RM
-      // Until we get our fully allocated quota, we keep on polling RM for
-      // containers
-      // Keep looping until all the containers are launched and shell script
-      // executed on them ( regardless of success/failure).
-
-      int loopCounter = -1;
-
-      while (numCompletedContainers.get() < numTotalContainers && !appDone) {
-        loopCounter++;
-
-        // log current state
-        LOG.info("Current application state: loop=" + loopCounter
-            + ", appDone=" + appDone + ", total=" + numTotalContainers
-            + ", requested=" + numRequestedContainers + ", completed="
-            + numCompletedContainers + ", failed=" + numFailedContainers
-            + ", currentAllocated=" + numAllocatedContainers);
-
-        // Sleep before each loop when asking RM for containers
-        // to avoid flooding RM with spurious requests when it
-        // need not have any available containers
-        // Sleeping for 1000 ms.
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          LOG.info("Sleep interrupted " + e.getMessage());
-        }
+    // Setup local RPC Server to accept status requests directly from clients
+    // TODO need to setup a protocol for client to be able to communicate to
+    // the RPC server
+    // TODO use the rpc port info to register with the RM for the client to
+    // send requests to this app master
+
+    // Register self with ResourceManager
+    // This will start heartbeating to the RM
+    RegisterApplicationMasterResponse response = resourceManager
+        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+            appMasterTrackingUrl);
+    // Dump out information about cluster capability as seen by the
+    // resource manager
+    int minMem = response.getMinimumResourceCapability().getMemory();
+    int maxMem = response.getMaximumResourceCapability().getMemory();
+    LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+    // A resource ask has to be atleast the minimum of the capability of the
+    // cluster, the value has to be a multiple of the min value and cannot
+    // exceed the max.
+    // If it is not an exact multiple of min, the RM will allocate to the
+    // nearest multiple of min
+    if (containerMemory < minMem) {
+      LOG.info("Container memory specified below min threshold of cluster."
+          + " Using min value." + ", specified=" + containerMemory + ", min="
+          + minMem);
+      containerMemory = minMem;
+    } else if (containerMemory > maxMem) {
+      LOG.info("Container memory specified above max threshold of cluster."
+          + " Using max value." + ", specified=" + containerMemory + ", max="
+          + maxMem);
+      containerMemory = maxMem;
+    }
 
-        // No. of containers to request
-        // For the first loop, askCount will be equal to total containers needed
-        // From that point on, askCount will always be 0 as current
-        // implementation does not change its ask on container failures.
-        int askCount = numTotalContainers - numRequestedContainers.get();
-        numRequestedContainers.addAndGet(askCount);
-
-        if (askCount > 0) {
-          ContainerRequest containerAsk = setupContainerAskForRM(askCount);
-          resourceManager.addContainerRequest(containerAsk);
-        }
 
-        // Send the request to RM
-        LOG.info("Asking RM for containers" + ", askCount=" + askCount);
-        AMResponse amResp = sendContainerAskToRM();
-
-        // Retrieve list of allocated containers from the response
-        List<Container> allocatedContainers = amResp.getAllocatedContainers();
-        LOG.info("Got response from RM for container ask, allocatedCnt="
-            + allocatedContainers.size());
-        numAllocatedContainers.addAndGet(allocatedContainers.size());
-        for (Container allocatedContainer : allocatedContainers) {
-          LOG.info("Launching shell command on a new container."
-              + ", containerId=" + allocatedContainer.getId()
-              + ", containerNode=" + allocatedContainer.getNodeId().getHost()
-              + ":" + allocatedContainer.getNodeId().getPort()
-              + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
-              + ", containerState" + allocatedContainer.getState()
-              + ", containerResourceMemory"
-              + allocatedContainer.getResource().getMemory());
-          // + ", containerToken"
-          // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
-          LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
-              allocatedContainer);
-          Thread launchThread = new Thread(runnableLaunchContainer);
-
-          // launch and start the container on a separate thread to keep
-          // the main thread unblocked
-          // as all containers may not be allocated at one go.
-          launchThreads.add(launchThread);
-          launchThread.start();
-        }
+    // Setup ask for containers from RM
+    // Send request for containers to RM
+    // Until we get our fully allocated quota, we keep on polling RM for
+    // containers
+    // Keep looping until all the containers are launched and shell script
+    // executed on them ( regardless of success/failure).
+    ContainerRequest containerAsk = setupContainerAskForRM(numTotalContainers);
+    resourceManager.addContainerRequest(containerAsk);
+    numRequestedContainers.set(numTotalContainers);
 
-        // Check what the current available resources in the cluster are
-        // TODO should we do anything if the available resources are not enough?
-        Resource availableResources = amResp.getAvailableResources();
-        LOG.info("Current available resources in the cluster "
-            + availableResources);
-
-        // Check the completed containers
-        List<ContainerStatus> completedContainers = amResp
-            .getCompletedContainersStatuses();
-        LOG.info("Got response from RM for container ask, completedCnt="
-            + completedContainers.size());
-        for (ContainerStatus containerStatus : completedContainers) {
-          LOG.info("Got container status for containerID="
-              + containerStatus.getContainerId() + ", state="
-              + containerStatus.getState() + ", exitStatus="
-              + containerStatus.getExitStatus() + ", diagnostics="
-              + containerStatus.getDiagnostics());
-
-          // non complete containers should not be here
-          assert (containerStatus.getState() == ContainerState.COMPLETE);
-
-          // increment counters for completed/failed containers
-          int exitStatus = containerStatus.getExitStatus();
-          if (0 != exitStatus) {
-            // container failed
-            if (-100 != exitStatus) {
-              // shell script failed
-              // counts as completed
-              numCompletedContainers.incrementAndGet();
-              numFailedContainers.incrementAndGet();
-            } else {
-              // something else bad happened
-              // app job did not complete for some reason
-              // we should re-try as the container was lost for some reason
-              numAllocatedContainers.decrementAndGet();
-              numRequestedContainers.decrementAndGet();
-              // we do not need to release the container as it would be done
-              // by the RM/CM.
-            }
-          } else {
-            // nothing to do
-            // container completed successfully
+    while (!done) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException ex) {}
+    }
+    finish();
+    
+    return success;
+  }
+  
+  private void finish() {
+    // Join all launched threads
+    // needed for when we time out
+    // and we need to release containers
+    for (Thread launchThread : launchThreads) {
+      try {
+        launchThread.join(10000);
+      } catch (InterruptedException e) {
+        LOG.info("Exception thrown in thread join: " + e.getMessage());
+        e.printStackTrace();
+      }
+    }
+
+    // When the application completes, it should send a finish application
+    // signal to the RM
+    LOG.info("Application completed. Signalling finish to RM");
+
+    FinalApplicationStatus appStatus;
+    String appMessage = null;
+    success = true;
+    if (numFailedContainers.get() == 0) {
+      appStatus = FinalApplicationStatus.SUCCEEDED;
+    } else {
+      appStatus = FinalApplicationStatus.FAILED;
+      appMessage = "Diagnostics." + ", total=" + numTotalContainers
+          + ", completed=" + numCompletedContainers.get() + ", allocated="
+          + numAllocatedContainers.get() + ", failed="
+          + numFailedContainers.get();
+      success = false;
+    }
+    try {
+      resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+    } catch (YarnRemoteException ex) {
+      LOG.error("Failed to unregister application", ex);
+    }
+    
+    done = true;
+    resourceManager.stop();
+  }
+  
+  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+      LOG.info("Got response from RM for container ask, completedCnt="
+          + completedContainers.size());
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info("Got container status for containerID="
+            + containerStatus.getContainerId() + ", state="
+            + containerStatus.getState() + ", exitStatus="
+            + containerStatus.getExitStatus() + ", diagnostics="
+            + containerStatus.getDiagnostics());
+
+        // non complete containers should not be here
+        assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+        // increment counters for completed/failed containers
+        int exitStatus = containerStatus.getExitStatus();
+        if (0 != exitStatus) {
+          // container failed
+          if (YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS != exitStatus) {
+            // shell script failed
+            // counts as completed
             numCompletedContainers.incrementAndGet();
-            LOG.info("Container completed successfully." + ", containerId="
-                + containerStatus.getContainerId());
+            numFailedContainers.incrementAndGet();
+          } else {
+            // container was killed by framework, possibly preempted
+            // we should re-try as the container was lost for some reason
+            numAllocatedContainers.decrementAndGet();
+            numRequestedContainers.decrementAndGet();
+            // we do not need to release the container as it would be done
+            // by the RM
           }
+        } else {
+          // nothing to do
+          // container completed successfully
+          numCompletedContainers.incrementAndGet();
+          LOG.info("Container completed successfully." + ", containerId="
+              + containerStatus.getContainerId());
         }
-        if (numCompletedContainers.get() == numTotalContainers) {
-          appDone = true;
-        }
-
-        LOG.info("Current application state: loop=" + loopCounter
-            + ", appDone=" + appDone + ", total=" + numTotalContainers
-            + ", requested=" + numRequestedContainers + ", completed="
-            + numCompletedContainers + ", failed=" + numFailedContainers
-            + ", currentAllocated=" + numAllocatedContainers);
-
-        // TODO
-        // Add a timeout handling layer
-        // for misbehaving shell commands
       }
-
-      // Join all launched threads
-      // needed for when we time out
-      // and we need to release containers
-      for (Thread launchThread : launchThreads) {
-        try {
-          launchThread.join(10000);
-        } catch (InterruptedException e) {
-          LOG.info("Exception thrown in thread join: " + e.getMessage());
-          e.printStackTrace();
-        }
+      
+      // ask for more containers if any failed
+      int askCount = numTotalContainers - numRequestedContainers.get();
+      numRequestedContainers.addAndGet(askCount);
+
+      if (askCount > 0) {
+        ContainerRequest containerAsk = setupContainerAskForRM(askCount);
+        resourceManager.addContainerRequest(containerAsk);
+      }
+      
+      // set progress to deliver to RM on next heartbeat
+      float progress = (float) numCompletedContainers.get()
+          / numTotalContainers;
+      resourceManager.setProgress(progress);
+      
+      if (numCompletedContainers.get() == numTotalContainers) {
+        done = true;
       }
+    }
 
-      // When the application completes, it should send a finish application
-      // signal to the RM
-      LOG.info("Application completed. Signalling finish to RM");
-
-      FinalApplicationStatus appStatus;
-      String appMessage = null;
-      boolean isSuccess = true;
-      if (numFailedContainers.get() == 0) {
-        appStatus = FinalApplicationStatus.SUCCEEDED;
-      } else {
-        appStatus = FinalApplicationStatus.FAILED;
-        appMessage = "Diagnostics." + ", total=" + numTotalContainers
-            + ", completed=" + numCompletedContainers.get() + ", allocated="
-            + numAllocatedContainers.get() + ", failed="
-            + numFailedContainers.get();
-        isSuccess = false;
+    @Override
+    public void onContainersAllocated(List<Container> allocatedContainers) {
+      LOG.info("Got response from RM for container ask, allocatedCnt="
+          + allocatedContainers.size());
+      numAllocatedContainers.addAndGet(allocatedContainers.size());
+      for (Container allocatedContainer : allocatedContainers) {
+        LOG.info("Launching shell command on a new container."
+            + ", containerId=" + allocatedContainer.getId()
+            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+            + ":" + allocatedContainer.getNodeId().getPort()
+            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+            + ", containerState" + allocatedContainer.getState()
+            + ", containerResourceMemory"
+            + allocatedContainer.getResource().getMemory());
+        // + ", containerToken"
+        // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+        LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
+            allocatedContainer);
+        Thread launchThread = new Thread(runnableLaunchContainer);
+
+        // launch and start the container on a separate thread to keep
+        // the main thread unblocked
+        // as all containers may not be allocated at one go.
+        launchThreads.add(launchThread);
+        launchThread.start();
       }
-      resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
-      return isSuccess;
-    } finally {
-      resourceManager.stop();
     }
+
+    @Override
+    public void onRebootRequest() {}
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
   }
 
   /**
@@ -811,22 +806,4 @@ public class ApplicationMaster {
     LOG.info("Requested container ask: " + request.toString());
     return request;
   }
-
-  /**
-   * Ask RM to allocate given no. of containers to this Application Master
-   *
-   * @param requestedContainers Containers to ask for from RM
-   * @return Response from RM to AM with allocated containers
-   * @throws YarnRemoteException
-   */
-  private AMResponse sendContainerAskToRM() throws YarnRemoteException {
-    float progressIndicator = (float) numCompletedContainers.get()
-        / numTotalContainers;
-
-    LOG.info("Sending request to RM for containers" + ", progress="
-        + progressIndicator);
-
-    AllocateResponse resp = resourceManager.allocate(progressIndicator);
-    return resp.getAMResponse();
-  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Mon Apr  1 16:47:16 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.applications.distributedshell;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -39,6 +40,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -481,14 +483,15 @@ public class Client extends YarnClientIm
     // It should be provided out of the box. 
     // For now setting all required classpaths including
     // the classpath to "." for the application jar
-    StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
+    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
+      .append(File.pathSeparatorChar).append("./*");
     for (String c : conf.getStrings(
         YarnConfiguration.YARN_APPLICATION_CLASSPATH,
         YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-      classPathEnv.append(':');
+      classPathEnv.append(File.pathSeparatorChar);
       classPathEnv.append(c.trim());
     }
-    classPathEnv.append(":./log4j.properties");
+    classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
 
     // add the runtime classpath needed for tests to work
     if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
@@ -505,7 +508,7 @@ public class Client extends YarnClientIm
 
     // Set java executable command 
     LOG.info("Setting up app master command");
-    vargs.add("${JAVA_HOME}" + "/bin/java");
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
     // Set Xmx based on am memory size
     vargs.add("-Xmx" + amMemory + "m");
     // Set class name 

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Mon Apr  1 16:47:16 2013
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -54,8 +55,8 @@ public class TestDistributedShell {
     conf.setClass(YarnConfiguration.RM_SCHEDULER, 
         FifoScheduler.class, ResourceScheduler.class);
     if (yarnCluster == null) {
-      yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getName(),
-          1, 1, 1);
+      yarnCluster = new MiniYARNCluster(
+        TestDistributedShell.class.getSimpleName(), 1, 1, 1);
       yarnCluster.init(conf);
       yarnCluster.start();
       URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
@@ -82,7 +83,7 @@ public class TestDistributedShell {
     }
   }
 
-  @Test
+  @Test(timeout=30000)
   public void testDSShell() throws Exception {
 
     String[] args = {
@@ -91,7 +92,7 @@ public class TestDistributedShell {
         "--num_containers",
         "2",
         "--shell_command",
-        "ls",
+        Shell.WINDOWS ? "dir" : "ls",
         "--master_memory",
         "512",
         "--container_memory",
@@ -110,7 +111,7 @@ public class TestDistributedShell {
 
   }
 
-  @Test
+  @Test(timeout=30000)
   public void testDSShellWithNoArgs() throws Exception {
 
     String[] args = {};

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml Mon Apr  1 16:47:16 2013
@@ -87,6 +87,16 @@
   <build>
     <plugins>
       <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+           <archive>
+             <manifest>
+               <mainClass>org.apache.hadoop.yarn.applications.unmanagedamlauncher.UnmanagedAMLauncher</mainClass>
+             </manifest>
+           </archive>
+        </configuration>
+      </plugin>
+      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java Mon Apr  1 16:47:16 2013
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -81,6 +83,8 @@ public class UnmanagedAMLauncher {
   // set the classpath explicitly
   private String classpath = null;
 
+  private volatile boolean amCompleted = false;
+
   /**
    * @param args
    *          Command line arguments
@@ -179,8 +183,18 @@ public class UnmanagedAMLauncher {
     if(!setClasspath && classpath!=null) {
       envAMList.add("CLASSPATH="+classpath);
     }
-        
-    envAMList.add(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV + "=" + attemptId);
+
+    ContainerId containerId = Records.newRecord(ContainerId.class);
+    containerId.setApplicationAttemptId(attemptId);
+    containerId.setId(0);
+
+    String hostname = InetAddress.getLocalHost().getHostName();
+    envAMList.add(ApplicationConstants.AM_CONTAINER_ID_ENV + "=" + containerId);
+    envAMList.add(ApplicationConstants.NM_HOST_ENV + "=" + hostname);
+    envAMList.add(ApplicationConstants.NM_HTTP_PORT_ENV + "=0");
+    envAMList.add(ApplicationConstants.NM_PORT_ENV + "=0");
+    envAMList.add(ApplicationConstants.APP_SUBMIT_TIME_ENV + "="
+        + System.currentTimeMillis());
 
     String[] envAM = new String[envAMList.size()];
     Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM));
@@ -233,8 +247,10 @@ public class UnmanagedAMLauncher {
       LOG.info("AM process exited with value: " + exitCode);
     } catch (InterruptedException e) {
       e.printStackTrace();
+    } finally {
+      amCompleted = true;
     }
-
+    
     try {
       // make sure that the error thread exits
       // on Windows these threads sometimes get stuck and hang the execution
@@ -306,6 +322,7 @@ public class UnmanagedAMLauncher {
       appReport = monitorApplication(appId, EnumSet.of(
           YarnApplicationState.KILLED, YarnApplicationState.FAILED,
           YarnApplicationState.FINISHED));
+
       YarnApplicationState appState = appReport.getYarnApplicationState();
       FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
   
@@ -341,6 +358,19 @@ public class UnmanagedAMLauncher {
   private ApplicationReport monitorApplication(ApplicationId appId,
       Set<YarnApplicationState> finalState) throws YarnRemoteException {
 
+    long foundAMCompletedTime = 0;
+    final int timeToWaitMS = 10000;
+    StringBuilder expectedFinalState = new StringBuilder();
+    boolean first = true;
+    for (YarnApplicationState state : finalState) {
+      if (first) {
+        first = false;
+        expectedFinalState.append(state.name());
+      } else {
+        expectedFinalState.append("," + state.name());
+      }
+    }
+
     while (true) {
 
       // Check app status every 1 second.
@@ -370,8 +400,24 @@ public class UnmanagedAMLauncher {
         return report;
       }
 
+      // Once the AM process has exited, give the RM timeToWaitMS to report a
+      // final state; fail rather than poll forever if it never arrives.
+      if (amCompleted) {
+        if (foundAMCompletedTime == 0) {
+          foundAMCompletedTime = System.currentTimeMillis();
+        } else if ((System.currentTimeMillis() - foundAMCompletedTime)
+            > timeToWaitMS) {
+          LOG.warn("Waited " + timeToWaitMS/1000
+              + " seconds after process completed for AppReport"
+              + " to reach desired final state. Not waiting anymore."
+              + " CurrentState = " + state
+              + ", ExpectedStates = " + expectedFinalState.toString());
+          throw new RuntimeException("Failed to receive final expected state"
+              + " in ApplicationReport"
+              + ", CurrentState=" + state
+              + ", ExpectedStates=" + expectedFinalState.toString());
+        }
+      }
     }
-
   }
-
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java Mon Apr  1 16:47:16 2013
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
 
+import static org.junit.Assert.fail;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -91,7 +93,7 @@ public class TestUnmanagedAMLauncher {
     return envClassPath;
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testDSShell() throws Exception {
     String classpath = getTestRuntimeClasspath();
     String javaHome = System.getenv("JAVA_HOME");
@@ -99,7 +101,7 @@ public class TestUnmanagedAMLauncher {
       LOG.fatal("JAVA_HOME not defined. Test not running.");
       return;
     }
-    // start dist-shell with 0 containers because container launch will fail if 
+    // start dist-shell with 0 containers because container launch will fail if
     // there are no dist cache resources.
     String[] args = {
         "--classpath",
@@ -110,7 +112,7 @@ public class TestUnmanagedAMLauncher {
         javaHome
             + "/bin/java -Xmx512m "
             + "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
-            + "--container_memory 128 --num_containers 0 --priority 0 --shell_command ls" };
+            + "--container_memory 128 --num_containers 1 --priority 0 --shell_command ls" };
 
     LOG.info("Initializing Launcher");
     UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
@@ -125,4 +127,40 @@ public class TestUnmanagedAMLauncher {
 
   }
 
+  @Test(timeout=30000)
+  public void testDSShellError() throws Exception {
+    String classpath = getTestRuntimeClasspath();
+    String javaHome = System.getenv("JAVA_HOME");
+    if (javaHome == null) {
+      LOG.fatal("JAVA_HOME not defined. Test not running.");
+      return;
+    }
+
+    // remove shell command to make dist-shell fail in initialization itself
+    String[] args = {
+        "--classpath",
+        classpath,
+        "--queue",
+        "default",
+        "--cmd",
+        javaHome
+            + "/bin/java -Xmx512m "
+            + "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
+            + "--container_memory 128 --num_containers 1 --priority 0" };
+
+    LOG.info("Initializing Launcher");
+    UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
+        yarnCluster.getConfig()));
+    boolean initSuccess = launcher.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running Launcher");
+
+    try {
+      launcher.run();
+      fail("Expected an exception to occur as launch should have failed");
+    } catch (RuntimeException e) {
+      // Expected
+    }
+  }
+
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml Mon Apr  1 16:47:16 2013
@@ -32,6 +32,28 @@
     <module>hadoop-yarn-applications-distributedshell</module>
     <module>hadoop-yarn-applications-unmanaged-am-launcher</module>
   </modules>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <environmentVariables>
+            <!-- HADOOP_HOME required for tests on Windows to find winutils -->
+            <HADOOP_HOME>${basedir}/../../../../hadoop-common-project/hadoop-common/target</HADOOP_HOME>
+          </environmentVariables>
+          <properties>
+            <property>
+              <name>listener</name>
+              <value>org.apache.hadoop.test.TimedOutTestsListener</value>
+            </property>
+          </properties>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
  <profiles>
   <profile>
     <id>clover</id>

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java Mon Apr  1 16:47:16 2013
@@ -35,11 +35,6 @@ import org.apache.hadoop.yarn.service.Se
 public interface AMRMClient extends Service {
 
   /**
-   * Value used to define no locality
-   */
-  static final String ANY = "*";
-
-  /**
    * Object to represent container request for resources.
    * Resources may be localized to nodes and racks.
    * Resources may be assigned priorities.

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Mon Apr  1 16:47:16 2013
@@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -194,13 +193,12 @@ public class AMRMClientImpl extends Abst
       }
 
       allocateResponse = rmClient.allocate(allocateRequest);
-      AMResponse response = allocateResponse.getAMResponse();
 
       synchronized (this) {
         // update these on successful RPC
         clusterNodeCount = allocateResponse.getNumClusterNodes();
-        lastResponseId = response.getResponseId();
-        clusterAvailableResources = response.getAvailableResources();
+        lastResponseId = allocateResponse.getResponseId();
+        clusterAvailableResources = allocateResponse.getAvailableResources();
       }
     } finally {
       // TODO how to differentiate remote yarn exception vs error in rpc
@@ -260,7 +258,8 @@ public class AMRMClientImpl extends Abst
     }
 
     // Off-switch
-    addResourceRequest(req.priority, ANY, req.capability, req.containerCount); 
+    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
+        req.containerCount);
   }
 
   @Override
@@ -278,7 +277,8 @@ public class AMRMClientImpl extends Abst
       }
     }
    
-    decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+    decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
+        req.containerCount);
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java Mon Apr  1 16:47:16 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.yarn.client.cli;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.List;
 
@@ -31,7 +33,9 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class ApplicationCLI extends YarnCLI {
-  private static final String APPLICATIONS_PATTERN = "%30s\t%20s\t%10s\t%10s\t%18s\t%18s\t%35s\n";
+  private static final String APPLICATIONS_PATTERN =
+    "%30s\t%20s\t%10s\t%10s\t%18s\t%18s\t%35s" +
+    System.getProperty("line.separator");
 
   public static void main(String[] args) throws Exception {
     ApplicationCLI cli = new ApplicationCLI();
@@ -123,37 +127,44 @@ public class ApplicationCLI extends Yarn
    * @throws YarnRemoteException
    */
   private void printApplicationReport(String applicationId)
-      throws YarnRemoteException {
+      throws YarnRemoteException, IOException {
     ApplicationReport appReport = client.getApplicationReport(ConverterUtils
         .toApplicationId(applicationId));
-    StringBuffer appReportStr = new StringBuffer();
+    // StringWriter keeps text as chars; println adds the platform line ending.
+    java.io.StringWriter buffer = new java.io.StringWriter();
+    PrintWriter appReportStr = new PrintWriter(buffer);
     if (appReport != null) {
-      appReportStr.append("Application Report : ");
-      appReportStr.append("\n\tApplication-Id : ");
-      appReportStr.append(appReport.getApplicationId());
-      appReportStr.append("\n\tApplication-Name : ");
-      appReportStr.append(appReport.getName());
-      appReportStr.append("\n\tUser : ");
-      appReportStr.append(appReport.getUser());
-      appReportStr.append("\n\tQueue : ");
-      appReportStr.append(appReport.getQueue());
-      appReportStr.append("\n\tStart-Time : ");
-      appReportStr.append(appReport.getStartTime());
-      appReportStr.append("\n\tFinish-Time : ");
-      appReportStr.append(appReport.getFinishTime());
-      appReportStr.append("\n\tState : ");
-      appReportStr.append(appReport.getYarnApplicationState());
-      appReportStr.append("\n\tFinal-State : ");
-      appReportStr.append(appReport.getFinalApplicationStatus());
-      appReportStr.append("\n\tTracking-URL : ");
-      appReportStr.append(appReport.getOriginalTrackingUrl());
-      appReportStr.append("\n\tDiagnostics : ");
-      appReportStr.append(appReport.getDiagnostics());
+      appReportStr.println("Application Report : ");
+      appReportStr.print("\tApplication-Id : ");
+      appReportStr.println(appReport.getApplicationId());
+      appReportStr.print("\tApplication-Name : ");
+      appReportStr.println(appReport.getName());
+      appReportStr.print("\tUser : ");
+      appReportStr.println(appReport.getUser());
+      appReportStr.print("\tQueue : ");
+      appReportStr.println(appReport.getQueue());
+      appReportStr.print("\tStart-Time : ");
+      appReportStr.println(appReport.getStartTime());
+      appReportStr.print("\tFinish-Time : ");
+      appReportStr.println(appReport.getFinishTime());
+      appReportStr.print("\tState : ");
+      appReportStr.println(appReport.getYarnApplicationState());
+      appReportStr.print("\tFinal-State : ");
+      appReportStr.println(appReport.getFinalApplicationStatus());
+      appReportStr.print("\tTracking-URL : ");
+      appReportStr.println(appReport.getOriginalTrackingUrl());
+      appReportStr.print("\tRPC Port : ");
+      appReportStr.println(appReport.getRpcPort());
+      appReportStr.print("\tAM Host : ");
+      appReportStr.println(appReport.getHost());
+      appReportStr.print("\tDiagnostics : ");
+      appReportStr.print(appReport.getDiagnostics());
     } else {
-      appReportStr.append("Application with id '" + applicationId
+      appReportStr.print("Application with id '" + applicationId
           + "' doesn't exist in RM.");
     }
-    sysout.println(appReportStr.toString());
+    appReportStr.close();
+    sysout.println(buffer.toString());
   }
 
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java Mon Apr  1 16:47:16 2013
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.yarn.client.cli;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.Date;
 import java.util.List;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang.time.DateFormatUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -31,7 +35,9 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class NodeCLI extends YarnCLI {
-  private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%26s\t%18s\n";
+  private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%26s\t%18s" +
+    System.getProperty("line.separator");
+
   public static void main(String[] args) throws Exception {
     NodeCLI cli = new NodeCLI();
     cli.setSysOutPrintStream(System.out);
@@ -100,48 +106,52 @@ public class NodeCLI extends YarnCLI {
    * @param nodeIdStr
    * @throws YarnRemoteException
    */
-  private void printNodeStatus(String nodeIdStr) throws YarnRemoteException {
+  private void printNodeStatus(String nodeIdStr) throws YarnRemoteException,
+      IOException {
     NodeId nodeId = ConverterUtils.toNodeId(nodeIdStr);
     List<NodeReport> nodesReport = client.getNodeReports();
-    StringBuffer nodeReportStr = new StringBuffer();
+    // StringWriter keeps text as chars; println adds the platform line ending.
+    java.io.StringWriter buffer = new java.io.StringWriter();
+    PrintWriter nodeReportStr = new PrintWriter(buffer);
     NodeReport nodeReport = null;
     for (NodeReport report : nodesReport) {
       if (!report.getNodeId().equals(nodeId)) {
         continue;
       }
       nodeReport = report;
-      nodeReportStr.append("Node Report : ");
-      nodeReportStr.append("\n\tNode-Id : ");
-      nodeReportStr.append(nodeReport.getNodeId());
-      nodeReportStr.append("\n\tRack : ");
-      nodeReportStr.append(nodeReport.getRackName());
-      nodeReportStr.append("\n\tNode-State : ");
-      nodeReportStr.append(nodeReport.getNodeState());
-      nodeReportStr.append("\n\tNode-Http-Address : ");
-      nodeReportStr.append(nodeReport.getHttpAddress());
-      nodeReportStr.append("\n\tHealth-Status(isNodeHealthy) : ");
-      nodeReportStr.append(nodeReport.getNodeHealthStatus()
+      nodeReportStr.println("Node Report : ");
+      nodeReportStr.print("\tNode-Id : ");
+      nodeReportStr.println(nodeReport.getNodeId());
+      nodeReportStr.print("\tRack : ");
+      nodeReportStr.println(nodeReport.getRackName());
+      nodeReportStr.print("\tNode-State : ");
+      nodeReportStr.println(nodeReport.getNodeState());
+      nodeReportStr.print("\tNode-Http-Address : ");
+      nodeReportStr.println(nodeReport.getHttpAddress());
+      nodeReportStr.print("\tHealth-Status(isNodeHealthy) : ");
+      nodeReportStr.println(nodeReport.getNodeHealthStatus()
           .getIsNodeHealthy());
-      nodeReportStr.append("\n\tLast-Last-Health-Update : ");
-      nodeReportStr.append(nodeReport.getNodeHealthStatus()
-          .getLastHealthReportTime());
-      nodeReportStr.append("\n\tHealth-Report : ");
+      nodeReportStr.print("\tLast-Health-Update : ");
+      nodeReportStr.println(DateFormatUtils.format(
+          new Date(nodeReport.getNodeHealthStatus().
+            getLastHealthReportTime()),"E dd/MMM/yy hh:mm:ss:SSzz"));
+      nodeReportStr.print("\tHealth-Report : ");
       nodeReportStr
-          .append(nodeReport.getNodeHealthStatus().getHealthReport());
-      nodeReportStr.append("\n\tContainers : ");
-      nodeReportStr.append(nodeReport.getNumContainers());
-      nodeReportStr.append("\n\tMemory-Used : ");
-      nodeReportStr.append((nodeReport.getUsed() == null) ? "0M"
+          .println(nodeReport.getNodeHealthStatus().getHealthReport());
+      nodeReportStr.print("\tContainers : ");
+      nodeReportStr.println(nodeReport.getNumContainers());
+      nodeReportStr.print("\tMemory-Used : ");
+      nodeReportStr.println((nodeReport.getUsed() == null) ? "0M"
           : (nodeReport.getUsed().getMemory() + "M"));
-      nodeReportStr.append("\n\tMemory-Capacity : ");
-      nodeReportStr.append(nodeReport.getCapability().getMemory());
+      nodeReportStr.print("\tMemory-Capacity : ");
+      nodeReportStr.println(nodeReport.getCapability().getMemory());
     }
 
     if (nodeReport == null) {
-      nodeReportStr.append("Could not find the node report for node id : "
+      nodeReportStr.print("Could not find the node report for node id : "
           + nodeIdStr);
     }
-
-    sysout.println(nodeReportStr.toString());
+    nodeReportStr.close();
+    sysout.println(buffer.toString());
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Mon Apr  1 16:47:16 2013
@@ -18,25 +18,21 @@
 
 package org.apache.hadoop.yarn.client;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -58,6 +54,11 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.service.Service.STATE;
 import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestAMRMClient {
   Configuration conf = null;
@@ -183,7 +184,7 @@ public class TestAMRMClient {
     int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
         .get(rack).get(capability).getNumContainers();
     int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
-        .get(AMRMClient.ANY).get(capability).getNumContainers();
+        .get(ResourceRequest.ANY).get(capability).getNumContainers();
 
     assertTrue(containersRequestedNode == 2);
     assertTrue(containersRequestedRack == 2);
@@ -202,9 +203,8 @@ public class TestAMRMClient {
       assertTrue(amClient.release.size() == 0);
       
       assertTrue(nodeCount == amClient.getClusterNodeCount());
-      AMResponse amResponse = allocResponse.getAMResponse();
-      allocatedContainerCount += amResponse.getAllocatedContainers().size();
-      for(Container container : amResponse.getAllocatedContainers()) {
+      allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+      for(Container container : allocResponse.getAllocatedContainers()) {
         ContainerId rejectContainerId = container.getId();
         releases.add(rejectContainerId);
         amClient.releaseAssignedContainer(rejectContainerId);
@@ -264,11 +264,11 @@ public class TestAMRMClient {
     while(!releases.isEmpty() || iterationsLeft-- > 0) {
       // inform RM of rejection
       AllocateResponse allocResponse = amClient.allocate(0.1f);
-      AMResponse amResponse = allocResponse.getAMResponse();
       // RM did not send new containers because AM does not need any
-      assertTrue(amResponse.getAllocatedContainers().size() == 0);
-      if(amResponse.getCompletedContainersStatuses().size() > 0) {
-        for(ContainerStatus cStatus : amResponse.getCompletedContainersStatuses()) {
+      assertTrue(allocResponse.getAllocatedContainers().size() == 0);
+      if(allocResponse.getCompletedContainersStatuses().size() > 0) {
+        for(ContainerStatus cStatus :allocResponse
+            .getCompletedContainersStatuses()) {
           if(releases.contains(cStatus.getContainerId())) {
             assertTrue(cStatus.getState() == ContainerState.COMPLETE);
             assertTrue(cStatus.getExitStatus() == -100);

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java Mon Apr  1 16:47:16 2013
@@ -29,11 +29,14 @@ import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 import junit.framework.Assert;
 
+import org.apache.commons.lang.time.DateFormatUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -79,12 +82,23 @@ public class TestYarnCLI {
     int result = cli.run(new String[] { "-status", applicationId.toString() });
     assertEquals(0, result);
     verify(client).getApplicationReport(applicationId);
-    String appReportStr = "Application Report : \n\t"
-        + "Application-Id : application_1234_0005\n\t"
-        + "Application-Name : appname\n\tUser : user\n\t"
-        + "Queue : queue\n\tStart-Time : 0\n\tFinish-Time : 0\n\t"
-        + "State : FINISHED\n\tFinal-State : SUCCEEDED\n\t"
-        + "Tracking-URL : N/A\n\tDiagnostics : diagnostics\n";
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    pw.println("Application Report : ");
+    pw.println("\tApplication-Id : application_1234_0005");
+    pw.println("\tApplication-Name : appname");
+    pw.println("\tUser : user");
+    pw.println("\tQueue : queue");
+    pw.println("\tStart-Time : 0");
+    pw.println("\tFinish-Time : 0");
+    pw.println("\tState : FINISHED");
+    pw.println("\tFinal-State : SUCCEEDED");
+    pw.println("\tTracking-URL : N/A");
+    pw.println("\tRPC Port : 124");
+    pw.println("\tAM Host : host");
+    pw.println("\tDiagnostics : diagnostics");
+    pw.close();
+    String appReportStr = baos.toString("UTF-8");
     Assert.assertEquals(appReportStr, sysOutStream.toString());
     verify(sysOut, times(1)).println(isA(String.class));
   }
@@ -105,16 +119,18 @@ public class TestYarnCLI {
     assertEquals(0, result);
     verify(client).getApplicationList();
 
-    StringBuffer appsReportStrBuf = new StringBuffer();
-    appsReportStrBuf.append("Total Applications:1\n");
-    appsReportStrBuf
-        .append("                Application-Id\t    Application-Name"
-            + "\t      User\t     Queue\t             State\t       "
-            + "Final-State\t                       Tracking-URL\n");
-    appsReportStrBuf.append("         application_1234_0005\t             "
-        + "appname\t      user\t     queue\t          FINISHED\t         "
-        + "SUCCEEDED\t                                N/A\n");
-    Assert.assertEquals(appsReportStrBuf.toString(), sysOutStream.toString());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    pw.println("Total Applications:1");
+    pw.print("                Application-Id\t    Application-Name");
+    pw.print("\t      User\t     Queue\t             State\t       ");
+    pw.println("Final-State\t                       Tracking-URL");
+    pw.print("         application_1234_0005\t             ");
+    pw.print("appname\t      user\t     queue\t          FINISHED\t         ");
+    pw.println("SUCCEEDED\t                                N/A");
+    pw.close();
+    String appsReportStr = baos.toString("UTF-8");
+    Assert.assertEquals(appsReportStr, sysOutStream.toString());
     verify(sysOut, times(1)).write(any(byte[].class), anyInt(), anyInt());
   }
 
@@ -137,18 +153,20 @@ public class TestYarnCLI {
     int result = cli.run(new String[] { "-list" });
     assertEquals(0, result);
     verify(client).getNodeReports();
-    StringBuffer nodesReportStr = new StringBuffer();
-    nodesReportStr.append("Total Nodes:3");
-    nodesReportStr
-        .append("\n         Node-Id\tNode-State\tNode-Http-Address\t"
-            + "Health-Status(isNodeHealthy)\tRunning-Containers");
-    nodesReportStr.append("\n         host0:0\t   RUNNING\t       host1:8888"
-        + "\t                     false\t                 0");
-    nodesReportStr.append("\n         host1:0\t   RUNNING\t       host1:8888"
-        + "\t                     false\t                 0");
-    nodesReportStr.append("\n         host2:0\t   RUNNING\t       host1:8888"
-        + "\t                     false\t                 0\n");
-    Assert.assertEquals(nodesReportStr.toString(), sysOutStream.toString());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    pw.println("Total Nodes:3");
+    pw.print("         Node-Id\tNode-State\tNode-Http-Address\t");
+    pw.println("Health-Status(isNodeHealthy)\tRunning-Containers");
+    pw.print("         host0:0\t   RUNNING\t       host1:8888");
+    pw.println("\t                     false\t                 0");
+    pw.print("         host1:0\t   RUNNING\t       host1:8888");
+    pw.println("\t                     false\t                 0");
+    pw.print("         host2:0\t   RUNNING\t       host1:8888");
+    pw.println("\t                     false\t                 0");
+    pw.close();
+    String nodesReportStr = baos.toString("UTF-8");
+    Assert.assertEquals(nodesReportStr, sysOutStream.toString());
     verify(sysOut, times(1)).write(any(byte[].class), anyInt(), anyInt());
   }
 
@@ -163,11 +181,22 @@ public class TestYarnCLI {
     int result = cli.run(new String[] { "-status", nodeId.toString() });
     assertEquals(0, result);
     verify(client).getNodeReports();
-    String nodeStatusStr = "Node Report : \n\tNode-Id : host0:0\n\t"
-        + "Rack : rack1\n\tNode-State : RUNNING\n\t"
-        + "Node-Http-Address : host1:8888\n\tHealth-Status(isNodeHealthy) "
-        + ": false\n\tLast-Last-Health-Update : 0\n\tHealth-Report : null"
-        + "\n\tContainers : 0\n\tMemory-Used : 0M\n\tMemory-Capacity : 0";
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    pw.println("Node Report : ");
+    pw.println("\tNode-Id : host0:0");
+    pw.println("\tRack : rack1");
+    pw.println("\tNode-State : RUNNING");
+    pw.println("\tNode-Http-Address : host1:8888");
+    pw.println("\tHealth-Status(isNodeHealthy) : false");
+    pw.println("\tLast-Health-Update : "
+      + DateFormatUtils.format(new Date(0), "E dd/MMM/yy hh:mm:ss:SSzz"));
+    pw.println("\tHealth-Report : null");
+    pw.println("\tContainers : 0");
+    pw.println("\tMemory-Used : 0M");
+    pw.println("\tMemory-Capacity : 0");
+    pw.close();
+    String nodeStatusStr = baos.toString("UTF-8");
     verify(sysOut, times(1)).println(isA(String.class));
     verify(sysOut).println(nodeStatusStr);
   }
@@ -225,4 +254,4 @@ public class TestYarnCLI {
     return cli;
   }
 
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Mon Apr  1 16:47:16 2013
@@ -180,10 +180,13 @@ public class YarnConfiguration extends C
     RM_PREFIX + "admin.client.thread-count";
   public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1;
   
-  /** The maximum number of application master retries.*/
-  public static final String RM_AM_MAX_RETRIES = 
-    RM_PREFIX + "am.max-retries";
-  public static final int DEFAULT_RM_AM_MAX_RETRIES = 1;
+  /**
+   * The maximum number of application attempts.
+   * It's a global setting for all application masters.
+   */
+  public static final String RM_AM_MAX_ATTEMPTS =
+    RM_PREFIX + "am.max-attempts";
+  public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 1;
   
   /** The keytab for the resource manager.*/
   public static final String RM_KEYTAB = 
@@ -304,6 +307,17 @@ public class YarnConfiguration extends C
   /** who will execute(launch) the containers.*/
   public static final String NM_CONTAINER_EXECUTOR = 
     NM_PREFIX + "container-executor.class";
+
+  /**  
+   * Adjustment to make to the container os scheduling priority.
+   * The valid values for this could vary depending on the platform.
+   * On Linux, higher values mean run the containers at a less 
+   * favorable priority than the NM. 
+   * The value specified is an int.
+   */
+  public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = 
+    NM_PREFIX + "container-executor.os.sched.priority.adjustment";
+  public static final int DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = 0;
   
   /** Number of threads container manager uses.*/
   public static final String NM_CONTAINER_MGR_THREAD_COUNT =
@@ -426,6 +440,16 @@ public class YarnConfiguration extends C
   public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";
   public static final int DEFAULT_NM_PMEM_MB = 8 * 1024;
 
+  /** Specifies whether the physical memory check is enabled. */
+  public static final String NM_PMEM_CHECK_ENABLED = NM_PREFIX
+      + "pmem-check-enabled";
+  public static final boolean DEFAULT_NM_PMEM_CHECK_ENABLED = true;
+
+  /** Specifies whether the virtual memory check is enabled. */
+  public static final String NM_VMEM_CHECK_ENABLED = NM_PREFIX
+      + "vmem-check-enabled";
+  public static final boolean DEFAULT_NM_VMEM_CHECK_ENABLED = true;
+
   /** Conversion ratio for physical memory to virtual memory. */
   public static final String NM_VMEM_PMEM_RATIO =
     NM_PREFIX + "vmem-pmem-ratio";
@@ -610,6 +634,20 @@ public class YarnConfiguration extends C
   public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS =
       2000;
 
+  /** Max time, in seconds, the NM waits to connect to the RM at startup
+   * (default 15 minutes). */
+  public static final String RESOURCEMANAGER_CONNECT_WAIT_SECS =
+      NM_PREFIX + "resourcemanager.connect.wait.secs";
+  public static final int DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS =
+      15*60;
+
+  /** Interval, in seconds, between NM attempts to connect to the RM
+   * (default 30 seconds). */
+  public static final String RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS =
+      NM_PREFIX + "resourcemanager.connect.retry_interval.secs";
+  public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+      = 30;
+
   /**
    * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
    * entries

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java Mon Apr  1 16:47:16 2013
@@ -27,6 +27,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStreamReader;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.io.Writer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -231,7 +232,6 @@ public class AggregatedLogFormat {
       out = this.writer.prepareAppendValue(-1);
       out.writeInt(VERSION);
       out.close();
-      this.fsDataOStream.hflush();
     }
 
     public void writeApplicationOwner(String user) throws IOException {
@@ -506,7 +506,7 @@ public class AggregatedLogFormat {
      * @throws IOException
      */
     public static void readAContainerLogsForALogType(
-        DataInputStream valueStream, DataOutputStream out)
+        DataInputStream valueStream, PrintStream out)
           throws IOException {
 
       byte[] buf = new byte[65535];
@@ -514,11 +514,11 @@ public class AggregatedLogFormat {
       String fileType = valueStream.readUTF();
       String fileLengthStr = valueStream.readUTF();
       long fileLength = Long.parseLong(fileLengthStr);
-      out.writeUTF("\nLogType:");
-      out.writeUTF(fileType);
-      out.writeUTF("\nLogLength:");
-      out.writeUTF(fileLengthStr);
-      out.writeUTF("\nLog Contents:\n");
+      out.print("LogType: ");
+      out.println(fileType);
+      out.print("LogLength: ");
+      out.println(fileLengthStr);
+      out.println("Log Contents:");
 
       int curRead = 0;
       long pendingRead = fileLength - curRead;
@@ -534,6 +534,7 @@ public class AggregatedLogFormat {
                   pendingRead > buf.length ? buf.length : (int) pendingRead;
         len = valueStream.read(buf, 0, toRead);
       }
+      out.println("");
     }
 
     public void close() throws IOException {

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java Mon Apr  1 16:47:16 2013
@@ -19,10 +19,10 @@
 package org.apache.hadoop.yarn.logaggregation;
 
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.PrintStream;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -30,6 +30,7 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileContext;
@@ -57,10 +58,13 @@ public class LogDumper extends Configure
   public int run(String[] args) throws Exception {
 
     Options opts = new Options();
-    opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId");
-    opts.addOption(CONTAINER_ID_OPTION, true, "ContainerId");
-    opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress");
-    opts.addOption(APP_OWNER_OPTION, true, "AppOwner");
+    opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId (required)");
+    opts.addOption(CONTAINER_ID_OPTION, true,
+      "ContainerId (must be specified if node address is specified)");
+    opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress in the format "
+      + "nodename:port (must be specified if container id is specified)");
+    opts.addOption(APP_OWNER_OPTION, true,
+      "AppOwner (assumed to be current user if not specified)");
 
     if (args.length < 1) {
       HelpFormatter formatter = new HelpFormatter();
@@ -99,14 +103,12 @@ public class LogDumper extends Configure
     ApplicationId appId =
         ConverterUtils.toApplicationId(recordFactory, appIdStr);
 
-    DataOutputStream out = new DataOutputStream(System.out);
-
     if (appOwner == null || appOwner.isEmpty()) {
       appOwner = UserGroupInformation.getCurrentUser().getShortUserName();
     }
     int resultCode = 0;
     if (containerIdStr == null && nodeAddress == null) {
-      resultCode = dumpAllContainersLogs(appId, appOwner, out);
+      resultCode = dumpAllContainersLogs(appId, appOwner, System.out);
     } else if ((containerIdStr == null && nodeAddress != null)
         || (containerIdStr != null && nodeAddress == null)) {
       System.out.println("ContainerId or NodeAddress cannot be null!");
@@ -125,7 +127,7 @@ public class LogDumper extends Configure
                   appOwner,
                   ConverterUtils.toNodeId(nodeAddress),
                   LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf())));
-      resultCode = dumpAContainerLogs(containerIdStr, reader, out);
+      resultCode = dumpAContainerLogs(containerIdStr, reader, System.out);
     }
 
     return resultCode;
@@ -149,12 +151,11 @@ public class LogDumper extends Configure
           "Log aggregation has not completed or is not enabled.");
       return -1;
     }
-    DataOutputStream out = new DataOutputStream(System.out);
-    return dumpAContainerLogs(containerId, reader, out);
+    return dumpAContainerLogs(containerId, reader, System.out);
   }
 
   private int dumpAContainerLogs(String containerIdStr,
-      AggregatedLogFormat.LogReader reader, DataOutputStream out)
+      AggregatedLogFormat.LogReader reader, PrintStream out)
       throws IOException {
     DataInputStream valueStream;
     LogKey key = new LogKey();
@@ -183,7 +184,7 @@ public class LogDumper extends Configure
   }
 
   private int dumpAllContainersLogs(ApplicationId appId, String appOwner,
-      DataOutputStream out) throws IOException {
+      PrintStream out) throws IOException {
     Path remoteRootLogDir =
         new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@@ -216,6 +217,9 @@ public class LogDumper extends Configure
         valueStream = reader.next(key);
 
         while (valueStream != null) {
+          String containerString = "\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
+          out.println(containerString);
+          out.println(StringUtils.repeat("=", containerString.length()));
           while (true) {
             try {
               LogReader.readAContainerLogsForALogType(valueStream, out);

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Mon Apr  1 16:47:16 2013
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -404,4 +405,21 @@ public class BuilderUtils {
     allocateRequest.addAllReleases(containersToBeReleased);
     return allocateRequest;
   }
+  /** Creates an {@link AllocateResponse} populated with the given values. */
+  public static AllocateResponse newAllocateResponse(int responseId,
+      List<ContainerStatus> completedContainers,
+      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+      Resource availResources, boolean reboot, int numClusterNodes) {
+    AllocateResponse response = recordFactory
+        .newRecordInstance(AllocateResponse.class);
+    response.setNumClusterNodes(numClusterNodes);
+    response.setResponseId(responseId);
+    response.setCompletedContainersStatuses(completedContainers);
+    response.setAllocatedContainers(allocatedContainers);
+    response.setUpdatedNodes(updatedNodes);
+    response.setAvailableResources(availResources);
+    response.setReboot(reboot);
+
+    return response;
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java Mon Apr  1 16:47:16 2013
@@ -251,6 +251,12 @@ public class FSDownload implements Calla
       }
       break;
     }
+    if(localrsrc.isFile()){
+      try {
+        files.delete(new Path(localrsrc.toString()), false);
+      } catch (IOException ignore) {
+      }
+    }
     return 0;
     // TODO Should calculate here before returning
     //return FileUtil.getDU(destDir);
@@ -264,41 +270,41 @@ public class FSDownload implements Calla
     } catch (URISyntaxException e) {
       throw new IOException("Invalid resource", e);
     }
-
     Path tmp;
     do {
       tmp = new Path(destDirPath, String.valueOf(rand.nextLong()));
     } while (files.util().exists(tmp));
     destDirPath = tmp;
-
     createDir(destDirPath, cachePerms);
     final Path dst_work = new Path(destDirPath + "_tmp");
     createDir(dst_work, cachePerms);
-
     Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
     try {
-      Path dTmp = null == userUgi
-        ? files.makeQualified(copy(sCopy, dst_work))
-        : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
+      Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work))
+          : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
             public Path run() throws Exception {
               return files.makeQualified(copy(sCopy, dst_work));
             };
           });
       Pattern pattern = null;
       String p = resource.getPattern();
-      if(p != null) {
+      if (p != null) {
         pattern = Pattern.compile(p);
       }
       unpack(new File(dTmp.toUri()), new File(dFinal.toUri()), pattern);
       changePermissions(dFinal.getFileSystem(conf), dFinal);
       files.rename(dst_work, destDirPath, Rename.OVERWRITE);
     } catch (Exception e) {
-      try { files.delete(destDirPath, true); } catch (IOException ignore) { }
+      try {
+        files.delete(destDirPath, true);
+      } catch (IOException ignore) {
+      }
       throw e;
     } finally {
       try {
         files.delete(dst_work, true);
-      } catch (FileNotFoundException ignore) { }
+      } catch (FileNotFoundException ignore) {
+      }
       // clear ref to internal var
       rand = null;
       conf = null;

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java Mon Apr  1 16:47:16 2013
@@ -36,6 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.StringUtils;
 
@@ -59,32 +60,30 @@ public class ProcfsBasedProcessTree exte
   public static final String PROCFS_STAT_FILE = "stat";
   public static final String PROCFS_CMDLINE_FILE = "cmdline";
   public static final long PAGE_SIZE;
-  static {
-    ShellCommandExecutor shellExecutor =
-            new ShellCommandExecutor(new String[]{"getconf",  "PAGESIZE"});
-    long pageSize = -1;
-    try {
-      shellExecutor.execute();
-      pageSize = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
-    } catch (IOException e) {
-      LOG.error(StringUtils.stringifyException(e));
-    } finally {
-      PAGE_SIZE = pageSize;
-    }
-  }
   public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond
+  
   static {
-    ShellCommandExecutor shellExecutor =
-            new ShellCommandExecutor(new String[]{"getconf",  "CLK_TCK"});
     long jiffiesPerSecond = -1;
+    long pageSize = -1;
     try {
-      shellExecutor.execute();
-      jiffiesPerSecond = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
+      if (Shell.LINUX) {
+        ShellCommandExecutor shellExecutorClk = new ShellCommandExecutor(
+            new String[] { "getconf", "CLK_TCK" });
+        shellExecutorClk.execute();
+        jiffiesPerSecond = Long.parseLong(shellExecutorClk.getOutput().replace("\n", ""));
+        // Page size is queried the same way; off Linux both stay -1.
+        ShellCommandExecutor shellExecutorPage = new ShellCommandExecutor(
+            new String[] { "getconf", "PAGESIZE" });
+        shellExecutorPage.execute();
+        pageSize = Long.parseLong(shellExecutorPage.getOutput().replace("\n", ""));
+        // NOTE(review): NumberFormatException from parseLong is not caught.
+      }
     } catch (IOException e) {
       LOG.error(StringUtils.stringifyException(e));
     } finally {
       JIFFY_LENGTH_IN_MILLIS = jiffiesPerSecond != -1 ?
                      Math.round(1000D / jiffiesPerSecond) : -1;
+      PAGE_SIZE = pageSize;
     }
   }
 
@@ -126,8 +125,7 @@ public class ProcfsBasedProcessTree exte
    */
   public static boolean isAvailable() {
     try {
-      String osName = System.getProperty("os.name");
-      if (!osName.startsWith("Linux")) {
+      if (!Shell.LINUX) {
         LOG.info("ProcfsBasedProcessTree currently is supported only on "
             + "Linux.");
         return false;



Mime
View raw message