Author: sseth
Date: Fri Aug 31 22:39:10 2012
New Revision: 1379650
URL: http://svn.apache.org/viewvc?rev=1379650&view=rev
Log:
MAPREDUCE-4625. Statistics logging in the AM scheduler.
Modified:
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Aug 31
22:39:10 2012
@@ -6,3 +6,5 @@ Branch MR-3902
MAPREDUCE-4599. Prevent contianer launches on blacklisted hosts. (Tsuyoshi OZAWA via sseth)
MAPREDUCE-4609. RMContainerAllocator scheduler interval should be configurable. (Tsuyoshi OZAWA via sseth)
+
+ MAPREDUCE-4625. Statistics logging in the AM scheduler. (sseth)
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
Fri Aug 31 22:39:10 2012
@@ -444,6 +444,7 @@ public class MRAppMaster extends Composi
// second timeout before the exit.
// TODO XXX: Modify TaskAttemptCleaner to empty it's queue while stopping.
public void handle(JobFinishEvent event) {
+ LOG.info("Handling JobFinished Event");
AMShutdownRunnable r = new AMShutdownRunnable();
Thread t = new Thread(r, "AMShutdownThread");
t.start();
@@ -506,6 +507,8 @@ public class MRAppMaster extends Composi
@Override
public void run() {
maybeSendJobEndNotification();
+ // TODO XXX Add a timeout.
+ LOG.info("Waiting for all containers and TaskAttempts to complete");
while (!allContainersComplete() || !allTaskAttemptsComplete()) {
try {
synchronized(this) {
@@ -516,6 +519,7 @@ public class MRAppMaster extends Composi
break;
}
}
+ LOG.info("All Containers and TaskAttempts Complete. Stopping services");
stopAM();
LOG.info("AM Shutdown Thread Completing");
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java
Fri Aug 31 22:39:10 2012
@@ -25,8 +25,7 @@ public enum TaskAttemptEventType {
//Producer:Task, Speculator
TA_SCHEDULE,
- TA_RESCHEDULE,
-
+
//Producer: TaskAttemptListener
TA_STARTED_REMOTELY,
TA_STATUS_UPDATE,
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
Fri Aug 31 22:39:10 2012
@@ -591,7 +591,8 @@ public abstract class TaskImpl implement
++numberUncompletedAttempts;
//schedule the nextAttemptNumber
- eventHandler.handle(new TaskAttemptScheduleEvent(attempt.getID(), TaskAttemptEventType.TA_SCHEDULE, failedAttempts > 0));
+ eventHandler.handle(new TaskAttemptScheduleEvent(attempt.getID(),
+ TaskAttemptEventType.TA_SCHEDULE, failedAttempts > 0));
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
Fri Aug 31 22:39:10 2012
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.hadoop.mapreduce.v2.app2.rm;
import java.util.List;
@@ -9,6 +27,10 @@ public class AMSchedulerEventContainersA
private final List<ContainerId> containerIds;
private final boolean headRoomChanged;
+ // TODO XXX: Maybe distinguish between newly allocated containers and
+ // existing containers being re-used.
+ // headRoomChanged is a strange API - making an assumption about how the
+ // scheduler will use this info.
public AMSchedulerEventContainersAllocated(List<ContainerId> containerIds,
boolean headRoomChanged) {
super(AMSchedulerEventType.S_CONTAINERS_ALLOCATED);
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
Fri Aug 31 22:39:10 2012
@@ -6,21 +6,17 @@ public enum AMSchedulerEventType {
S_TA_STOP_REQUEST, // Maybe renamed to S_TA_END / S_TA_ABNORMAL_END
S_TA_SUCCEEDED,
S_TA_ENDED,
-
+
//Producer: RMCommunicator
S_CONTAINERS_ALLOCATED,
-
+
//Producer: Container. (Maybe RMCommunicator)
S_CONTAINER_COMPLETED,
-
- // Add events for nodes being blacklisted.
-
- // TODO XXX
- //Producer: RMCommunicator. May not be needed.
-// S_CONTAINER_COMPLETED,
-
- //Producer: RMComm
-// S_NODE_UNHEALTHY,
-// S_NODE_HEALTHY,
-
+
+ //Producer: Node
+ S_NODE_BLACKLISTED,
+ S_NODE_UNHEALTHY,
+ S_NODE_HEALTHY
+ // The scheduler should have a way of knowing about unusable nodes. Acting on
+ // this information to change requests etc is scheduler specific.
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
Fri Aug 31 22:39:10 2012
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
@@ -158,6 +159,8 @@ public class RMContainerAllocator extend
attemptToLaunchRequestMap = new HashMap<TaskAttemptId, AMSchedulerTALaunchRequestEvent>();
private int containersAllocated = 0;
+ private int newContainerAllocations = 0;
+ private int existingContainerAllocations = 0;
private int containersReleased = 0;
private int hostLocalAssigned = 0;
private int rackLocalAssigned = 0;
@@ -177,7 +180,6 @@ public class RMContainerAllocator extend
BlockingQueue<AMSchedulerEvent> eventQueue
= new LinkedBlockingQueue<AMSchedulerEvent>();
- @SuppressWarnings("rawtypes")
public RMContainerAllocator(RMContainerRequestor requestor,
AppContext appContext) {
super("RMContainerAllocator");
@@ -211,7 +213,10 @@ public class RMContainerAllocator extend
MRJobConfig.MR_AM_SCHEDULER_INTERVAL,
MRJobConfig.DEFAULT_MR_AM_SCHEDULER_INTERVAL);
shouldReUse = conf.getBoolean("am.scheduler.shouldReuse", false);
- LOG.info("XXX: ShouldReUse: " + shouldReUse);
+ LOG.info("AMSchedulerConfiguration: " + "ReUseEnabled: " + shouldReUse
+ + ", reduceSlowStart: " + reduceSlowStart + ", maxReduceRampupLimit: "
+ + maxReduceRampupLimit + ", maxReducePreemptionLimit: "
+ + maxReducePreemptionLimit);
RackResolver.init(conf);
}
@@ -249,7 +254,8 @@ public class RMContainerAllocator extend
scheduleTimer = new Timer("AMSchedulerTimer", true);
scheduleTimerTask = new ScheduleTimerTask();
- scheduleTimer.scheduleAtFixedRate(scheduleTimerTask, scheduleInterval, scheduleInterval);
+ scheduleTimer.scheduleAtFixedRate(scheduleTimerTask, scheduleInterval,
+ scheduleInterval);
this.job = appContext.getJob(jobId);
super.start();
@@ -260,10 +266,10 @@ public class RMContainerAllocator extend
this.stopEventHandling = true;
if (eventHandlingThread != null)
eventHandlingThread.interrupt();
- super.stop();
if (scheduleTimerTask != null) {
scheduleTimerTask.stop();
}
+ super.stop();
LOG.info("Final Scheduler Stats: " + getStat());
}
@@ -322,11 +328,10 @@ public class RMContainerAllocator extend
throw new YarnException(e);
}
}
-
- // TODO XXX: Before and after makeRemoteRequest statistics.
-protected synchronized void handleEvent(AMSchedulerEvent sEvent) {
-
+ protected synchronized void handleEvent(AMSchedulerEvent sEvent) {
+
+ LOG.info("XXX: Processing the event " + sEvent.toString());
switch(sEvent.getType()) {
// TODO XXX: recalculateReduceSchedule may need to bet set on other events - not just containerAllocated.
case S_TA_LAUNCH_REQUEST:
@@ -345,21 +350,22 @@ protected synchronized void handleEvent(
case S_CONTAINERS_ALLOCATED:
handleContainersAllocated((AMSchedulerEventContainersAllocated) sEvent);
break;
- // No HEALTH_CHANGE events. Not modifying the table based on these.
- case S_CONTAINER_COMPLETED: // Maybe use this to reschedule reduces ?
+ case S_CONTAINER_COMPLETED: //Nothing specific to be done in this scheduler.
break;
- // Node State Change Event. May want to withdraw requests related to the node, and put
- // in fresh requests.
-
- // Similarly for the case where a node gets blacklisted.
- default:
+ case S_NODE_BLACKLISTED:
+ // TODO XXX Withdraw requests related to this node and place new ones.
+ break;
+ case S_NODE_UNHEALTHY:
+ // Ignore. RM will not allocate containers on this node.
+ break;
+ case S_NODE_HEALTHY:
+ // Ignore. RM will start allocating containers if there's pending requests.
break;
}
}
private void handleTaLaunchRequest(AMSchedulerTALaunchRequestEvent event) {
// Add to queue of pending tasks.
- LOG.info("Processing the event " + event.toString());
attemptToLaunchRequestMap.put(event.getAttemptID(), event);
if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
mapResourceReqt = maybeComputeNormalizedRequestForType(event,
@@ -381,10 +387,8 @@ protected synchronized void handleEvent(
}
private void handleTaStopRequest(AMSchedulerTAStopRequestEvent event) {
- LOG.info("Processing the event " + event.toString());
TaskAttemptId aId = event.getAttemptID();
attemptToLaunchRequestMap.remove(aId);
- // XXX Not very efficient. List / check type.
boolean removed = pendingReduces.remove(aId);
if (!removed) {
removed = scheduledRequests.remove(aId);
@@ -393,11 +397,16 @@ protected synchronized void handleEvent(
ContainerId containerId = assignedRequests.getContainerId(aId);
if (containerId != null) {
// Ask the container to stop.
- sendEvent(new AMContainerEvent(containerId, AMContainerEventType.C_STOP_REQUEST));
- // Inform the Node - the task has asked to be STOPPED / has already stopped.
- sendEvent(new AMNodeEventTaskAttemptEnded(containerMap.get(containerId).getContainer().getNodeId(), containerId, event.getAttemptID(), event.failed()));
+ sendEvent(new AMContainerEvent(containerId,
+ AMContainerEventType.C_STOP_REQUEST));
+ // Inform the Node - the task has asked to be STOPPED / has already
+ // stopped.
+ sendEvent(new AMNodeEventTaskAttemptEnded(containerMap
+ .get(containerId).getContainer().getNodeId(), containerId,
+ event.getAttemptID(), event.failed()));
} else {
- LOG.warn("Received a STOP request for absent taskAttempt: " + event.getAttemptID());
+ LOG.warn("Received a STOP request for absent taskAttempt: "
+ + event.getAttemptID());
}
}
}
@@ -409,33 +418,28 @@ protected synchronized void handleEvent(
// XXX: Maybe send the request to the task before sending it to the scheduler - the scheduler can then
// query the task to figure out whether the taskAttempt is the successfulAttempt - and whether to count it towards the reduce ramp up.
// Otherwise -> Job.getCompletedMaps() - will give an out of date picture, since the scheduler event will always be generated before the TaskCompleted event to the job.
-
- LOG.info("Processing the event " + event.toString());
attemptToLaunchRequestMap.remove(event.getAttemptID());
ContainerId containerId = assignedRequests.remove(event.getAttemptID());
if (containerId != null) { // TODO Should not be null. Confirm.
- sendEvent(new AMContainerTASucceededEvent(containerId, event.getAttemptID()));
- sendEvent(new AMNodeEventTaskAttemptSucceeded(containerMap.get(containerId).getContainer().getNodeId(), containerId, event.getAttemptID()));
+ sendEvent(new AMContainerTASucceededEvent(containerId,
+ event.getAttemptID()));
+ sendEvent(new AMNodeEventTaskAttemptSucceeded(containerMap
+ .get(containerId).getContainer().getNodeId(), containerId,
+ event.getAttemptID()));
containerAvailable(containerId);
} else {
LOG.warn("Received TaskAttemptSucceededEvent for unmapped TaskAttempt: "
+ event.getAttemptID() + ". Full event: " + event);
}
}
-
+
// TODO XXX: Deal with node blacklisting.
private void handleContainersAllocated(AMSchedulerEventContainersAllocated event) {
- // TODO XXX: Maybe have an event from the Requestor -> saying AllocationChanged -> listOfNewContainers, listOfFinishedContainers (finished containers goes to Containers directly, but should always come in from the RM)
- // TODO XXX
- /*
- * Start allocating containers. Match requests to capabilities.
- * Send out Container_START / Container_TA_ASSIGNED events.
- */
- // TODO XXX: Logging of the assigned containerIds.
- LOG.info("Processing the event " + event.toString());
availableContainerIds.addAll(event.getContainerIds());
+
+
if (event.didHeadroomChange() || event.getContainerIds().size() > 0) {
// TODO XXX -> recaulculateReduceSchedule in case of released containers
// .... would imply CONTAINER_COMPLETED messages are required by the Scheduler.
@@ -445,6 +449,7 @@ protected synchronized void handleEvent(
schedule();
}
+ // TODO XXX: Deal with node blacklisting.
// TODO Override for re-use.
@@ -480,6 +485,7 @@ protected synchronized void handleEvent(
private synchronized void schedule() {
assignContainers();
requestContainers();
+ lastScheduleTime = clock.getTime();
}
protected void containerAvailable(ContainerId containerId) {
@@ -493,17 +499,6 @@ protected synchronized void handleEvent(
}
}
- // TODO Override for container re-use.
-// protected void containerAvailable(ContainerId containerId) {
-// // For now releasing the container.
-// // allocatedContainerIds.add(containerId);
-// sendEvent(new AMContainerEvent(containerId,
-// AMContainerEventType.C_STOP_REQUEST));
-// // XXX A release should not be required. Only required when a container
-// // cannot be assigned, or if there's an explicit request to stop the container,
-// // in which case the release request will go out from the container itself.
-// }
-
@SuppressWarnings("unchecked")
private int maybeComputeNormalizedRequestForType(
AMSchedulerTALaunchRequestEvent event, TaskType taskType,
@@ -704,11 +699,14 @@ protected synchronized void handleEvent(
" AssignedReduces:" + assignedRequests.reduces.size() +
" completedMaps:" + getJob().getCompletedMaps() +
" completedReduces:" + getJob().getCompletedReduces() +
- " containersAllocated:" + containersAllocated +
+ " containersAllocated:" + containersAllocated + //Not super useful.
+ " newContainersAllocated: " + newContainerAllocations +
+ " existingContainersAllocated: " + existingContainerAllocations +
" containersReleased:" + containersReleased +
" hostLocalAssigned:" + hostLocalAssigned +
" rackLocalAssigned:" + rackLocalAssigned +
" availableResources(headroom):" + requestor.getAvailableResources();
+ // TODO (Post 3902): Can hostLocal/rackLocal be handled elsewhere.
}
@@ -719,11 +717,17 @@ protected synchronized void handleEvent(
@Private
public int getMemLimit() {
- int headRoom = requestor.getAvailableResources() != null ? requestor.getAvailableResources().getMemory() : 0;
- return headRoom + assignedRequests.maps.size() * mapResourceReqt +
- assignedRequests.reduces.size() * reduceResourceReqt;
+ int headRoom = requestor.getAvailableResources() != null ? requestor
+ .getAvailableResources().getMemory() : 0;
+ return headRoom + assignedRequests.maps.size() * mapResourceReqt
+ + assignedRequests.reduces.size() * reduceResourceReqt;
}
+
+ /**
+ * Tracks attempts for which a Container ask has been sent to the
+ * RMCommunicator.
+ */
private class ScheduledRequests {
private final LinkedList<TaskAttemptId> earlierFailedMaps =
@@ -820,7 +824,13 @@ protected synchronized void handleEvent(
containersAllocated += allocatedContainerIds.size();
while (it.hasNext()) {
ContainerId containerId = it.next();
- Container allocated = containerMap.get(containerId).getContainer();
+ AMContainer amContainer = containerMap.get(containerId);
+ Container allocated = amContainer.getContainer();
+ if (amContainer.getState() == AMContainerState.ALLOCATED) {
+ newContainerAllocations++;
+ } else {
+ existingContainerAllocations++;
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Assigning container " + allocated.getId()
+ " with priority " + allocated.getPriority() + " to NM "
@@ -1108,6 +1118,9 @@ protected synchronized void handleEvent(
}
}
+ /**
+ * Tracks TaskAttempts which have been assigned a Container.
+ */
private class AssignedRequests {
private final LinkedHashMap<TaskAttemptId, Container> maps =
new LinkedHashMap<TaskAttemptId, Container>();
@@ -1159,6 +1172,7 @@ protected synchronized void handleEvent(
}
// TODO XXX Check where all this is being used.
+ // XXX: Likely needed in case of TA failed / killed / terminated as well.
// Old code was removing when CONTAINER_COMPLETED was received fromthe RM.
ContainerId remove(TaskAttemptId tId) {
ContainerId containerId = null;
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
Fri Aug 31 22:39:10 2012
@@ -301,9 +301,19 @@ public class RMContainerRequestor extend
}
}
+ private String getStat() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ContainersAllocated: ").append(numContainersAllocated)
+ .append(", ContainersFinished: ").append(numFinishedContainers)
+ .append(", NumContainerReleaseRequests: ")
+ .append(numContainerReleaseRequests);
+ return sb.toString();
+ }
+
@SuppressWarnings("unchecked")
@Override
protected void heartbeat() throws Exception {
+ LOG.info("BeforeHeartbeat: " + getStat());
int headRoom = getAvailableResources() != null ? getAvailableResources()
.getMemory() : 0;// first time it would be null
AMResponse response = errorCheckedMakeRemoteRequest();
@@ -322,6 +332,8 @@ public class RMContainerRequestor extend
List<NodeReport> updatedNodeReports = response.getUpdatedNodes();
logUpdatedNodes(updatedNodeReports);
+ LOG.info("AfterHeartbeat: " + getStat());
+
// Inform the Containers about completion..
for (ContainerStatus c : finishedContainers) {
eventHandler.handle(new AMContainerEventReleased(c));
@@ -420,6 +432,7 @@ public class RMContainerRequestor extend
RMCommunicatorContainerDeAllocateRequestEvent event = (RMCommunicatorContainerDeAllocateRequestEvent) rawEvent;
releaseLock.lock();
try {
+ // TODO XXX: Currently the RM does not handle release requests for RUNNING containers.
numContainerReleaseRequests++;
release.add(event.getContainerId());
} finally {
@@ -506,7 +519,7 @@ public class RMContainerRequestor extend
private void logFinishedContainers(List<ContainerStatus> finishedContainers) {
if (finishedContainers.size() > 0) {
- LOG.info(finishedContainers.size() + " finished");
+ LOG.info(finishedContainers.size() + " containers finished");
for (ContainerStatus cs : finishedContainers) {
LOG.info("FinihsedContainer: " + cs);
}
|