Author: sseth
Date: Fri Sep 21 18:07:22 2012
New Revision: 1388591
URL: http://svn.apache.org/viewvc?rev=1388591&view=rev
Log:
MAPREDUCE-4618. Re-wire LocalContainerAllocator/UberAM (sseth)
Added:
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java
Removed:
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerAllocator.java
Modified:
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptRemoteStartEvent.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Sep 21 18:07:22 2012
@@ -18,3 +18,5 @@ Branch MR-3902
MAPREDUCE-4626. Fix and re-enable RMContainerAllocator unit tests (sseth)
MAPREDUCE-4617. Re-wire AM Recovery (sseth)
+
+ MAPREDUCE-4618. Re-wire LocalContainerAllocator/UberAM (sseth)
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Sep 21 18:07:22 2012
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce.v2.app.local;
import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -62,7 +61,6 @@ public class LocalContainerAllocator ext
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
- private AtomicInteger containerCount = new AtomicInteger();
private long retryInterval;
private long retrystartTime;
private String nmHost;
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java?rev=1388591&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java Fri Sep 21 18:07:22 2012
@@ -0,0 +1,556 @@
+/**
+s* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Allocates containers locally. Doesn't allocate a real container;
+ * instead sends an allocated event for all requests.
+ */
+// TODO (Post-3902): Maybe implement this via an InterceptingHandler - like Recovery.
+public class LocalContainerAllocator extends AbstractService
+ implements ContainerAllocator {
+
+ private static final Log LOG =
+ LogFactory.getLog(LocalContainerAllocator.class);
+ private static final File curDir = new File(".");
+
+ @SuppressWarnings("rawtypes")
+ private final EventHandler eventHandler;
+ private final NodeId nodeId;
+ private final int nmHttpPort;
+ private final ContainerId amContainerId;
+ private final TaskUmbilicalProtocol umbilical;
+ private FileContext curFC = null;
+ private final HashSet<File> localizedFiles;
+ private final JobId jobId;
+ private final AppContext appContext;
+ private final TaskAttemptListener taskAttemptListenern;
+ private final RMCommunicator rmCommunicator;
+
+ private BlockingQueue<AMSchedulerEvent> eventQueue =
+ new LinkedBlockingQueue<AMSchedulerEvent>();
+ private Thread eventHandlingThread;
+ private boolean stopEventHandling = false;
+
+ public LocalContainerAllocator(AppContext appContext, JobId jobId,
+ String nmHost, int nmPort, int nmHttpPort, ContainerId cId,
+ TaskUmbilicalProtocol taskUmbilical,
+ TaskAttemptListener taskAttemptListener, RMCommunicator rmComm) {
+ super(LocalContainerAllocator.class.getSimpleName());
+ this.appContext = appContext;
+ this.eventHandler = appContext.getEventHandler();
+ this.nodeId = BuilderUtils.newNodeId(nmHost, nmPort);
+ this.nmHttpPort = nmHttpPort;
+ this.amContainerId = cId;
+ this.umbilical = taskUmbilical;
+ this.jobId = jobId;
+ this.taskAttemptListenern = taskAttemptListener;
+ this.rmCommunicator = rmComm;
+ // umbilical: MRAppMaster creates (taskAttemptListener), passes to us
+ // (TODO/FIXME: pointless to use RPC to talk to self; should create
+ // LocalTaskAttemptListener or similar: implement umbilical protocol
+ // but skip RPC stuff)
+
+ try {
+ curFC = FileContext.getFileContext(curDir.toURI());
+ } catch (UnsupportedFileSystemException ufse) {
+ LOG.error("Local filesystem " + curDir.toURI().toString()
+ + " is unsupported?? (should never happen)");
+ }
+
+ // Save list of files/dirs that are supposed to be present so can delete
+ // any extras created by one task before starting subsequent task. Note
+ // that there's no protection against deleted or renamed localization;
+ // users who do that get what they deserve (and will have to disable
+ // uberization in order to run correctly).
+ File[] curLocalFiles = curDir.listFiles();
+ localizedFiles = new HashSet<File>(curLocalFiles.length);
+ for (int j = 0; j < curLocalFiles.length; ++j) {
+ localizedFiles.add(curLocalFiles[j]);
+ }
+
+ // Relocalization note/future FIXME (per chrisdo, 20110315): At moment,
+ // full localization info is in AppSubmissionContext passed from client to
+ // RM and then to NM for AM-container launch: no difference between AM-
+ // localization and MapTask- or ReduceTask-localization, so can assume all
+ // OK. Longer-term, will need to override uber-AM container-localization
+ // request ("needed resources") with union of regular-AM-resources + task-
+ // resources (and, if maps and reduces ever differ, then union of all three
+ // types), OR will need localizer service/API that uber-AM can request
+ // after running (e.g., "localizeForTask()" or "localizeForMapTask()").
+ }
+
+ @Override
+ public void start() {
+ this.eventHandlingThread = new Thread() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+
+ AMSchedulerEvent event;
+
+ while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+ try {
+ event = LocalContainerAllocator.this.eventQueue.take();
+ } catch (InterruptedException e) {
+ LOG.error("Returning, interrupted : " + e);
+ return;
+ }
+
+ try {
+ handleEvent(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " to the LocalContainreAllocator", t);
+ // Kill the AM.
+ eventHandler.handle(new JobEvent(jobId, JobEventType.INTERNAL_ERROR));
+ return;
+ }
+ }
+ }
+ };
+ eventHandlingThread.start();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if (eventHandlingThread != null) {
+ eventHandlingThread.interrupt();
+ }
+ super.stop();
+ }
+
+ @Override
+ public void handle(AMSchedulerEvent event) {
+ try {
+ eventQueue.put(event);
+ } catch (InterruptedException e) {
+ throw new YarnException(e); // FIXME? YarnException is "for runtime exceptions only"
+ }
+ }
+
+ // Alternate Options
+ // - Fake containerIds for each and every container. (AMContaienr uses JvmID instead ?)
+ // - Since single threaded, works very well with the current flow. Normal flow via containers.
+ // - Change container to maintain a list of pending tasks.
+
+ // Currently, AMContainer and AMNode are short-circuited. Scheduler sends
+ // events directly to the task, whcih sends events back to the scheduler.
+
+ private boolean doneWithMaps = false;
+ private int finishedSubMaps = 0;
+
+ @SuppressWarnings("unchecked")
+ void handleTaLaunchRequest(AMSchedulerTALaunchRequestEvent event) {
+
+ LOG.info("Processing the event: " + event);
+ Container container = BuilderUtils.newContainer(amContainerId, nodeId,
+ this.nodeId.getHost() + ":" + nmHttpPort,
+ Records.newRecord(Resource.class), Records.newRecord(Priority.class),
+ null);
+
+ appContext.getAllContainers().addContainerIfNew(container);
+ appContext.getAllNodes().nodeSeen(nodeId);
+
+ // Register a JVMId, so that the TAL can handle pings etc.
+ WrappedJvmID jvmId = new WrappedJvmID(TypeConverter.fromYarn(jobId), event
+ .getAttemptID().getTaskId().getTaskType() == TaskType.MAP,
+ amContainerId.getId());
+ taskAttemptListenern.registerRunningJvm(jvmId, amContainerId);
+
+
+ if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(event
+ .getAttemptID().getTaskId().getJobId());
+ // TODO Setting OTHER_LOCAL_MAP for now.
+ jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+ eventHandler.handle(jce);
+ }
+
+ AMSchedulerTALaunchRequestEvent launchEv = (AMSchedulerTALaunchRequestEvent) event;
+ TaskAttemptId attemptID = launchEv.getTaskAttempt().getID();
+
+ Job job = appContext.getJob(jobId);
+ int numMapTasks = job.getTotalMaps();
+ int numReduceTasks = job.getTotalReduces();
+
+ // YARN (tracking) Task:
+ org.apache.hadoop.mapreduce.v2.app2.job.Task ytask = job.getTask(attemptID
+ .getTaskId());
+ // classic mapred Task:
+ org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
+
+ // There is no port number because we are not really talking to a task
+ // tracker. The shuffle is just done through local files. So the
+ // port number is set to -1 in this case.
+
+ appContext.getEventHandler().handle(
+ new TaskAttemptRemoteStartEvent(attemptID, amContainerId,
+ rmCommunicator.getApplicationAcls(), -1));
+
+ if (numMapTasks == 0) {
+ doneWithMaps = true;
+ }
+
+ try {
+ if (remoteTask.isMapOrReduce())
+ if (attemptID.getTaskId().getTaskType() == TaskType.MAP
+ || attemptID.getTaskId().getTaskType() == TaskType.REDUCE) {
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID
+ .getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
+ if (remoteTask.isMapTask()) {
+ jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
+ } else {
+ jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
+ }
+ appContext.getEventHandler().handle(jce);
+ }
+
+ // Register the sub-task with TaskAttemptListener.
+ taskAttemptListenern.registerTaskAttempt(event.getAttemptID(), jvmId);
+ runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
+ (numReduceTasks > 0));
+ } catch (RuntimeException re) {
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID
+ .getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
+ appContext.getEventHandler().handle(jce);
+ // this is our signal that the subtask failed in some way, so
+ // simulate a failed JVM/container and send a container-completed
+ // event to task attempt (i.e., move state machine from RUNNING
+ // to FAILED [and ultimately to FAILED])
+
+ // CLEANUP event generated.f
+ appContext.getEventHandler().handle(
+ new TaskAttemptEventTerminated(attemptID));
+
+ } catch (IOException ioe) {
+ // if umbilical itself barfs (in error-handler of runSubMap()),
+ // we're pretty much hosed, so do what YarnChild main() does
+ // (i.e., exit clumsily--but can never happen, so no worries!)
+ LOG.fatal("oopsie... this can never happen: "
+ + StringUtils.stringifyException(ioe));
+ System.exit(-1);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void handleTaStopRequest(AMSchedulerTAStopRequestEvent sEvent) {
+ // Implies a failed or killed task.
+ // This will trigger a CLEANUP event. UberAM is supposed to fail if there's
+ // event a single failed attempt. Hence the CLEANUP is OK (otherwise delay
+ // cleanup till end of job). TODO Enforce job failure on single task attempt
+ // failure.
+ appContext.getEventHandler().handle(
+ new TaskAttemptEventTerminated(sEvent.getAttemptID()));
+ taskAttemptListenern.unregisterTaskAttempt(sEvent.getAttemptID());
+ }
+
+ @SuppressWarnings("unchecked")
+ public void handleTaSucceededRequest(AMSchedulerTASucceededEvent sEvent) {
+ // Successful taskAttempt.
+ // Same CLEANUP comment as handleTaStopRequest
+ appContext.getEventHandler().handle(
+ new TaskAttemptEventTerminated(sEvent.getAttemptID()));
+ taskAttemptListenern.unregisterTaskAttempt(sEvent.getAttemptID());
+ }
+
+ public void handleEvent(AMSchedulerEvent sEvent) {
+ LOG.info("Processing the event " + sEvent.toString());
+ switch (sEvent.getType()) {
+ case S_TA_LAUNCH_REQUEST:
+ handleTaLaunchRequest((AMSchedulerTALaunchRequestEvent) sEvent);
+ break;
+ case S_TA_STOP_REQUEST: // Effectively means a failure.
+ handleTaStopRequest((AMSchedulerTAStopRequestEvent) sEvent);
+ break;
+ case S_TA_SUCCEEDED:
+ handleTaSucceededRequest((AMSchedulerTASucceededEvent) sEvent);
+ break;
+ default:
+ LOG.warn("Invalid event type for LocalContainerAllocator: "
+ + sEvent.getType() + ", Event: " + sEvent);
+ }
+ }
+
+ private class SubtaskRunner implements Runnable {
+ SubtaskRunner() {
+ }
+
+ @Override
+ public void run() {
+ }
+
+ private void runSubtask(org.apache.hadoop.mapred.Task task,
+ final TaskType taskType,
+ TaskAttemptId attemptID,
+ final int numMapTasks,
+ boolean renameOutputs)
+ throws RuntimeException, IOException {
+ org.apache.hadoop.mapred.TaskAttemptID classicAttemptID =
+ TypeConverter.fromYarn(attemptID);
+
+ try {
+ JobConf conf = new JobConf(getConfig());
+ conf.set(JobContext.TASK_ID, task.getTaskID().toString());
+ conf.set(JobContext.TASK_ATTEMPT_ID, classicAttemptID.toString());
+ conf.setBoolean(JobContext.TASK_ISMAP, (taskType == TaskType.MAP));
+ conf.setInt(JobContext.TASK_PARTITION, task.getPartition());
+ conf.set(JobContext.ID, task.getJobID().toString());
+
+ // Use the AM's local dir env to generate the intermediate step
+ // output files
+ String[] localSysDirs = StringUtils.getTrimmedStrings(
+ System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+ conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
+ LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
+ + conf.get(MRConfig.LOCAL_DIR));
+
+ // mark this as an uberized subtask so it can set task counter
+ // (longer-term/FIXME: could redefine as job counter and send
+ // "JobCounterEvent" to JobImpl on [successful] completion of subtask;
+ // will need new Job state-machine transition and JobImpl jobCounters
+ // map to handle)
+ conf.setBoolean("mapreduce.task.uberized", true);
+
+ // META-FIXME: do we want the extra sanity-checking (doneWithMaps,
+ // etc.), or just assume/hope the state machine(s) and uber-AM work
+ // as expected?
+ if (taskType == TaskType.MAP) {
+ if (doneWithMaps) {
+ LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task ("
+ + attemptID + "), but should be finished with maps");
+ throw new RuntimeException();
+ }
+
+ MapTask map = (MapTask)task;
+ map.setConf(conf);
+
+ map.run(conf, umbilical);
+
+ if (renameOutputs) {
+ renameMapOutputForReduce(conf, attemptID, map.getMapOutputFile());
+ }
+ relocalize();
+
+ if (++finishedSubMaps == numMapTasks) {
+ doneWithMaps = true;
+ }
+
+ } else /* TaskType.REDUCE */ {
+
+ if (!doneWithMaps) {
+ // check if event-queue empty? whole idea of counting maps vs.
+ // checking event queue is a tad wacky...but could enforce ordering
+ // (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?):
+ // doesn't send reduce event until maps all done]
+ LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
+ + attemptID + "), but not yet finished with maps");
+ throw new RuntimeException();
+ }
+
+ // a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
+ // set framework name to local to make task local
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+ conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle
+
+ ReduceTask reduce = (ReduceTask)task;
+ reduce.setConf(conf);
+
+ reduce.run(conf, umbilical);
+ //relocalize(); // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
+ }
+
+ } catch (FSError e) {
+ LOG.fatal("FSError from child", e);
+ // umbilical: MRAppMaster creates (taskAttemptListener), passes to us
+ umbilical.fsError(classicAttemptID, e.getMessage());
+ throw new RuntimeException();
+
+ } catch (Exception exception) {
+ LOG.warn("Exception running local (uberized) 'child' : "
+ + StringUtils.stringifyException(exception));
+ try {
+ if (task != null) {
+ // do cleanup for the task
+ task.taskCleanup(umbilical);
+ }
+ } catch (Exception e) {
+ LOG.info("Exception cleaning up: "
+ + StringUtils.stringifyException(e));
+ }
+ // Report back any failures, for diagnostic purposes
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ exception.printStackTrace(new PrintStream(baos));
+ umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
+ throw new RuntimeException();
+
+ } catch (Throwable throwable) {
+ LOG.fatal("Error running local (uberized) 'child' : "
+ + StringUtils.stringifyException(throwable));
+ Throwable tCause = throwable.getCause();
+ String cause = (tCause == null)
+ ? throwable.getMessage()
+ : StringUtils.stringifyException(tCause);
+ umbilical.fatalError(classicAttemptID, cause);
+ throw new RuntimeException();
+ }
+ }
+
+ /**
+ * Within the _local_ filesystem (not HDFS), all activity takes place within
+ * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+ * and all sub-MapTasks create the same filename ("file.out"). Rename that
+ * to something unique (e.g., "map_0.out") to avoid collisions.
+ *
+ * Longer-term, we'll modify [something] to use TaskAttemptID-based
+ * filenames instead of "file.out". (All of this is entirely internal,
+ * so there are no particular compatibility issues.)
+ */
+ private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
+ MapOutputFile subMapOutputFile)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ // move map output to reduce input
+ Path mapOut = subMapOutputFile.getOutputFile();
+ FileStatus mStatus = localFs.getFileStatus(mapOut);
+ Path reduceIn = subMapOutputFile.getInputFileForWrite(
+ TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming map output file for task attempt "
+ + mapId.toString() + " from original location " + mapOut.toString()
+ + " to destination " + reduceIn.toString());
+ }
+ if (!localFs.mkdirs(reduceIn.getParent())) {
+ throw new IOException("Mkdirs failed to create "
+ + reduceIn.getParent().toString());
+ }
+ if (!localFs.rename(mapOut, reduceIn))
+ throw new IOException("Couldn't rename " + mapOut);
+ }
+
+ /**
+ * Also within the local filesystem, we need to restore the initial state
+ * of the directory as much as possible. Compare current contents against
+ * the saved original state and nuke everything that doesn't belong, with
+ * the exception of the renamed map outputs.
+ *
+ * Any jobs that go out of their way to rename or delete things from the
+ * local directory are considered broken and deserve what they get...
+ */
+ private void relocalize() {
+ File[] curLocalFiles = curDir.listFiles();
+ for (int j = 0; j < curLocalFiles.length; ++j) {
+ if (!localizedFiles.contains(curLocalFiles[j])) {
+ // found one that wasn't there before: delete it
+ boolean deleted = false;
+ try {
+ if (curFC != null) {
+ // this is recursive, unlike File delete():
+ deleted = curFC.delete(new Path(curLocalFiles[j].getName()),true);
+ }
+ } catch (IOException e) {
+ deleted = false;
+ }
+ if (!deleted) {
+ LOG.warn("Unable to delete unexpected local file/dir "
+ + curLocalFiles[j].getName() + ": insufficient permissions?");
+ }
+ }
+ }
+ }
+
+ } // end SubtaskRunner
+
+
+ // _must_ either run subtasks sequentially or accept expense of new JVMs
+ // (i.e., fork()), else will get weird failures when maps try to create/
+ // write same dirname or filename: no chdir() in Java
+
+ // TODO: Since this is only a single thread, AMContainer can be used
+ // efficiently for the Uber lifecycle. Even if it is made multi-threaded, it
+ // may make sense to let AMContainer maintain a queue of pending tasks,
+ // instead of only a single pending task.
+
+ /**
+ * Blocking call. Only one attempt runs at any given point.
+ */
+ private void runSubtask(org.apache.hadoop.mapred.Task remoteTask,
+ final TaskType taskType,
+ TaskAttemptId attemptID,
+ final int numMapTasks,
+ boolean renameOutputs)
+ throws RuntimeException, IOException {
+ SubtaskRunner subTaskRunner = new SubtaskRunner();
+ subTaskRunner.runSubtask(remoteTask, taskType, attemptID, numMapTasks,
+ renameOutputs);
+ }
+}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java Fri Sep 21 18:07:22 2012
@@ -92,11 +92,11 @@ public class TaskAttemptListenerImpl2 ex
private Server server;
// TODO XXX: Use this to figure out whether an incoming ping is valid.
- private ConcurrentMap<TaskAttemptID, WrappedJvmID>
- jvmIDToActiveAttemptMap
- = new ConcurrentHashMap<TaskAttemptID, WrappedJvmID>();
+ private ConcurrentMap<TaskAttemptID, WrappedJvmID> attemptToJvmIdMap =
+ new ConcurrentHashMap<TaskAttemptID, WrappedJvmID>();
// jvmIdToContainerIdMap also serving to check whether the container is still running.
- private ConcurrentMap<WrappedJvmID, ContainerId> jvmIDToContainerIdMap = new ConcurrentHashMap<WrappedJvmID, ContainerId>();
+ private ConcurrentMap<WrappedJvmID, ContainerId> jvmIDToContainerIdMap =
+ new ConcurrentHashMap<WrappedJvmID, ContainerId>();
// private Set<WrappedJvmID> launchedJVMs = Collections
// .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
@@ -162,7 +162,7 @@ public class TaskAttemptListenerImpl2 ex
}
private void pingContainerHeartbeatHandler(TaskAttemptID attemptID) {
- containerHeartbeatHandler.pinged(jvmIDToContainerIdMap.get(jvmIDToActiveAttemptMap.get(attemptID)));
+ containerHeartbeatHandler.pinged(jvmIDToContainerIdMap.get(attemptToJvmIdMap.get(attemptID)));
}
/**
@@ -494,14 +494,14 @@ public class TaskAttemptListenerImpl2 ex
}
public void registerTaskAttempt(TaskAttemptId attemptId, WrappedJvmID jvmId) {
- jvmIDToActiveAttemptMap.put(TypeConverter.fromYarn(attemptId), jvmId);
+ attemptToJvmIdMap.put(TypeConverter.fromYarn(attemptId), jvmId);
}
// Unregister called by the Container. Registration happens when TAL asks
// the container for a task.
@Override
public void unregisterTaskAttempt(TaskAttemptId attemptId) {
- jvmIDToActiveAttemptMap.remove(TypeConverter.fromYarn(attemptId));
+ attemptToJvmIdMap.remove(TypeConverter.fromYarn(attemptId));
}
public org.apache.hadoop.mapred.Task pullTaskAttempt(ContainerId containerId) {
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java Fri Sep 21 18:07:22 2012
@@ -5,9 +5,7 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.v2.app2.AppContext;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
-import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.EventHandler;
public class ContainerHeartbeatHandler extends
HeartbeatHandlerBase<ContainerId> {
@@ -30,14 +28,14 @@ public class ContainerHeartbeatHandler e
}
@Override
- public boolean hasTimedOut(ReportTime report, long currentTime) {
+ protected boolean hasTimedOut(ReportTime report, long currentTime) {
return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut);
}
@SuppressWarnings("unchecked")
@Override
- public void handleTimeOut(ContainerId containerId) {
+ protected void handleTimeOut(ContainerId containerId) {
eventHandler.handle(new AMContainerEvent(containerId,
AMContainerEventType.C_TIMED_OUT));
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java Fri Sep 21 18:07:22 2012
@@ -26,16 +26,17 @@ public abstract class HeartbeatHandlerBa
private ConcurrentMap<T, ReportTime> runningMap;
private volatile boolean stopped;
-
- public HeartbeatHandlerBase(AppContext appContext,int numThreads, String name) {
+
+ public HeartbeatHandlerBase(AppContext appContext, int numThreads, String name) {
super(name);
this.name = name;
this.eventHandler = appContext.getEventHandler();
this.clock = appContext.getClock();
this.appContext = appContext;
- this.runningMap = new ConcurrentHashMap<T, HeartbeatHandlerBase.ReportTime>();
+ this.runningMap = new ConcurrentHashMap<T, HeartbeatHandlerBase.ReportTime>(
+ 16, 0.75f, numThreads);
}
-
+
@Override
public void init(Configuration conf) {
super.init(conf);
@@ -55,7 +56,9 @@ public abstract class HeartbeatHandlerBa
@Override
public void stop() {
stopped = true;
- timeOutCheckerThread.interrupt();
+ if (timeOutCheckerThread != null) {
+ timeOutCheckerThread.interrupt();
+ }
super.stop();
}
@@ -115,9 +118,9 @@ public abstract class HeartbeatHandlerBa
}
}
- public abstract boolean hasTimedOut(ReportTime report, long currentTime);
+ protected abstract boolean hasTimedOut(ReportTime report, long currentTime);
- public abstract void handleTimeOut(T t);
+ protected abstract void handleTimeOut(T t);
private class PingChecker implements Runnable {
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java Fri Sep 21 18:07:22 2012
@@ -36,7 +36,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LocalContainerAllocator;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl2;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -68,13 +70,15 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app2.local.LocalContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app2.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app2.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
-import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
@@ -176,7 +180,7 @@ public class MRAppMaster extends Composi
private JobHistoryEventHandler2 jobHistoryEventHandler;
private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
- private RMContainerRequestor rmContainerRequestor;
+ private ContainerRequestor containerRequestor;
private ContainerAllocator amScheduler;
@@ -310,26 +314,9 @@ public class MRAppMaster extends Composi
dispatcher.register(Speculator.EventType.class,
speculatorEventDispatcher);
- // service to allocate containers from RM (if non-uber) or to fake it (uber)
- rmContainerRequestor = createRMContainerRequestor(clientService, context);
- addIfService(rmContainerRequestor);
- dispatcher.register(RMCommunicatorEventType.class, rmContainerRequestor);
-
- // TODO XXX: Get rid of eventHandlers being sent as part of the constructors. context should be adequate.
- amScheduler = createAMScheduler(rmContainerRequestor, context);
- addIfService(amScheduler);
- dispatcher.register(AMSchedulerEventType.class, amScheduler);
-
-// containerAllocator = createContainerAllocator(clientService, context);
-// addIfService(containerAllocator);
-// dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
-
- // TODO XXX: initialization of RMComm, Scheduler, etc.
-
-
- // TODO XXX: Rename to NMComm
- // corresponding service to launch allocated containers via NodeManager
-// containerLauncher = createNMCommunicator(context);
+ // TODO XXX: Rename to NMComm
+ // corresponding service to launch allocated containers via NodeManager
+ // containerLauncher = createNMCommunicator(context);
containerLauncher = createContainerLauncher(context);
addIfService(containerLauncher);
dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
@@ -508,17 +495,21 @@ public class MRAppMaster extends Composi
maybeSendJobEndNotification();
// TODO XXX Add a timeout.
LOG.info("Waiting for all containers and TaskAttempts to complete");
- while (!allContainersComplete() || !allTaskAttemptsComplete()) {
- try {
- synchronized(this) {
- wait(100l);
+ if (!job.isUber()) {
+ while (!allContainersComplete() || !allTaskAttemptsComplete()) {
+ try {
+ synchronized (this) {
+ wait(100l);
+ }
+ } catch (InterruptedException e) {
+ LOG.info("AM Shutdown Thread interrupted. Exiting");
+ break;
}
- } catch (InterruptedException e) {
- LOG.info("AM Shutdown Thread interrupted. Exiting");
- break;
}
+ LOG.info("All Containers and TaskAttempts Complete. Stopping services");
+ } else {
+ LOG.info("Uberized job. Not waiting for all containers to finish");
}
- LOG.info("All Containers and TaskAttempts Complete. Stopping services");
stopAM();
LOG.info("AM Shutdown Thread Completing");
}
@@ -593,20 +584,37 @@ public class MRAppMaster extends Composi
* @param appContext the application context.
* @return an instance of the RMContainerRequestor.
*/
- protected RMContainerRequestor createRMContainerRequestor(
+ protected ContainerRequestor createContainerRequestor(
ClientService clientService, AppContext appContext) {
- return new RMContainerRequestor(clientService, appContext);
+ ContainerRequestor containerRequestor;
+ if (job.isUber()) {
+ containerRequestor = new LocalContainerRequestor(clientService,
+ appContext);
+ } else {
+ containerRequestor = new RMContainerRequestor(clientService, appContext);
+ }
+ return containerRequestor;
}
-
+
/**
* Create the AM Scheduler.
- * @param requestor The RM Container Requestor.
+ *
+ * @param requestor The Container Requestor.
* @param appContext the application context.
* @return an instance of the AMScheduler.
*/
- protected ContainerAllocator createAMScheduler(
- RMContainerRequestor requestor, AppContext appContext) {
- return new RMContainerAllocator(requestor, appContext);
+ protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+ AppContext appContext) {
+ if (job.isUber()) {
+ return new LocalContainerAllocator(appContext, jobId, nmHost, nmPort,
+ nmHttpPort, containerID, (TaskUmbilicalProtocol) taskAttemptListener,
+ taskAttemptListener, (RMCommunicator)containerRequestor);
+ } else {
+ // TODO XXX: This is terrible. Assuming RMContainerRequestor is sent in
+ // when non-uberized. Fix RMContainerRequestor to be a proper interface, etc.
+ return new RMContainerAllocator((RMContainerRequestor) requestor,
+ appContext);
+ }
}
/** Create and initialize (but don't start) a single job. */
@@ -727,32 +735,25 @@ public class MRAppMaster extends Composi
MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
return thh;
}
-
+
protected ContainerHeartbeatHandler createContainerHeartbeatHandler(AppContext context,
Configuration conf) {
ContainerHeartbeatHandler chh = new ContainerHeartbeatHandler(context, conf.getInt(
MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
+ // TODO XXX: Define a CONTAINER_LISTENER_THREAD_COUNT
return chh;
}
+
protected TaskCleaner createTaskCleaner(AppContext context) {
return new TaskCleanerImpl(context);
}
-// protected ContainerAllocator createContainerAllocator(
-// final ClientService clientService, final AppContext context) {
-// return new ContainerAllocatorRouter(clientService, context);
-// }
-
protected ContainerLauncher
createContainerLauncher(final AppContext context) {
- return new ContainerLauncherRouter(context);
+ return new ContainerLauncherImpl(context);
}
-
-// protected ContainerLauncher createNMCommunicator(final AppContext context) {
-// return new ContainerLauncherImpl(context);
-// }
//TODO:should have an interface for MRClientService
protected ClientService createClientService(AppContext context) {
@@ -797,12 +798,8 @@ public class MRAppMaster extends Composi
public List<AMInfo> getAllAMInfos() {
return amInfos;
- }
-
-// public ContainerAllocator getContainerAllocator() {
-// return containerAllocator;
-// }
-
+ }
+
public ContainerLauncher getContainerLauncher() {
return containerLauncher;
}
@@ -815,96 +812,6 @@ public class MRAppMaster extends Composi
return taskHeartbeatHandler;
}
- /**
- * By the time life-cycle of this router starts, job-init would have already
- * happened.
- */
-// private final class ContainerAllocatorRouter extends AbstractService
-// implements ContainerAllocator {
-// private final ClientService clientService;
-// private final AppContext context;
-// private ContainerAllocator containerAllocator;
-//
-// ContainerAllocatorRouter(ClientService clientService,
-// AppContext context) {
-// super(ContainerAllocatorRouter.class.getName());
-// this.clientService = clientService;
-// this.context = context;
-// }
-//
-// @Override
-// public synchronized void start() {
-// if (job.isUber()) {
-// this.containerAllocator = new LocalContainerAllocator(
-// this.clientService, this.context, nmHost, nmPort, nmHttpPort
-// , containerID);
-// } else {
-//
-// // TODO XXX UberAM
-//// this.containerAllocator = new RMContainerAllocator(
-//// this.clientService, this.context);
-// }
-// ((Service)this.containerAllocator).init(getConfig());
-// ((Service)this.containerAllocator).start();
-// super.start();
-// }
-//
-// @Override
-// public synchronized void stop() {
-// ((Service)this.containerAllocator).stop();
-// super.stop();
-// }
-//
-// @Override
-// public void handle(ContainerAllocatorEvent event) {
-// this.containerAllocator.handle(event);
-// }
-//
-// public void setSignalled(boolean isSignalled) {
-// ((RMCommunicator) containerAllocator).setSignalled(true);
-// }
-// }
-
- /**
- * By the time life-cycle of this router starts, job-init would have already
- * happened.
- */
- private final class ContainerLauncherRouter extends AbstractService
- implements ContainerLauncher {
- private final AppContext context;
- private ContainerLauncher containerLauncher;
-
- ContainerLauncherRouter(AppContext context) {
- super(ContainerLauncherRouter.class.getName());
- this.context = context;
- }
-
- @Override
- public synchronized void start() {
- if (job.isUber()) {
- // TODO XXX: Handle uber.
-// this.containerLauncher = new LocalContainerLauncher(context,
-// (TaskUmbilicalProtocol) taskAttemptListener);
- } else {
- this.containerLauncher = new ContainerLauncherImpl(context);
- }
- ((Service)this.containerLauncher).init(getConfig());
- ((Service)this.containerLauncher).start();
- super.start();
- }
-
- @Override
- public void handle(NMCommunicatorEvent event) {
- this.containerLauncher.handle(event);
- }
-
- @Override
- public synchronized void stop() {
- ((Service)this.containerLauncher).stop();
- super.stop();
- }
- }
-
private final class StagingDirCleaningService extends AbstractService {
StagingDirCleaningService() {
super(StagingDirCleaningService.class.getName());
@@ -1055,6 +962,16 @@ public class MRAppMaster extends Composi
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + ".");
}
+ // service to allocate containers from RM (if non-uber) or to fake it (uber)
+ containerRequestor = createContainerRequestor(clientService, context);
+ addIfService(containerRequestor);
+ ((Service)containerRequestor).init(getConfig());
+ dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
+
+ amScheduler = createAMScheduler(containerRequestor, context);
+ addIfService(amScheduler);
+ ((Service)amScheduler).init(getConfig());
+ dispatcher.register(AMSchedulerEventType.class, amScheduler);
//start all the components
super.start();
@@ -1223,11 +1140,11 @@ public class MRAppMaster extends Composi
LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
+ "JobHistoryEventHandler.");
// Notify the JHEH and RMCommunicator that a SIGTERM has been received so
- // that they don't take too long in shutting down
-// if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
-// ((ContainerAllocatorRouter) appMaster.containerAllocator)
-// .setSignalled(true);
-// }
+ // that they don't take too long in shutting down
+
+ // Signal the RMCommunicator.
+ ((RMCommunicator)appMaster.containerRequestor).setSignalled(true);
+
if(appMaster.jobHistoryEventHandler != null) {
appMaster.jobHistoryEventHandler.setSignalled(true);
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java Fri Sep 21 18:07:22 2012
@@ -52,14 +52,14 @@ public class TaskHeartbeatHandler extend
}
@Override
- public boolean hasTimedOut(
+ protected boolean hasTimedOut(
org.apache.hadoop.mapreduce.jobhistory.HeartbeatHandlerBase.ReportTime report,
long currentTime) {
return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut);
}
@Override
- public void handleTimeOut(TaskAttemptId attemptId) {
+ protected void handleTimeOut(TaskAttemptId attemptId) {
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
"AttemptID:" + attemptId.toString()
+ " Timed out after " + timeOut / 1000 + " secs"));
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptRemoteStartEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptRemoteStartEvent.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptRemoteStartEvent.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptRemoteStartEvent.java Fri Sep 21 18:07:22 2012
@@ -1,10 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.hadoop.mapreduce.v2.app2.job.event;
import java.util.Map;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
public class TaskAttemptRemoteStartEvent extends TaskAttemptEvent {
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java?rev=1388591&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java Fri Sep 21 18:07:22 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.local;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+/**
+ * For UberAM, the LocalContainerRequestor is responsible for sending keep-alive
+ * heartbeats to the RM, along with sending over job progress. Also provides any
+ * additional information to the rest of the AM - ApplicationACLs etc.
+ */
+public class LocalContainerRequestor extends RMCommunicator implements
+ ContainerRequestor {
+
+ private static final Log LOG =
+ LogFactory.getLog(LocalContainerRequestor.class);
+
+ private long retrystartTime;
+ private long retryInterval;
+
+ public LocalContainerRequestor(ClientService clientService, AppContext context) {
+ super(clientService, context);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ retryInterval =
+ getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+ MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+ // Init startTime to current time. If all goes well, it will be reset after
+ // first attempt to contact RM.
+ retrystartTime = System.currentTimeMillis();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void heartbeat() throws Exception {
+ AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+ this.applicationAttemptId, this.lastResponseID, super
+ .getApplicationProgress(), new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>());
+ AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+ AMResponse response;
+ try {
+ response = allocateResponse.getAMResponse();
+ // Reset retry count if no exception occurred.
+ retrystartTime = System.currentTimeMillis();
+ } catch (Exception e) {
+ // This can happen when the connection to the RM has gone down. Keep
+ // re-trying until the retryInterval has expired.
+ if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+ LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
+ eventHandler.handle(new JobEvent(this.getJob().getID(),
+ JobEventType.INTERNAL_ERROR));
+ throw new YarnException("Could not contact RM after " +
+ retryInterval + " milliseconds.");
+ }
+ // Throw this up to the caller, which may decide to ignore it and
+ // continue to attempt to contact the RM.
+ throw e;
+ }
+ if (response.getReboot()) {
+ LOG.info("Event from RM: shutting down Application Master");
+ // This can happen if the RM has been restarted. If it is in that state,
+ // this application must clean itself up.
+ eventHandler.handle(new JobEvent(this.getJob().getID(),
+ JobEventType.INTERNAL_ERROR));
+ throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+ this.getContext().getApplicationID());
+ }
+ }
+
+ @Override
+ public void handle(RMCommunicatorEvent rawEvent) {
+ switch (rawEvent.getType()) {
+ case CONTAINER_DEALLOCATE:
+ LOG.warn("Unexpected eventType: " + rawEvent.getType() + ", Event: "
+ + rawEvent);
+ break;
+ case CONTAINER_FAILED:
+ LOG.warn("Unexpected eventType: " + rawEvent.getType() + ", Event: "
+ + rawEvent);
+ break;
+ case CONTAINER_REQ:
+ LOG.warn("Unexpected eventType: " + rawEvent.getType() + ", Event: "
+ + rawEvent);
+ break;
+ default:
+ break;
+ }
+ }
+}
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java?rev=1388591&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java Fri Sep 21 18:07:22 2012
@@ -0,0 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public interface ContainerRequestor extends EventHandler<RMCommunicatorEvent> {
+
+
+
+}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java Fri Sep 21 18:07:22 2012
@@ -82,7 +82,7 @@ public abstract class RMCommunicator ext
protected volatile boolean isSignalled = false;
public RMCommunicator(ClientService clientService, AppContext context) {
- super("RMCommunicator");
+ super(RMCommunicator.class.getSimpleName());
this.clientService = clientService;
this.context = context;
this.eventHandler = context.getEventHandler();
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Fri Sep 21 18:07:22 2012
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -61,14 +60,13 @@ import org.apache.hadoop.yarn.util.Recor
* Keeps the data structures to send container requests to RM.
*/
// TODO XXX: Eventually rename to RMCommunicator
-public class RMContainerRequestor extends RMCommunicator implements EventHandler<RMCommunicatorEvent> {
+public class RMContainerRequestor extends RMCommunicator implements ContainerRequestor {
private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
static final String ANY = "*";
private final Clock clock;
- private int lastResponseID;
private Resource availableResources; // aka headroom.
private long retrystartTime;
private long retryInterval;
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Fri Sep 21 18:07:22 2012
@@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
@@ -81,8 +82,8 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerLaunchRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
@@ -547,7 +548,7 @@ public class MRApp extends MRAppMaster {
}
@Override
- protected RMContainerRequestor createRMContainerRequestor(
+ protected ContainerRequestor createContainerRequestor(
ClientService clientService, AppContext appContext) {
return new MRAppContainerRequestor(clientService, appContext);
}
@@ -592,9 +593,9 @@ public class MRApp extends MRAppMaster {
}
@Override
- protected ContainerAllocator createAMScheduler(
- RMContainerRequestor requestor, AppContext appContext) {
- return new MRAppAMScheduler();
+ protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+ AppContext appContext) {
+ return new MRAppAMScheduler();
}
protected class MRAppAMScheduler implements ContainerAllocator {
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java Fri Sep 21 18:07:22 2012
@@ -38,7 +38,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
-import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -140,7 +140,7 @@ import org.junit.Test;
}
@Override
- protected RMContainerRequestor createRMContainerRequestor(
+ protected ContainerRequestor createContainerRequestor(
ClientService clientService, AppContext appContext) {
return new TestCleanupContainerRequestor(clientService, appContext);
}
|