hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [1/2] hadoop git commit: YARN-4091. Add REST API to retrieve scheduler activity. (Chen Ge and Sunil G via wangda)
Date Fri, 05 Aug 2016 17:28:13 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk d9a354c2f -> e0d131f05


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index 4d5a7dc..fa13df4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -24,6 +24,10 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -43,17 +47,25 @@ public abstract class AbstractContainerAllocator {
   FiCaSchedulerApp application;
   final ResourceCalculator rc;
   final RMContext rmContext;
-  
+  ActivitiesManager activitiesManager;
+
   public AbstractContainerAllocator(FiCaSchedulerApp application,
       ResourceCalculator rc, RMContext rmContext) {
+    this(application, rc, rmContext, null);
+  }
+
+  public AbstractContainerAllocator(FiCaSchedulerApp application,
+      ResourceCalculator rc, RMContext rmContext,
+      ActivitiesManager activitiesManager) {
     this.application = application;
     this.rc = rc;
     this.rmContext = rmContext;
+    this.activitiesManager = activitiesManager;
   }
 
   protected CSAssignment getCSAssignmentFromAllocateResult(
       Resource clusterResource, ContainerAllocation result,
-      RMContainer rmContainer) {
+      RMContainer rmContainer, FiCaSchedulerNode node) {
     // Handle skipped
     CSAssignment.SkippedType skipped =
         (result.getAllocationState() == AllocationState.APP_SKIPPED) ?
@@ -61,7 +73,7 @@ public abstract class AbstractContainerAllocator {
         CSAssignment.SkippedType.NONE;
     CSAssignment assignment = new CSAssignment(skipped);
     assignment.setApplication(application);
-    
+
     // Handle excess reservation
     assignment.setExcessReservation(result.getContainerToBeUnreserved());
 
@@ -85,6 +97,23 @@ public abstract class AbstractContainerAllocator {
         assignment.getAssignmentInformation().incrReservations();
         Resources.addTo(assignment.getAssignmentInformation().getReserved(),
             allocatedResource);
+
+        if (rmContainer != null) {
+          ActivitiesLogger.APP.recordAppActivityWithAllocation(
+              activitiesManager, node, application, updatedContainer,
+              ActivityState.RE_RESERVED);
+          ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+              activitiesManager, application.getApplicationId(),
+              ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+        } else {
+          ActivitiesLogger.APP.recordAppActivityWithAllocation(
+              activitiesManager, node, application, updatedContainer,
+              ActivityState.RESERVED);
+          ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
+              activitiesManager, application.getApplicationId(),
+              updatedContainer.getId(), ActivityState.RESERVED,
+              ActivityDiagnosticConstant.EMPTY);
+        }
       } else if (result.getAllocationState() == AllocationState.ALLOCATED){
         // This is a new container
         // Inform the ordering policy
@@ -105,10 +134,18 @@ public abstract class AbstractContainerAllocator {
         assignment.getAssignmentInformation().incrAllocations();
         Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
             allocatedResource);
-        
+
         if (rmContainer != null) {
           assignment.setFulfilledReservation(true);
         }
+
+        ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager,
+            node, application, updatedContainer, ActivityState.ALLOCATED);
+        ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
+            activitiesManager, application.getApplicationId(),
+            updatedContainer.getId(), ActivityState.ACCEPTED,
+            ActivityDiagnosticConstant.EMPTY);
+
       }
 
       assignment.setContainersToKill(result.getToKillContainers());
@@ -118,13 +155,13 @@ public abstract class AbstractContainerAllocator {
             CSAssignment.SkippedType.QUEUE_LIMIT);
       }
     }
-    
+
     return assignment;
   }
