hadoop-yarn-commits mailing list archives

From: bi...@apache.org
Subject: svn commit: r1459557 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ hadoop-yarn/hadoop-yar...
Date: Thu, 21 Mar 2013 22:30:11 GMT
Author: bikas
Date: Thu Mar 21 22:30:10 2013
New Revision: 1459557

URL: http://svn.apache.org/r1459557
Log:
merge -c r1459555 from trunk to branch-2 for YARN-417. Create AMRMClient wrapper that provides
asynchronous callbacks. (Sandy Ryza via bikas)
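
For readers skimming the diff, the change replaces the synchronous AMRMClient polling loop in the
distributed shell ApplicationMaster with the new AMRMClientAsync wrapper, which owns the heartbeat
and delivers allocations and completions through a callback handler. A minimal sketch of the
pattern, using the fields and values that appear in the diff below (error handling and the AM's
counters are elided):

    // Callback handler: the four methods shown are the ones the new
    // RMCallbackHandler in this commit overrides.
    AMRMClientAsync.CallbackHandler handler = new AMRMClientAsync.CallbackHandler() {
      @Override
      public void onContainersAllocated(List<Container> containers) {
        // launch the shell command on each newly allocated container
      }
      @Override
      public void onContainersCompleted(List<ContainerStatus> statuses) {
        // update completed/failed counters, re-request lost containers,
        // report progress via setProgress(), and flag completion
      }
      @Override
      public void onRebootRequest() {}
      @Override
      public void onNodesUpdated(List<NodeReport> updatedNodes) {}
    };

    // 1000 ms heartbeat interval, as used in the diff
    AMRMClientAsync client = new AMRMClientAsync(appAttemptID, 1000, handler);
    client.init(conf);
    client.start();
    // Registering starts the heartbeat to the RM
    client.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
        appMasterTrackingUrl);
    client.addContainerRequest(containerAsk);
    // ... block until the handler signals that all containers have completed ...
    client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
    client.stop();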

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
      - copied unchanged from r1459555, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
      - copied unchanged from r1459555, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1459557&r1=1459556&r2=1459557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Mar 21 22:30:10 2013
@@ -33,6 +33,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-297. Improve hashCode implementations for PB records. (Xuan Gong via
     hitesh)
 
+    YARN-417. Create AMRMClient wrapper that provides asynchronous callbacks.
+    (Sandy Ryza via bikas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1459557&r1=1459556&r2=1459557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Thu Mar 21 22:30:10 2013
@@ -63,12 +63,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.AMRMClient;
 import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -147,8 +147,8 @@ public class ApplicationMaster {
   private YarnRPC rpc;
 
   // Handle to communicate with the Resource Manager
-  private AMRMClient resourceManager;
-
+  private AMRMClientAsync resourceManager;
+  
   // Application Attempt Id ( combination of attemptId and fail count )
   private ApplicationAttemptId appAttemptID;
 
@@ -169,8 +169,6 @@ public class ApplicationMaster {
   // Priority of the request
   private int requestPriority;
 
-  // Simple flag to denote whether all works is done
-  private boolean appDone = false;
   // Counter for completed containers ( complete denotes successful or failed )
   private AtomicInteger numCompletedContainers = new AtomicInteger();
   // Allocated container count so that we know how many containers has the RM
@@ -201,6 +199,9 @@ public class ApplicationMaster {
   // Hardcoded path to shell script in launch container's local env
   private final String ExecShellStringPath = "ExecShellScript.sh";
 
+  private volatile boolean done;
+  private volatile boolean success;
+  
   // Launch threads
   private List<Thread> launchThreads = new ArrayList<Thread>();
 
@@ -416,226 +417,202 @@ public class ApplicationMaster {
   public boolean run() throws YarnRemoteException {
     LOG.info("Starting ApplicationMaster");
 
-    // Connect to ResourceManager
-    resourceManager = new AMRMClientImpl(appAttemptID);
+    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+    
+    resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
     resourceManager.init(conf);
     resourceManager.start();
 
-    try {
-      // Setup local RPC Server to accept status requests directly from clients
-      // TODO need to setup a protocol for client to be able to communicate to
-      // the RPC server
-      // TODO use the rpc port info to register with the RM for the client to
-      // send requests to this app master
-
-      // Register self with ResourceManager
-      RegisterApplicationMasterResponse response = resourceManager
-          .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
-              appMasterTrackingUrl);
-      // Dump out information about cluster capability as seen by the
-      // resource manager
-      int minMem = response.getMinimumResourceCapability().getMemory();
-      int maxMem = response.getMaximumResourceCapability().getMemory();
-      LOG.info("Min mem capabililty of resources in this cluster " + minMem);
-      LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
-      // A resource ask has to be atleast the minimum of the capability of the
-      // cluster, the value has to be a multiple of the min value and cannot
-      // exceed the max.
-      // If it is not an exact multiple of min, the RM will allocate to the
-      // nearest multiple of min
-      if (containerMemory < minMem) {
-        LOG.info("Container memory specified below min threshold of cluster."
-            + " Using min value." + ", specified=" + containerMemory + ", min="
-            + minMem);
-        containerMemory = minMem;
-      } else if (containerMemory > maxMem) {
-        LOG.info("Container memory specified above max threshold of cluster."
-            + " Using max value." + ", specified=" + containerMemory + ", max="
-            + maxMem);
-        containerMemory = maxMem;
-      }
-
-      // Setup heartbeat emitter
-      // TODO poll RM every now and then with an empty request to let RM know
-      // that we are alive
-      // The heartbeat interval after which an AM is timed out by the RM is
-      // defined by a config setting:
-      // RM_AM_EXPIRY_INTERVAL_MS with default defined by
-      // DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
-      // The allocate calls to the RM count as heartbeats so, for now,
-      // this additional heartbeat emitter is not required.
-
-      // Setup ask for containers from RM
-      // Send request for containers to RM
-      // Until we get our fully allocated quota, we keep on polling RM for
-      // containers
-      // Keep looping until all the containers are launched and shell script
-      // executed on them ( regardless of success/failure).
-
-      int loopCounter = -1;
-
-      while (numCompletedContainers.get() < numTotalContainers && !appDone) {
-        loopCounter++;
-
-        // log current state
-        LOG.info("Current application state: loop=" + loopCounter
-            + ", appDone=" + appDone + ", total=" + numTotalContainers
-            + ", requested=" + numRequestedContainers + ", completed="
-            + numCompletedContainers + ", failed=" + numFailedContainers
-            + ", currentAllocated=" + numAllocatedContainers);
-
-        // Sleep before each loop when asking RM for containers
-        // to avoid flooding RM with spurious requests when it
-        // need not have any available containers
-        // Sleeping for 1000 ms.
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          LOG.info("Sleep interrupted " + e.getMessage());
-        }
+    // Setup local RPC Server to accept status requests directly from clients
+    // TODO need to setup a protocol for client to be able to communicate to
+    // the RPC server
+    // TODO use the rpc port info to register with the RM for the client to
+    // send requests to this app master
+
+    // Register self with ResourceManager
+    // This will start heartbeating to the RM
+    RegisterApplicationMasterResponse response = resourceManager
+        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+            appMasterTrackingUrl);
+    // Dump out information about cluster capability as seen by the
+    // resource manager
+    int minMem = response.getMinimumResourceCapability().getMemory();
+    int maxMem = response.getMaximumResourceCapability().getMemory();
+    LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+    // A resource ask has to be atleast the minimum of the capability of the
+    // cluster, the value has to be a multiple of the min value and cannot
+    // exceed the max.
+    // If it is not an exact multiple of min, the RM will allocate to the
+    // nearest multiple of min
+    if (containerMemory < minMem) {
+      LOG.info("Container memory specified below min threshold of cluster."
+          + " Using min value." + ", specified=" + containerMemory + ", min="
+          + minMem);
+      containerMemory = minMem;
+    } else if (containerMemory > maxMem) {
+      LOG.info("Container memory specified above max threshold of cluster."
+          + " Using max value." + ", specified=" + containerMemory + ", max="
+          + maxMem);
+      containerMemory = maxMem;
+    }
 
-        // No. of containers to request
-        // For the first loop, askCount will be equal to total containers needed
-        // From that point on, askCount will always be 0 as current
-        // implementation does not change its ask on container failures.
-        int askCount = numTotalContainers - numRequestedContainers.get();
-        numRequestedContainers.addAndGet(askCount);
-
-        if (askCount > 0) {
-          ContainerRequest containerAsk = setupContainerAskForRM(askCount);
-          resourceManager.addContainerRequest(containerAsk);
-        }
 
-        // Send the request to RM
-        LOG.info("Asking RM for containers" + ", askCount=" + askCount);
-        AllocateResponse allocResp = sendContainerAskToRM();
-
-        // Retrieve list of allocated containers from the response
-        List<Container> allocatedContainers =
-            allocResp.getAllocatedContainers();
-        LOG.info("Got response from RM for container ask, allocatedCnt="
-            + allocatedContainers.size());
-        numAllocatedContainers.addAndGet(allocatedContainers.size());
-        for (Container allocatedContainer : allocatedContainers) {
-          LOG.info("Launching shell command on a new container."
-              + ", containerId=" + allocatedContainer.getId()
-              + ", containerNode=" + allocatedContainer.getNodeId().getHost()
-              + ":" + allocatedContainer.getNodeId().getPort()
-              + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
-              + ", containerState" + allocatedContainer.getState()
-              + ", containerResourceMemory"
-              + allocatedContainer.getResource().getMemory());
-          // + ", containerToken"
-          // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
-          LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
-              allocatedContainer);
-          Thread launchThread = new Thread(runnableLaunchContainer);
-
-          // launch and start the container on a separate thread to keep
-          // the main thread unblocked
-          // as all containers may not be allocated at one go.
-          launchThreads.add(launchThread);
-          launchThread.start();
-        }
+    // Setup ask for containers from RM
+    // Send request for containers to RM
+    // Until we get our fully allocated quota, we keep on polling RM for
+    // containers
+    // Keep looping until all the containers are launched and shell script
+    // executed on them ( regardless of success/failure).
+    ContainerRequest containerAsk = setupContainerAskForRM(numTotalContainers);
+    resourceManager.addContainerRequest(containerAsk);
+    numRequestedContainers.set(numTotalContainers);
 
-        // Check what the current available resources in the cluster are
-        // TODO should we do anything if the available resources are not enough?
-        Resource availableResources = allocResp.getAvailableResources();
-        LOG.info("Current available resources in the cluster "
-            + availableResources);
-
-        // Check the completed containers
-        List<ContainerStatus> completedContainers = allocResp
-            .getCompletedContainersStatuses();
-        LOG.info("Got response from RM for container ask, completedCnt="
-            + completedContainers.size());
-        for (ContainerStatus containerStatus : completedContainers) {
-          LOG.info("Got container status for containerID="
-              + containerStatus.getContainerId() + ", state="
-              + containerStatus.getState() + ", exitStatus="
-              + containerStatus.getExitStatus() + ", diagnostics="
-              + containerStatus.getDiagnostics());
-
-          // non complete containers should not be here
-          assert (containerStatus.getState() == ContainerState.COMPLETE);
-
-          // increment counters for completed/failed containers
-          int exitStatus = containerStatus.getExitStatus();
-          if (0 != exitStatus) {
-            // container failed
-            if (-100 != exitStatus) {
-              // shell script failed
-              // counts as completed
-              numCompletedContainers.incrementAndGet();
-              numFailedContainers.incrementAndGet();
-            } else {
-              // something else bad happened
-              // app job did not complete for some reason
-              // we should re-try as the container was lost for some reason
-              numAllocatedContainers.decrementAndGet();
-              numRequestedContainers.decrementAndGet();
-              // we do not need to release the container as it would be done
-              // by the RM/CM.
-            }
-          } else {
-            // nothing to do
-            // container completed successfully
+    while (!done) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException ex) {}
+    }
+    finish();
+    
+    return success;
+  }
+  
+  private void finish() {
+    // Join all launched threads
+    // needed for when we time out
+    // and we need to release containers
+    for (Thread launchThread : launchThreads) {
+      try {
+        launchThread.join(10000);
+      } catch (InterruptedException e) {
+        LOG.info("Exception thrown in thread join: " + e.getMessage());
+        e.printStackTrace();
+      }
+    }
+
+    // When the application completes, it should send a finish application
+    // signal to the RM
+    LOG.info("Application completed. Signalling finish to RM");
+
+    FinalApplicationStatus appStatus;
+    String appMessage = null;
+    success = true;
+    if (numFailedContainers.get() == 0) {
+      appStatus = FinalApplicationStatus.SUCCEEDED;
+    } else {
+      appStatus = FinalApplicationStatus.FAILED;
+      appMessage = "Diagnostics." + ", total=" + numTotalContainers
+          + ", completed=" + numCompletedContainers.get() + ", allocated="
+          + numAllocatedContainers.get() + ", failed="
+          + numFailedContainers.get();
+      success = false;
+    }
+    try {
+      resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+    } catch (YarnRemoteException ex) {
+      LOG.error("Failed to unregister application", ex);
+    }
+    
+    done = true;
+    resourceManager.stop();
+  }
+  
+  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+      LOG.info("Got response from RM for container ask, completedCnt="
+          + completedContainers.size());
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info("Got container status for containerID="
+            + containerStatus.getContainerId() + ", state="
+            + containerStatus.getState() + ", exitStatus="
+            + containerStatus.getExitStatus() + ", diagnostics="
+            + containerStatus.getDiagnostics());
+
+        // non complete containers should not be here
+        assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+        // increment counters for completed/failed containers
+        int exitStatus = containerStatus.getExitStatus();
+        if (0 != exitStatus) {
+          // container failed
+          if (YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS != exitStatus) {
+            // shell script failed
+            // counts as completed
             numCompletedContainers.incrementAndGet();
-            LOG.info("Container completed successfully." + ", containerId="
-                + containerStatus.getContainerId());
+            numFailedContainers.incrementAndGet();
+          } else {
+            // container was killed by framework, possibly preempted
+            // we should re-try as the container was lost for some reason
+            numAllocatedContainers.decrementAndGet();
+            numRequestedContainers.decrementAndGet();
+            // we do not need to release the container as it would be done
+            // by the RM
           }
+        } else {
+          // nothing to do
+          // container completed successfully
+          numCompletedContainers.incrementAndGet();
+          LOG.info("Container completed successfully." + ", containerId="
+              + containerStatus.getContainerId());
         }
-        if (numCompletedContainers.get() == numTotalContainers) {
-          appDone = true;
-        }
-
-        LOG.info("Current application state: loop=" + loopCounter
-            + ", appDone=" + appDone + ", total=" + numTotalContainers
-            + ", requested=" + numRequestedContainers + ", completed="
-            + numCompletedContainers + ", failed=" + numFailedContainers
-            + ", currentAllocated=" + numAllocatedContainers);
-
-        // TODO
-        // Add a timeout handling layer
-        // for misbehaving shell commands
       }
-
-      // Join all launched threads
-      // needed for when we time out
-      // and we need to release containers
-      for (Thread launchThread : launchThreads) {
-        try {
-          launchThread.join(10000);
-        } catch (InterruptedException e) {
-          LOG.info("Exception thrown in thread join: " + e.getMessage());
-          e.printStackTrace();
-        }
+      
+      // ask for more containers if any failed
+      int askCount = numTotalContainers - numRequestedContainers.get();
+      numRequestedContainers.addAndGet(askCount);
+
+      if (askCount > 0) {
+        ContainerRequest containerAsk = setupContainerAskForRM(askCount);
+        resourceManager.addContainerRequest(containerAsk);
+      }
+      
+      // set progress to deliver to RM on next heartbeat
+      float progress = (float) numCompletedContainers.get()
+          / numTotalContainers;
+      resourceManager.setProgress(progress);
+      
+      if (numCompletedContainers.get() == numTotalContainers) {
+        done = true;
       }
+    }
 
-      // When the application completes, it should send a finish application
-      // signal to the RM
-      LOG.info("Application completed. Signalling finish to RM");
-
-      FinalApplicationStatus appStatus;
-      String appMessage = null;
-      boolean isSuccess = true;
-      if (numFailedContainers.get() == 0) {
-        appStatus = FinalApplicationStatus.SUCCEEDED;
-      } else {
-        appStatus = FinalApplicationStatus.FAILED;
-        appMessage = "Diagnostics." + ", total=" + numTotalContainers
-            + ", completed=" + numCompletedContainers.get() + ", allocated="
-            + numAllocatedContainers.get() + ", failed="
-            + numFailedContainers.get();
-        isSuccess = false;
+    @Override
+    public void onContainersAllocated(List<Container> allocatedContainers) {
+      LOG.info("Got response from RM for container ask, allocatedCnt="
+          + allocatedContainers.size());
+      numAllocatedContainers.addAndGet(allocatedContainers.size());
+      for (Container allocatedContainer : allocatedContainers) {
+        LOG.info("Launching shell command on a new container."
+            + ", containerId=" + allocatedContainer.getId()
+            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+            + ":" + allocatedContainer.getNodeId().getPort()
+            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+            + ", containerState" + allocatedContainer.getState()
+            + ", containerResourceMemory"
+            + allocatedContainer.getResource().getMemory());
+        // + ", containerToken"
+        // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+        LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
+            allocatedContainer);
+        Thread launchThread = new Thread(runnableLaunchContainer);
+
+        // launch and start the container on a separate thread to keep
+        // the main thread unblocked
+        // as all containers may not be allocated at one go.
+        launchThreads.add(launchThread);
+        launchThread.start();
       }
-      resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
-      return isSuccess;
-    } finally {
-      resourceManager.stop();
     }
+
+    @Override
+    public void onRebootRequest() {}
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
   }
 
   /**
@@ -811,21 +788,4 @@ public class ApplicationMaster {
     LOG.info("Requested container ask: " + request.toString());
     return request;
   }
-
-  /**
-   * Ask RM to allocate given no. of containers to this Application Master
-   *
-   * @param requestedContainers Containers to ask for from RM
-   * @return Response from RM to AM with allocated containers
-   * @throws YarnRemoteException
-   */
-  private AllocateResponse sendContainerAskToRM() throws YarnRemoteException {
-    float progressIndicator = (float) numCompletedContainers.get()
-        / numTotalContainers;
-
-    LOG.info("Sending request to RM for containers" + ", progress="
-        + progressIndicator);
-
-    return resourceManager.allocate(progressIndicator);
-  }
 }
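
Two behavioral details are easy to miss in the rewrite above: the main thread now simply waits for
the callback handler to flip the done flag before calling finish(), and the AM no longer invokes
allocate() itself (sendContainerAskToRM() is removed); progress is handed to the async client and
rides on its heartbeat. Condensed from the hunks above (exception handling elided):

    // Main thread: idle until the callbacks signal completion
    while (!done) {
      Thread.sleep(200);
    }
    finish();  // join launcher threads, unregister from the RM, stop the client

    // Inside onContainersCompleted(): sent to the RM on the next heartbeat
    float progress = (float) numCompletedContainers.get() / numTotalContainers;
    resourceManager.setProgress(progress);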

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1459557&r1=1459556&r2=1459557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Thu Mar 21 22:30:10 2013
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -404,4 +405,21 @@ public class BuilderUtils {
     allocateRequest.addAllReleases(containersToBeReleased);
     return allocateRequest;
   }
+  
+  public static AllocateResponse newAllocateResponse(int responseId,
+      List<ContainerStatus> completedContainers,
+      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+      Resource availResources, boolean reboot, int numClusterNodes) {
+    AllocateResponse response = recordFactory
+        .newRecordInstance(AllocateResponse.class);
+    response.setNumClusterNodes(numClusterNodes);
+    response.setResponseId(responseId);
+    response.setCompletedContainersStatuses(completedContainers);
+    response.setAllocatedContainers(allocatedContainers);
+    response.setUpdatedNodes(updatedNodes);
+    response.setAvailableResources(availResources);
+    response.setReboot(reboot);
+
+    return response;
+  }
 }
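
The new BuilderUtils.newAllocateResponse() helper assembles an AllocateResponse from its parts,
which is convenient for tests that need a canned RM reply (the TestAMRMClientAsync added in this
commit is the likely consumer). A hypothetical call with placeholder values:

    // Records and BuilderUtils are the existing org.apache.hadoop.yarn.util helpers;
    // all values below are placeholders.
    Resource avail = Records.newRecord(Resource.class);
    avail.setMemory(1024);

    AllocateResponse canned = BuilderUtils.newAllocateResponse(
        0,                                          // responseId
        Collections.<ContainerStatus>emptyList(),   // completed containers
        Collections.<Container>emptyList(),         // newly allocated containers
        Collections.<NodeReport>emptyList(),        // updated node reports
        avail,                                      // available resources
        false,                                      // reboot flag
        1);                                         // numClusterNodes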