-  
+
   /**
    * allocate needs to handle following stuffs:
-   * 
+   *
    * <ul>
    * <li>Select request: Select a request to allocate. E.g. select a resource
    * request based on requirement/priority/locality.</li>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index 3be8e0e..4eaa24b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -36,12 +37,17 @@ public class ContainerAllocator extends AbstractContainerAllocator {
 
   public ContainerAllocator(FiCaSchedulerApp application,
       ResourceCalculator rc, RMContext rmContext) {
+    this(application, rc, rmContext, null);
+  }
+
+  public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc,
+      RMContext rmContext, ActivitiesManager activitiesManager) {
     super(application, rc, rmContext);
 
     increaseContainerAllocator =
         new IncreaseContainerAllocator(application, rc, rmContext);
-    regularContainerAllocator =
-        new RegularContainerAllocator(application, rc, rmContext);
+    regularContainerAllocator = new RegularContainerAllocator(application, rc,
+        rmContext, activitiesManager);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 29b37d8..21114f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -24,11 +24,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
@@ -37,6 +39,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestK
 
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -57,10 +65,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
   
   private ResourceRequest lastResourceRequest = null;
-  
+
   public RegularContainerAllocator(FiCaSchedulerApp application,
-      ResourceCalculator rc, RMContext rmContext) {
-    super(application, rc, rmContext);
+      ResourceCalculator rc, RMContext rmContext,
+      ActivitiesManager activitiesManager) {
+    super(application, rc, rmContext, activitiesManager);
   }
   
   private boolean checkHeadroom(Resource clusterResource,
@@ -85,15 +94,23 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
+    Priority priority = schedulerKey.getPriority();
+
     if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
       application.updateAppSkipNodeDiagnostics(
           CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE);
       return ContainerAllocation.APP_SKIPPED;
     }
 
     ResourceRequest anyRequest =
         application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (null == anyRequest) {
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST);
       return ContainerAllocation.PRIORITY_SKIPPED;
     }
 
@@ -102,6 +119,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     // Do we need containers at this 'priority'?
     if (application.getTotalRequiredResources(schedulerKey) <= 0) {
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE);
       return ContainerAllocation.PRIORITY_SKIPPED;
     }
 
@@ -116,6 +136,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         }
         application.updateAppSkipNodeDiagnostics(
             "Skipping assigning to Node in Ignore Exclusivity mode. ");
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, priority,
+            ActivityDiagnosticConstant.SKIP_IN_IGNORE_EXCLUSIVITY_MODE);
         return ContainerAllocation.APP_SKIPPED;
       }
     }
@@ -126,6 +149,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
         anyRequest.getNodeLabelExpression(), node.getPartition(),
         schedulingMode)) {
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.
+              PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST);
       return ContainerAllocation.PRIORITY_SKIPPED;
     }
 
@@ -134,6 +161,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         if (LOG.isDebugEnabled()) {
           LOG.debug("doesn't need containers based on reservation algo!");
         }
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, priority,
+            ActivityDiagnosticConstant.DO_NOT_NEED_ALLOCATIONATTEMPTINFOS);
         return ContainerAllocation.PRIORITY_SKIPPED;
       }
     }
@@ -143,6 +173,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         LOG.debug("cannot allocate required resource=" + required
             + " because of headroom");
       }
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM);
       return ContainerAllocation.QUEUE_SKIPPED;
     }
 
@@ -174,7 +207,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
               + missedNonPartitionedRequestSchedulingOpportunity + " required="
               + rmContext.getScheduler().getNumClusterNodes());
         }
-
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, priority,
+            ActivityDiagnosticConstant.NON_PARTITIONED_PARTITION_FIRST);
         return ContainerAllocation.APP_SKIPPED;
       }
     }
@@ -301,6 +336,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     // Skip node-local request, go to rack-local request
+    ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+        activitiesManager, node, application, schedulerKey.getPriority(),
+        ActivityDiagnosticConstant.SKIP_NODE_LOCAL_REQUEST);
     return ContainerAllocation.LOCALITY_SKIPPED;
   }
 
@@ -316,6 +354,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     // Skip rack-local request, go to off-switch request
+    ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+        activitiesManager, node, application, schedulerKey.getPriority(),
+        ActivityDiagnosticConstant.SKIP_RACK_LOCAL_REQUEST);
     return ContainerAllocation.LOCALITY_SKIPPED;
   }
 
@@ -332,6 +373,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     application.updateAppSkipNodeDiagnostics(
         CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY);
+    ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+        activitiesManager, node, application, schedulerKey.getPriority(),
+        ActivityDiagnosticConstant.SKIP_OFF_SWITCH_REQUEST);
     return ContainerAllocation.APP_SKIPPED;
   }
 
@@ -339,6 +383,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       RMContainer reservedContainer, SchedulingMode schedulingMode,
       ResourceLimits currentResoureLimits) {
+    Priority priority = schedulerKey.getPriority();
 
     ContainerAllocation allocation;
 
@@ -364,6 +409,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         application.getResourceRequest(schedulerKey, node.getRackName());
     if (rackLocalResourceRequest != null) {
       if (!rackLocalResourceRequest.getRelaxLocality()) {
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, priority,
+            ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
         return ContainerAllocation.PRIORITY_SKIPPED;
       }
 
@@ -387,6 +435,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (offSwitchResourceRequest != null) {
       if (!offSwitchResourceRequest.getRelaxLocality()) {
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, priority,
+            ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
         return ContainerAllocation.PRIORITY_SKIPPED;
       }
       if (requestType != NodeType.NODE_LOCAL
@@ -408,7 +459,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
       return allocation;
     }
-
+    ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+        activitiesManager, node, application, priority,
+        ActivityDiagnosticConstant.PRIORITY_SKIPPED);
     return ContainerAllocation.PRIORITY_SKIPPED;
   }
 
@@ -416,6 +469,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       ResourceRequest request, NodeType type, RMContainer rmContainer,
       SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+    Priority priority = schedulerKey.getPriority();
     lastResourceRequest = request;
     
     if (LOG.isDebugEnabled()) {
@@ -432,6 +486,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       // this is a reserved container, but we cannot allocate it now according
       // to label not match. This can be caused by node label changed
       // We should un-reserve this container.
+      ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
+          node, application, priority,
+          ActivityDiagnosticConstant.REQUEST_CAN_NOT_ACCESS_NODE_LABEL,
+          ActivityState.REJECTED);
       return new ContainerAllocation(rmContainer, null,
           AllocationState.LOCALITY_SKIPPED);
     }
@@ -446,6 +504,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
           + " does not have sufficient resource for request : " + request
           + " node total capability : " + node.getTotalResource());
       // Skip this locality request
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE);
       return ContainerAllocation.LOCALITY_SKIPPED;
     }
 
@@ -524,6 +585,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
           // continue.
           if (null == unreservedContainer) {
             // Skip the locality request
+            ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+                activitiesManager, node, application, priority,
+                ActivityDiagnosticConstant.LOCALITY_SKIPPED);
             return ContainerAllocation.LOCALITY_SKIPPED;
           }
         }
@@ -548,6 +612,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
               LOG.debug("we needed to unreserve to be able to allocate");
             }
             // Skip the locality request
+            ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+                activitiesManager, node, application, priority,
+                ActivityDiagnosticConstant.LOCALITY_SKIPPED);
             return ContainerAllocation.LOCALITY_SKIPPED;          
           }
         }
@@ -560,6 +627,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         return result;
       }
       // Skip the locality request
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.LOCALITY_SKIPPED);
       return ContainerAllocation.LOCALITY_SKIPPED;    
     }
   }
@@ -636,6 +706,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       ContainerAllocation ret =
           new ContainerAllocation(allocationResult.containerToBeUnreserved,
               null, AllocationState.APP_SKIPPED);
+      ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
+          node, application, schedulerKey.getPriority(),
+          ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED);
       return ret;
     }
 
@@ -662,6 +735,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       application
           .updateAppSkipNodeDiagnostics("Scheduling of container failed. ");
       LOG.warn("Couldn't get container for allocation!");
+      ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
+          node, application, schedulerKey.getPriority(),
+          ActivityDiagnosticConstant.COULD_NOT_GET_CONTAINER,
+          ActivityState.REJECTED);
       return ContainerAllocation.APP_SKIPPED;
     }
 
@@ -741,6 +818,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
               + ", because it doesn't need more resource, schedulingMode="
               + schedulingMode.name() + " node-label=" + node.getPartition());
         }
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, application.getPriority(),
+            ActivityDiagnosticConstant.APPLICATION_DO_NOT_NEED_RESOURCE);
         return CSAssignment.SKIP_ASSIGNMENT;
       }
       
@@ -755,18 +835,21 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
           continue;
         }
         return getCSAssignmentFromAllocateResult(clusterResource, result,
-            null);
+            null, node);
       }
 
       // We will reach here if we skipped all priorities of the app, so we will
       // skip the app.
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, application.getPriority(),
+          ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
       return CSAssignment.SKIP_ASSIGNMENT;
     } else {
       ContainerAllocation result =
           allocate(clusterResource, node, schedulingMode, resourceLimits,
               reservedContainer.getReservedSchedulerKey(), reservedContainer);
       return getCSAssignmentFromAllocateResult(clusterResource, result,
-          reservedContainer);
+          reservedContainer, node);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 67d93a4..33dee80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
@@ -108,8 +109,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
+    this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
+        appPriority, isAttemptRecovering, null);
+  }
+
+  public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
+      ActivitiesManager activitiesManager) {
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
-    
+
     RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
 
     Resource amResource;
@@ -139,8 +148,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     if (scheduler.getResourceCalculator() != null) {
       rc = scheduler.getResourceCalculator();
     }
-    
-    containerAllocator = new ContainerAllocator(this, rc, rmContext);
+
+    containerAllocator = new ContainerAllocator(this, rc, rmContext,
+        activitiesManager);
 
     if (scheduler instanceof CapacityScheduler) {
       capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
@@ -189,7 +199,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       return null;
     }
     
-    // Required sanity check - AM can call 'allocate' to update resource 
+    // Required sanity check - AM can call 'allocate' to update resource
     // request without locking the scheduler, hence we need to check
     if (getTotalRequiredResources(schedulerKey) <= 0) {
       return null;
@@ -493,7 +503,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   public LeafQueue getCSLeafQueue() {
     return (LeafQueue)queue;
   }
-  
+
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode, RMContainer reservedContainer) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 75bffc7..4305fd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -130,9 +130,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
@@ -176,6 +180,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webapp.WebServices;
@@ -577,6 +582,124 @@ public class RMWebServices extends WebServices {
   }
 
   @GET
+  @Path("/scheduler/activities")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
+      @QueryParam("nodeId") String nodeId) {
+    YarnScheduler scheduler = rm.getRMContext().getScheduler();
+
+    if (scheduler instanceof AbstractYarnScheduler) {
+      String errMessage = "";
+
+      AbstractYarnScheduler abstractYarnScheduler =
+          (AbstractYarnScheduler) scheduler;
+
+      ActivitiesManager activitiesManager =
+          abstractYarnScheduler.getActivitiesManager();
+      if (null == activitiesManager) {
+        errMessage = "Not Capacity Scheduler";
+        return new ActivitiesInfo(errMessage, nodeId);
+      }
+
+      List<FiCaSchedulerNode> nodeList =
+          abstractYarnScheduler.getNodeTracker().getAllNodes();
+
+      boolean illegalInput = false;
+
+      if (nodeList.size() == 0) {
+        illegalInput = true;
+        errMessage = "No node manager running in the cluster";
+      } else {
+        if (nodeId != null) {
+          String hostName = nodeId;
+          String portName = "";
+          if (nodeId.contains(":")) {
+            int index = nodeId.indexOf(":");
+            hostName = nodeId.substring(0, index);
+            portName = nodeId.substring(index + 1);
+          }
+
+          boolean correctNodeId = false;
+          for (FiCaSchedulerNode node : nodeList) {
+            if ((portName.equals("") && node.getRMNode().getHostName().equals(
+                hostName)) || (!portName.equals("") && node.getRMNode()
+                .getHostName().equals(hostName) && String.valueOf(
+                node.getRMNode().getCommandPort()).equals(portName))) {
+              correctNodeId = true;
+              nodeId = node.getNodeID().toString();
+              break;
+            }
+          }
+          if (!correctNodeId) {
+            illegalInput = true;
+            errMessage = "Cannot find node manager with given node id";
+          }
+        }
+      }
+
+      if (!illegalInput) {
+        activitiesManager.recordNextNodeUpdateActivities(nodeId);
+        return activitiesManager.getActivitiesInfo(nodeId);
+      }
+
+      // Return a activities info with error message
+      return new ActivitiesInfo(errMessage, nodeId);
+    }
+
+    return null;
+  }
+
+  @GET
+  @Path("/scheduler/app-activities")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
+      @QueryParam("appId") String appId, @QueryParam("maxTime") String time) {
+    YarnScheduler scheduler = rm.getRMContext().getScheduler();
+
+    if (scheduler instanceof AbstractYarnScheduler) {
+      AbstractYarnScheduler abstractYarnScheduler =
+          (AbstractYarnScheduler) scheduler;
+
+      ActivitiesManager activitiesManager =
+          abstractYarnScheduler.getActivitiesManager();
+      if (null == activitiesManager) {
+        String errMessage = "Not Capacity Scheduler";
+        return new AppActivitiesInfo(errMessage, appId);
+      }
+
+      if(appId == null) {
+        String errMessage = "Must provide an application Id";
+        return new AppActivitiesInfo(errMessage, null);
+      }
+
+      double maxTime = 3.0;
+
+      if (time != null) {
+        if (time.contains(".")) {
+          maxTime = Double.parseDouble(time);
+        } else {
+          maxTime = Double.parseDouble(time + ".0");
+        }
+      }
+
+      ApplicationId applicationId;
+      try {
+        applicationId = ApplicationId.fromString(appId);
+        activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime);
+        AppActivitiesInfo appActivitiesInfo =
+            activitiesManager.getAppActivitiesInfo(applicationId);
+
+        return appActivitiesInfo;
+      } catch (Exception e) {
+        String errMessage = "Cannot find application with given appId";
+        return new AppActivitiesInfo(errMessage, appId);
+      }
+
+    }
+    return null;
+  }
+
+  @GET
   @Path("/appstatistics")
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public ApplicationStatisticsInfo getAppStatistics(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java
new file mode 100644
index 0000000..0de340a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
+
+/*
+ * DAO object to display node allocation activity.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ActivitiesInfo {
+  protected String nodeId;
+  protected String timeStamp;
+  protected String diagnostic = null;
+  protected List<NodeAllocationInfo> allocations;
+
+  private static final Log LOG = LogFactory.getLog(ActivitiesInfo.class);
+
+  public ActivitiesInfo() {
+  }
+
+  public ActivitiesInfo(String errorMessage, String nodeId) {
+    this.diagnostic = errorMessage;
+    this.nodeId = nodeId;
+  }
+
+  public ActivitiesInfo(List<NodeAllocation> nodeAllocations, String nodeId) {
+    this.nodeId = nodeId;
+    this.allocations = new ArrayList<>();
+
+    if (nodeAllocations == null) {
+      diagnostic = (nodeId != null ?
+          "waiting for display" :
+          "waiting for next allocation");
+    } else {
+      if (nodeAllocations.size() == 0) {
+        diagnostic = "do not have available resources";
+      } else {
+        this.nodeId = nodeAllocations.get(0).getNodeId();
+
+        Date date = new Date();
+        date.setTime(nodeAllocations.get(0).getTimeStamp());
+        this.timeStamp = date.toString();
+
+        for (int i = 0; i < nodeAllocations.size(); i++) {
+          NodeAllocation nodeAllocation = nodeAllocations.get(i);
+          NodeAllocationInfo allocationInfo = new NodeAllocationInfo(
+              nodeAllocation);
+          this.allocations.add(allocationInfo);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java
new file mode 100644
index 0000000..9553a720
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * DAO object to display node information in allocation tree.
+ * It corresponds to "ActivityNode" class.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ActivityNodeInfo {
+  protected String name;  // The name for activity node
+  protected String appPriority;
+  protected String requestPriority;
+  protected String allocationState;
+  protected String diagnostic;
+
+  protected List<ActivityNodeInfo> children;
+
+  ActivityNodeInfo() {
+  }
+
+  ActivityNodeInfo(ActivityNode node) {
+    this.name = node.getName();
+    getPriority(node);
+    this.allocationState = node.getState().name();
+    this.diagnostic = node.getDiagnostic();
+    this.children = new ArrayList<>();
+
+    for (ActivityNode child : node.getChildren()) {
+      ActivityNodeInfo containerInfo = new ActivityNodeInfo(child);
+      this.children.add(containerInfo);
+    }
+  }
+
+  private void getPriority(ActivityNode node) {
+    if (node.getType()) {
+      this.appPriority = node.getAppPriority();
+    } else {
+      this.requestPriority = node.getRequestPriority();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java
new file mode 100644
index 0000000..38c45a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/*
+ * DAO object to display application activity.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AppActivitiesInfo {
+  protected String applicationId;
+  protected String diagnostic;
+  protected String timeStamp;
+  protected List<AppAllocationInfo> allocations;
+
+  private static final Log LOG = LogFactory.getLog(AppActivitiesInfo.class);
+
+  public AppActivitiesInfo() {
+  }
+
+  public AppActivitiesInfo(String errorMessage, String applicationId) {
+    this.diagnostic = errorMessage;
+    this.applicationId = applicationId;
+
+    Date date = new Date();
+    date.setTime(SystemClock.getInstance().getTime());
+    this.timeStamp = date.toString();
+  }
+
+  public AppActivitiesInfo(List<AppAllocation> appAllocations,
+      ApplicationId applicationId) {
+    this.applicationId = applicationId.toString();
+    this.allocations = new ArrayList<>();
+
+    if (appAllocations == null) {
+      diagnostic = "waiting for display";
+
+      Date date = new Date();
+      date.setTime(SystemClock.getInstance().getTime());
+      this.timeStamp = date.toString();
+    } else {
+      for (int i = appAllocations.size() - 1; i > -1; i--) {
+        AppAllocation appAllocation = appAllocations.get(i);
+        AppAllocationInfo appAllocationInfo = new AppAllocationInfo(
+            appAllocation);
+        this.allocations.add(appAllocationInfo);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java
new file mode 100644
index 0000000..21d3788
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/*
+ * DAO object to display application allocation detailed information.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AppAllocationInfo {
+  protected String nodeId;
+  protected String queueName;
+  protected String appPriority;
+  protected String allocatedContainerId;
+  protected String allocationState;
+  protected String diagnostic;
+  protected String timeStamp;
+  protected List<ActivityNodeInfo> allocationAttempt;
+
+  private static final Log LOG = LogFactory.getLog(AppAllocationInfo.class);
+
+  AppAllocationInfo() {
+  }
+
+  AppAllocationInfo(AppAllocation allocation) {
+    this.allocationAttempt = new ArrayList<>();
+
+    this.nodeId = allocation.getNodeId();
+    this.queueName = allocation.getQueueName();
+    this.appPriority = allocation.getPriority();
+    this.allocatedContainerId = allocation.getContainerId();
+    this.allocationState = allocation.getAppState().name();
+    this.diagnostic = allocation.getDiagnostic();
+
+    Date date = new Date();
+    date.setTime(allocation.getTime());
+    this.timeStamp = date.toString();
+
+    for (ActivityNode attempt : allocation.getAllocationAttempts()) {
+      ActivityNodeInfo containerInfo = new ActivityNodeInfo(attempt);
+      this.allocationAttempt.add(containerInfo);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java
new file mode 100644
index 0000000..1350a76
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/*
+ * DAO object to display each node allocation in node heartbeat.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class NodeAllocationInfo {
+  protected String allocatedContainerId;
+  protected String finalAllocationState;
+  protected ActivityNodeInfo root = null;
+
+  private static final Log LOG = LogFactory.getLog(NodeAllocationInfo.class);
+
+  NodeAllocationInfo() {
+  }
+
+  NodeAllocationInfo(NodeAllocation allocation) {
+    this.allocatedContainerId = allocation.getContainerId();
+    this.finalAllocationState = allocation.getFinalAllocationState().name();
+
+    root = new ActivityNodeInfo(allocation.getRoot());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
index 649d719..bbdfdd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
@@ -62,9 +62,9 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
 
 public class TestRMWebServicesCapacitySched extends JerseyTestBase {
 
-  private static MockRM rm;
-  private static CapacitySchedulerConfiguration csConf;
-  private static YarnConfiguration conf;
+  protected static MockRM rm;
+  protected static CapacitySchedulerConfiguration csConf;
+  protected static YarnConfiguration conf;
 
   private class QueueInfo {
     float capacity;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
new file mode 100644
index 0000000..d7b0581
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
@@ -0,0 +1,777 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Test;
+
+import javax.ws.rs.core.MediaType;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestRMWebServicesSchedulerActivities
+    extends TestRMWebServicesCapacitySched {
+
+  private static final Log LOG = LogFactory.getLog(
+      TestRMWebServicesSchedulerActivities.class);
+
+  @Test
+  public void testAssignMultipleContainersPerNodeHeartbeat()
+      throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              10)), null);
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.1:1234");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 11);
+
+      JSONArray allocations = json.getJSONArray("allocations");
+      for (int i = 0; i < allocations.length(); i++) {
+        if (i != allocations.length() - 1) {
+          verifyStateOfAllocations(allocations.getJSONObject(i),
+              "finalAllocationState", "ALLOCATED");
+          verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1");
+        } else {
+          verifyStateOfAllocations(allocations.getJSONObject(i),
+              "finalAllocationState", "SKIPPED");
+          verifyQueueOrder(allocations.getJSONObject(i), "root-a-b");
+        }
+      }
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testAssignWithoutAvailableResource() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              10)), null);
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.1");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 0);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testNoNM() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    try {
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.1:1234");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 0);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testWrongNodeId() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              10)), null);
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.0");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 0);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testReserveNewContainer() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
+        rm.getResourceTrackerService());
+    MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
+        rm.getResourceTrackerService());
+
+    nm1.registerNode();
+    nm2.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+      RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2");
+      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096),
+              10)), null);
+
+      // Reserve new container
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.2");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b3-b1");
+
+      JSONObject allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "finalAllocationState", "RESERVED");
+
+      // Do a node heartbeat again without releasing container from app2
+      r = resource();
+      params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.2");
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      verifyQueueOrder(json.getJSONObject("allocations"), "b1");
+
+      allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "finalAllocationState", "SKIPPED");
+
+      // Finish application 2
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      ContainerId containerId = ContainerId.newContainerId(
+          am2.getApplicationAttemptId(), 1);
+      cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus
+              .newInstance(containerId, ContainerState.COMPLETE, "", 0),
+          RMContainerEventType.FINISHED);
+
+      // Do a node heartbeat again
+      r = resource();
+      params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.2");
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      verifyQueueOrder(json.getJSONObject("allocations"), "b1");
+
+      allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "finalAllocationState",
+          "ALLOCATED_FROM_RESERVED");
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testActivityJSON() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.1");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      JSONObject allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "finalAllocationState",
+          "ALLOCATED");
+
+      verifyNumberOfNodes(allocations, 6);
+
+      verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b1");
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  private void verifyNumberOfNodes(JSONObject allocation, int realValue)
+      throws Exception {
+    if (allocation.isNull("root")) {
+      assertEquals("State of allocation is wrong", 0, realValue);
+    } else {
+      assertEquals("State of allocation is wrong",
+          1 + getNumberOfNodes(allocation.getJSONObject("root")), realValue);
+    }
+  }
+
+  private int getNumberOfNodes(JSONObject allocation) throws Exception {
+    if (!allocation.isNull("children")) {
+      Object object = allocation.get("children");
+      if (object.getClass() == JSONObject.class) {
+        return 1 + getNumberOfNodes((JSONObject) object);
+      } else {
+        int count = 0;
+        for (int i = 0; i < ((JSONArray) object).length(); i++) {
+          count += (1 + getNumberOfNodes(
+              ((JSONArray) object).getJSONObject(i)));
+        }
+        return count;
+      }
+    } else {
+      return 0;
+    }
+  }
+
+  private void verifyStateOfAllocations(JSONObject allocation,
+      String nameToCheck, String realState) throws Exception {
+    assertEquals("State of allocation is wrong", allocation.get(nameToCheck),
+        realState);
+  }
+
+  private void verifyNumberOfAllocations(JSONObject json, int realValue)
+      throws Exception {
+    if (json.isNull("allocations")) {
+      assertEquals("Number of allocations is wrong", 0, realValue);
+    } else {
+      Object object = json.get("allocations");
+      if (object.getClass() == JSONObject.class) {
+        assertEquals("Number of allocations is wrong", 1, realValue);
+      } else if (object.getClass() == JSONArray.class) {
+        assertEquals("Number of allocations is wrong",
+            ((JSONArray) object).length(), realValue);
+      }
+    }
+  }
+
+  private void verifyQueueOrder(JSONObject json, String realOrder)
+      throws Exception {
+    String order = "";
+    if (!json.isNull("root")) {
+      JSONObject root = json.getJSONObject("root");
+      order = root.getString("name") + "-" + getQueueOrder(root);
+    }
+    assertEquals("Order of queue is wrong",
+        order.substring(0, order.length() - 1), realOrder);
+  }
+
+  private String getQueueOrder(JSONObject node) throws Exception {
+    if (!node.isNull("children")) {
+      Object children = node.get("children");
+      if (children.getClass() == JSONObject.class) {
+        if (!((JSONObject) children).isNull("appPriority")) {
+          return "";
+        }
+        return ((JSONObject) children).getString("name") + "-" + getQueueOrder(
+            (JSONObject) children);
+      } else if (children.getClass() == JSONArray.class) {
+        String order = "";
+        for (int i = 0; i < ((JSONArray) children).length(); i++) {
+          JSONObject child = (JSONObject) ((JSONArray) children).get(i);
+          if (!child.isNull("appPriority")) {
+            return "";
+          }
+          order += (child.getString("name") + "-" + getQueueOrder(child));
+        }
+        return order;
+      }
+    }
+    return "";
+  }
+
+  private void verifyNumberOfAllocationAttempts(JSONObject allocation,
+      int realValue) throws Exception {
+    if (allocation.isNull("allocationAttempt")) {
+      assertEquals("Number of allocation attempts is wrong", 0, realValue);
+    } else {
+      Object object = allocation.get("allocationAttempt");
+      if (object.getClass() == JSONObject.class) {
+        assertEquals("Number of allocations attempts is wrong", 1, realValue);
+      } else if (object.getClass() == JSONArray.class) {
+        assertEquals("Number of allocations attempts is wrong",
+            ((JSONArray) object).length(), realValue);
+      }
+    }
+  }
+
+  @Test
+  public void testAppActivityJSON() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+      nm.nodeHeartbeat(true);
+      Thread.sleep(5000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      JSONObject allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED");
+
+      verifyNumberOfAllocationAttempts(allocations, 1);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testAppAssignMultipleContainersPerNodeHeartbeat()
+      throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              10)), null);
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+      nm.nodeHeartbeat(true);
+      Thread.sleep(5000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 10);
+
+      JSONArray allocations = json.getJSONArray("allocations");
+      for (int i = 0; i < allocations.length(); i++) {
+        verifyStateOfAllocations(allocations.getJSONObject(i),
+            "allocationState", "ACCEPTED");
+      }
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testAppAssignWithoutAvailableResource() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              10)), null);
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+      nm.nodeHeartbeat(true);
+      Thread.sleep(5000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 0);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testAppNoNM() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 0);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testAppReserveNewContainer() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
+        rm.getResourceTrackerService());
+    MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
+        rm.getResourceTrackerService());
+
+    nm1.registerNode();
+    nm2.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+      RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2");
+      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096),
+              10)), null);
+
+      // Reserve new container
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      // Do a node heartbeat again without releasing container from app2
+      r = resource();
+      params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 2);
+
+      // Finish application 2
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      ContainerId containerId = ContainerId.newContainerId(
+          am2.getApplicationAttemptId(), 1);
+      cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus
+              .newInstance(containerId, ContainerState.COMPLETE, "", 0),
+          RMContainerEventType.FINISHED);
+
+      // Do a node heartbeat again
+      r = resource();
+      params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 3);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message