Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 14103 invoked from network); 17 Mar 2011 20:23:04 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 17 Mar 2011 20:23:04 -0000 Received: (qmail 62992 invoked by uid 500); 17 Mar 2011 20:23:04 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 62948 invoked by uid 500); 17 Mar 2011 20:23:04 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 62848 invoked by uid 99); 17 Mar 2011 20:23:04 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Mar 2011 20:23:04 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Mar 2011 20:22:56 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id EDB792388C10; Thu, 17 Mar 2011 20:22:07 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1082677 [9/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapreduc... 
Date: Thu, 17 Mar 2011 20:21:54 -0000 To: mapreduce-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110317202207.EDB792388C10@eris.apache.org> Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,417 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.mapreduce.v2.app; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import junit.framework.Assert; + +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo; +import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; +import org.apache.hadoop.yarn.AMRMProtocol; +import org.apache.hadoop.yarn.AMResponse; +import org.apache.hadoop.yarn.ApplicationID; +import org.apache.hadoop.yarn.ApplicationMaster; +import org.apache.hadoop.yarn.ApplicationStatus; +import org.apache.hadoop.yarn.Container; +import org.apache.hadoop.yarn.ContainerID; +import org.apache.hadoop.yarn.NodeID; +import org.apache.hadoop.yarn.Resource; +import org.apache.hadoop.yarn.ResourceRequest; +import org.apache.hadoop.mapreduce.v2.api.JobID; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID; +import org.junit.Test; + +public class 
TestRMContainerAllocator { + private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class); + + @Test + public void testSimple() throws Exception { + FifoScheduler scheduler = createScheduler(); + LocalRMContainerAllocator allocator = new LocalRMContainerAllocator( + scheduler); + + //add resources to scheduler + NodeInfo nodeManager1 = addNode(scheduler, "h1", 10240); + NodeInfo nodeManager2 = addNode(scheduler, "h2", 10240); + NodeInfo nodeManager3 = addNode(scheduler, "h3", 10240); + + //create the container request + ContainerRequestEvent event1 = + createReq(1, 1024, 1, new String[]{"h1"}); + allocator.sendRequest(event1); + + //send 1 more request with different resource req + ContainerRequestEvent event2 = createReq(2, 1024, 1, new String[]{"h2"}); + allocator.sendRequest(event2); + + //this tells the scheduler about the requests + //as nodes are not added, no allocations + List assigned = allocator.schedule(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + //send another request with different resource and priority + ContainerRequestEvent event3 = createReq(3, 1024, 1, new String[]{"h3"}); + allocator.sendRequest(event3); + + //this tells the scheduler about the requests + //as nodes are not added, no allocations + assigned = allocator.schedule(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + //update resources in scheduler + scheduler.nodeUpdate(nodeManager1, null); // Node heartbeat + scheduler.nodeUpdate(nodeManager2, null); // Node heartbeat + scheduler.nodeUpdate(nodeManager3, null); // Node heartbeat + + + assigned = allocator.schedule(); + checkAssignments( + new ContainerRequestEvent[]{event1, event2, event3}, assigned, false); + } + + //TODO: Currently Scheduler seems to have bug where it does not work + //for Application asking for containers with different capabilities. 
+ //@Test + public void testResource() throws Exception { + FifoScheduler scheduler = createScheduler(); + LocalRMContainerAllocator allocator = new LocalRMContainerAllocator( + scheduler); + + //add resources to scheduler + NodeInfo nodeManager1 = addNode(scheduler, "h1", 10240); + NodeInfo nodeManager2 = addNode(scheduler, "h2", 10240); + NodeInfo nodeManager3 = addNode(scheduler, "h3", 10240); + + //create the container request + ContainerRequestEvent event1 = + createReq(1, 1024, 1, new String[]{"h1"}); + allocator.sendRequest(event1); + + //send 1 more request with different resource req + ContainerRequestEvent event2 = createReq(2, 2048, 1, new String[]{"h2"}); + allocator.sendRequest(event2); + + //this tells the scheduler about the requests + //as nodes are not added, no allocations + List assigned = allocator.schedule(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + //update resources in scheduler + scheduler.nodeUpdate(nodeManager1, null); // Node heartbeat + scheduler.nodeUpdate(nodeManager2, null); // Node heartbeat + scheduler.nodeUpdate(nodeManager3, null); // Node heartbeat + + assigned = allocator.schedule(); + checkAssignments( + new ContainerRequestEvent[]{event1, event2}, assigned, false); + } + + @Test + public void testPriority() throws Exception { + FifoScheduler scheduler = createScheduler(); + LocalRMContainerAllocator allocator = new LocalRMContainerAllocator( + scheduler); + + //add resources to scheduler + NodeInfo nodeManager1 = addNode(scheduler, "h1", 1024); + NodeInfo nodeManager2 = addNode(scheduler, "h2", 10240); + NodeInfo nodeManager3 = addNode(scheduler, "h3", 10240); + + //create the container request + ContainerRequestEvent event1 = + createReq(1, 2048, 1, new String[]{"h1", "h2"}); + allocator.sendRequest(event1); + + //send 1 more request with different priority + ContainerRequestEvent event2 = createReq(2, 2048, 2, new String[]{"h1"}); + allocator.sendRequest(event2); + + //send 1 more request 
with different priority + ContainerRequestEvent event3 = createReq(3, 2048, 3, new String[]{"h3"}); + allocator.sendRequest(event3); + + //this tells the scheduler about the requests + //as nodes are not added, no allocations + List assigned = allocator.schedule(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + //update resources in scheduler + scheduler.nodeUpdate(nodeManager1, null); // Node heartbeat + scheduler.nodeUpdate(nodeManager2, null); // Node heartbeat + scheduler.nodeUpdate(nodeManager3, null); // Node heartbeat + + assigned = allocator.schedule(); + checkAssignments( + new ContainerRequestEvent[]{event1, event2, event3}, assigned, false); + + //validate that no container is assigned to h1 as it doesn't have 2048 + for (TaskAttemptContainerAssignedEvent assig : assigned) { + Assert.assertFalse("Assigned count not correct", + "h1".equals(assig.getContainerManagerAddress())); + } + } + + + + private NodeInfo addNode(FifoScheduler scheduler, + String nodeName, int memory) { + NodeID nodeId = new NodeID(); + nodeId.id = 0; + Resource resource = new Resource(); + resource.memory = memory; + NodeInfo nodeManager = scheduler.addNode(nodeId, nodeName, + RMResourceTrackerImpl.resolve(nodeName), resource); // Node registration + return nodeManager; + } + + private FifoScheduler createScheduler() throws AvroRemoteException { + FifoScheduler fsc = new FifoScheduler(new Configuration(), + new ContainerTokenSecretManager()) { + //override this to copy the objects + //otherwise FifoScheduler updates the numContainers in same objects as kept by + //RMContainerAllocator + @Override + public synchronized List allocate(ApplicationID applicationId, + List ask, List release) + throws IOException { + List askCopy = new ArrayList(); + for (ResourceRequest req : ask) { + ResourceRequest reqCopy = new ResourceRequest(); + reqCopy.priority = req.priority; + reqCopy.hostName = req.hostName; + reqCopy.capability = req.capability; + 
reqCopy.numContainers = req.numContainers; + askCopy.add(reqCopy); + } + //no need to copy release + return super.allocate(applicationId, askCopy, release); + } + }; + try { + fsc.addApplication(new ApplicationID(), "test", null, null); + } catch(IOException ie) { + LOG.info("add application failed with ", ie); + Assert.fail("add application failed: " + ie); // unlike assert(false), fails even when the JVM runs without -ea + } + return fsc; + } + + private ContainerRequestEvent createReq( + int attemptid, int memory, int priority, String[] hosts) { + TaskAttemptID attemptId = new TaskAttemptID(); + attemptId.id = attemptid; + Resource containerNeed = new Resource(); + containerNeed.memory = memory; + return new ContainerRequestEvent(attemptId, + containerNeed, priority, + hosts, new String[] {NetworkTopology.DEFAULT_RACK}); + } + + private void checkAssignments(ContainerRequestEvent[] requests, + List assignments, + boolean checkHostMatch) { + Assert.assertNotNull("Container not assigned", assignments); + Assert.assertEquals("Assigned count not correct", + requests.length, assignments.size()); + + //check for uniqueness of containerIDs + Set containerIds = new HashSet(); + for (TaskAttemptContainerAssignedEvent assigned : assignments) { + containerIds.add(assigned.getContainerID()); + } + Assert.assertEquals("Assigned containers must be different", + assignments.size(), containerIds.size()); + + //check for all assignment + for (ContainerRequestEvent req : requests) { + TaskAttemptContainerAssignedEvent assigned = null; + for (TaskAttemptContainerAssignedEvent ass : assignments) { + if (ass.getTaskAttemptID().equals(req.getAttemptID())){ + assigned = ass; + break; + } + } + checkAssignment(req, assigned, checkHostMatch); + } + } + + private void checkAssignment(ContainerRequestEvent request, + TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) { + Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(), + assigned); + Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(), + assigned.getTaskAttemptID()); + 
if (checkHostMatch) { + Assert.assertTrue("Not assigned to requested host", Arrays.asList + (request.getHosts()).contains(assigned.getContainerManagerAddress())); + } + + } + + //Mock RMContainerAllocator + //Instead of talking to remote Scheduler,uses the local Scheduler + public static class LocalRMContainerAllocator extends RMContainerAllocator { + private static final List events = + new ArrayList(); + + public static class AMRMProtocolImpl implements AMRMProtocol { + + private ResourceScheduler resourceScheduler; + + public AMRMProtocolImpl(ResourceScheduler resourceScheduler) { + this.resourceScheduler = resourceScheduler; + } + + @Override + public Void registerApplicationMaster( + ApplicationMaster applicationMaster) throws AvroRemoteException { + return null; + } + + @Override + public AMResponse allocate(ApplicationStatus status, + List ask, List release) + throws AvroRemoteException { + try { + AMResponse response = new AMResponse(); + response.containers = resourceScheduler.allocate(status.applicationId, ask, release); + return response; + } catch(IOException ie) { + throw RPCUtil.getRemoteException(ie); + } + } + + @Override + public Void finishApplicationMaster(ApplicationMaster applicationMaster) + throws AvroRemoteException { + // TODO Auto-generated method stub + return null; + } + + } + + private ResourceScheduler scheduler; + LocalRMContainerAllocator(ResourceScheduler scheduler) { + super(null, new TestContext(events)); + this.scheduler = scheduler; + super.init(new Configuration()); + super.start(); + } + + protected AMRMProtocol createSchedulerProxy() { + return new AMRMProtocolImpl(scheduler); + } + + @Override + protected void register() {} + @Override + protected void unregister() {} + + public void sendRequest(ContainerRequestEvent req) { + sendRequests(Arrays.asList(new ContainerRequestEvent[]{req})); + } + + public void sendRequests(List reqs) { + for (ContainerRequestEvent req : reqs) { + handle(req); + } + } + + //API to be used by 
tests + public List schedule() { + //run the scheduler + try { + allocate(); + } catch (Exception e) { + LOG.warn("allocate() failed; returning only the events received so far", e); // best-effort, but no longer silent + } + + List result = new ArrayList(events); + events.clear(); + return result; + } + + protected void startAllocatorThread() { + //override to NOT start thread + } + + static class TestContext implements AppContext { + private List events; + TestContext(List events) { + this.events = events; + } + @Override + public Map getAllJobs() { + return null; + } + @Override + public ApplicationID getApplicationID() { + return new ApplicationID(); + } + @Override + public EventHandler getEventHandler() { + return new EventHandler() { + @Override + public void handle(Event event) { + events.add((TaskAttemptContainerAssignedEvent) event); + } + }; + } + @Override + public Job getJob(JobID jobID) { + return null; + } + + @Override + public CharSequence getUser() { + return null; + } + } + } + + public static void main(String[] args) throws Exception { + TestRMContainerAllocator t = new TestRMContainerAllocator(); + t.testSimple(); + //t.testResource(); + t.testPriority(); + } +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,757 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.Clock; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; +import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator; +import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator; +import 
org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; +import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; +import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.CompositeService; +import org.apache.hadoop.yarn.ApplicationID; +import org.apache.hadoop.yarn.ContainerID; +import org.apache.hadoop.mapreduce.v2.api.Counters; +import org.apache.hadoop.mapreduce.v2.api.JobID; +import org.apache.hadoop.mapreduce.v2.api.JobReport; +import org.apache.hadoop.mapreduce.v2.api.JobState; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent; +import org.apache.hadoop.mapreduce.v2.api.TaskID; +import org.apache.hadoop.mapreduce.v2.api.TaskReport; +import org.apache.hadoop.mapreduce.v2.api.TaskState; +import org.apache.hadoop.mapreduce.v2.api.TaskType; +import org.junit.Assert; +import org.junit.Test; + +public class TestRuntimeEstimators { + + private static int INITIAL_NUMBER_FREE_SLOTS = 300; + private static int MAP_SLOT_REQUIREMENT = 3; + // this has to be at least as much as map slot requirement + private static int REDUCE_SLOT_REQUIREMENT = 4; + private static int MAP_TASKS = 200; + private static int REDUCE_TASKS = 150; + + private Queue taskEvents; + + Clock clock; + + Job myJob; + + AppContext myAppContext; + + private static final Log LOG = LogFactory.getLog(TestRuntimeEstimators.class); + + private final AtomicInteger slotsInUse = new AtomicInteger(0); + + Dispatcher dispatcher; + + DefaultSpeculator speculator; + + TaskRuntimeEstimator estimator; + + // This is a huge kluge. 
The real implementations have a decent approach + private final AtomicInteger completedMaps = new AtomicInteger(0); + private final AtomicInteger completedReduces = new AtomicInteger(0); + + private final AtomicInteger successfulSpeculations + = new AtomicInteger(0); + private final AtomicLong taskTimeSavedBySpeculation + = new AtomicLong(0L); + + private void coreTestEstimator + (TaskRuntimeEstimator testedEstimator, int expectedSpeculations) { + estimator = testedEstimator; + taskEvents = new ConcurrentLinkedQueue(); + myJob = null; + slotsInUse.set(0); + completedMaps.set(0); + completedReduces.set(0); + successfulSpeculations.set(0); + taskTimeSavedBySpeculation.set(0); + + ((MockClock)clock).advanceTime(1000); + + Configuration conf = new Configuration(); + + myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS); + myJob = myAppContext.getAllJobs().values().iterator().next(); + + estimator.contextualize(conf, myAppContext); + + speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock); + + dispatcher.register(Speculator.EventType.class, speculator); + + dispatcher.register(TaskEventType.class, new SpeculationRequestEventHandler()); + + ((AsyncDispatcher)dispatcher).init(conf); + ((AsyncDispatcher)dispatcher).start(); + + + + speculator.init(conf); + speculator.start(); + + // Now that the plumbing is hooked up, we do the following: + // do until all tasks are finished, ... + // 1: If we have spare capacity, assign as many map tasks as we can, then + // assign as many reduce tasks as we can. Note that a reduce task + // might be started while there are still map tasks; slot needs differ: + // MAP_SLOT_REQUIREMENT is 3 slots and REDUCE_SLOT_REQUIREMENT is 4 slots. 
+ // 2: Send a speculation event for every task attempt that's running + // note that new attempts might get started by the speculator + + // discover undone tasks + int undoneMaps = MAP_TASKS; + int undoneReduces = REDUCE_TASKS; + + // build a task sequence where all the maps precede any of the reduces + List allTasksSequence = new LinkedList(); + + allTasksSequence.addAll(myJob.getTasks(TaskType.MAP).values()); + allTasksSequence.addAll(myJob.getTasks(TaskType.REDUCE).values()); + + while (undoneMaps + undoneReduces > 0) { + undoneMaps = 0; undoneReduces = 0; + // start all attempts which are new but for which there is enough slots + for (Task task : allTasksSequence) { + if (!task.isFinished()) { + if (task.getType() == TaskType.MAP) { + ++undoneMaps; + } else { + ++undoneReduces; + } + } + for (TaskAttempt attempt : task.getAttempts().values()) { + if (attempt.getState() == TaskAttemptState.NEW + && INITIAL_NUMBER_FREE_SLOTS - slotsInUse.get() + >= taskTypeSlots(task.getType())) { + MyTaskAttemptImpl attemptImpl = (MyTaskAttemptImpl)attempt; + SpeculatorEvent event + = new SpeculatorEvent(attempt.getID(), false, clock.getTime()); + speculator.handle(event); + attemptImpl.startUp(); + } else { + // If a task attempt is in progress we should send the news to + // the Speculator. 
+ TaskAttemptStatus status = new TaskAttemptStatus(); + status.id = attempt.getID(); + status.progress = attempt.getProgress(); + status.stateString = attempt.getState().name(); + SpeculatorEvent event = new SpeculatorEvent(status, clock.getTime()); + speculator.handle(event); + } + } + } + + long startTime = System.currentTimeMillis(); + + // drain the speculator event queue + while (!speculator.eventQueueEmpty()) { + Thread.yield(); + if (System.currentTimeMillis() > startTime + 130000) { + return; + } + } + + ((MockClock) clock).advanceTime(1000L); + + if (clock.getTime() % 10000L == 0L) { + speculator.scanForSpeculations(); + } + } + + Assert.assertEquals("We got the wrong number of successful speculations.", + expectedSpeculations, successfulSpeculations.get()); + } + + @Test + public void testLegacyEstimator() throws Exception { + clock = new MockClock(); + TaskRuntimeEstimator specificEstimator = new LegacyTaskRuntimeEstimator(); + Configuration conf = new Configuration(); + dispatcher = new AsyncDispatcher(); + myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS); + + coreTestEstimator(specificEstimator, 3); + } + + @Test + public void testExponentialEstimator() throws Exception { + clock = new MockClock(); + TaskRuntimeEstimator specificEstimator + = new ExponentiallySmoothedTaskRuntimeEstimator(); + Configuration conf = new Configuration(); + dispatcher = new AsyncDispatcher(); + myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS); + + coreTestEstimator(specificEstimator, 3); // run the exponential estimator built above, not a second legacy one + } + + int taskTypeSlots(TaskType type) { + return type == TaskType.MAP ? 
MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT; + } + + private boolean jobComplete() { + for (Task task : myJob.getTasks().values()) { + if (!task.isFinished()) { + return false; + } + } + + return true; + } + + private int slotsInUse(int mapSize, int reduceSize) { + return slotsInUse.get(); + } + + class SpeculationRequestEventHandler implements EventHandler { + + @Override + public void handle(TaskEvent event) { + TaskID taskID = event.getTaskID(); + Task task = myJob.getTask(taskID); + + Assert.assertEquals + ("Wrong type event", TaskEventType.T_ADD_SPEC_ATTEMPT, event.getType()); + + System.out.println("SpeculationRequestEventHandler.handle adds a speculation task for " + taskID); + + addAttempt(task); + } + } + + void addAttempt(Task task) { + MyTaskImpl myTask = (MyTaskImpl) task; + + myTask.addAttempt(); + } + + class MyTaskImpl implements Task { + private final TaskID taskID; + private final Map attempts + = new HashMap(4); + + MyTaskImpl(JobID jobID, int index, TaskType type) { + taskID = new TaskID(); + taskID.id = index; + taskID.taskType = type; + taskID.jobID = jobID; + } + + void addAttempt() { + TaskAttempt taskAttempt + = new MyTaskAttemptImpl(taskID, attempts.size(), clock); + TaskAttemptID taskAttemptID = taskAttempt.getID(); + + attempts.put(taskAttemptID, taskAttempt); + + System.out.println("TLTRE.MyTaskImpl.addAttempt " + getID()); + + SpeculatorEvent event = new SpeculatorEvent(taskID, +1); + dispatcher.getEventHandler().handle(event); + } + + @Override + public TaskID getID() { + return taskID; + } + + @Override + public TaskReport getReport() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Counters getCounters() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public float getProgress() { + float result = 0.0F; + + + for (TaskAttempt attempt : attempts.values()) { + result = Math.max(result, attempt.getProgress()); + } + + return result; + } + + @Override + 
public TaskType getType() { + return taskID.taskType; + } + + @Override + public Map getAttempts() { + Map result + = new HashMap(attempts.size()); + result.putAll(attempts); + return result; + } + + @Override + public TaskAttempt getAttempt(TaskAttemptID attemptID) { + return attempts.get(attemptID); + } + + @Override + public boolean isFinished() { + for (TaskAttempt attempt : attempts.values()) { + if (attempt.getState() == TaskAttemptState.SUCCEEDED) { + return true; + } + } + + return false; + } + + @Override + public boolean canCommit(TaskAttemptID taskAttemptID) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public TaskState getState() { + throw new UnsupportedOperationException("Not supported yet."); + } + + } + + class MyJobImpl implements Job { + private final JobID jobID; + private final Map allTasks = new HashMap(); + private final Map mapTasks = new HashMap(); + private final Map reduceTasks = new HashMap(); + + MyJobImpl(JobID jobID, int numMaps, int numReduces) { + this.jobID = jobID; + for (int i = 0; i < numMaps; ++i) { + Task newTask = new MyTaskImpl(jobID, i, TaskType.MAP); + mapTasks.put(newTask.getID(), newTask); + allTasks.put(newTask.getID(), newTask); + } + for (int i = 0; i < numReduces; ++i) { + Task newTask = new MyTaskImpl(jobID, i, TaskType.REDUCE); + reduceTasks.put(newTask.getID(), newTask); + allTasks.put(newTask.getID(), newTask); + } + + // give every task an attempt + for (Task task : allTasks.values()) { + MyTaskImpl myTaskImpl = (MyTaskImpl) task; + myTaskImpl.addAttempt(); + } + } + + @Override + public JobID getID() { + return jobID; + } + + @Override + public JobState getState() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public JobReport getReport() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Counters getCounters() { + throw new UnsupportedOperationException("Not supported yet."); + } + + 
@Override + public Map getTasks() { + return allTasks; + } + + @Override + public Map getTasks(TaskType taskType) { + return taskType == TaskType.MAP ? mapTasks : reduceTasks; + } + + @Override + public Task getTask(TaskID taskID) { + return allTasks.get(taskID); + } + + @Override + public List getDiagnostics() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getCompletedMaps() { + return completedMaps.get(); + } + + @Override + public int getCompletedReduces() { + return completedReduces.get(); + } + + @Override + public TaskAttemptCompletionEvent[] + getTaskAttemptCompletionEvents(int fromEventId, int maxEvents) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public CharSequence getName() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getTotalMaps() { + return mapTasks.size(); + } + + @Override + public int getTotalReduces() { + return reduceTasks.size(); + } + } + + /* + * We follow the pattern of the real XxxImpl . We create a job and initialize + * it with a full suite of tasks which in turn have one attempt each in the + * NEW state. Attempts transition only from NEW to RUNNING to SUCCEEDED . 
+ */ + class MyTaskAttemptImpl implements TaskAttempt { + private final TaskAttemptID myAttemptID; + + long startMockTime = Long.MIN_VALUE; + + long shuffleCompletedTime = Long.MAX_VALUE; + + TaskAttemptState overridingState = TaskAttemptState.NEW; + + MyTaskAttemptImpl(TaskID taskID, int index, Clock clock) { + myAttemptID = new TaskAttemptID(); + myAttemptID.id = index; + myAttemptID.taskID = taskID; + } + + void startUp() { + startMockTime = clock.getTime(); + overridingState = null; + + slotsInUse.addAndGet(taskTypeSlots(myAttemptID.taskID.taskType)); + + System.out.println("TLTRE.MyTaskAttemptImpl.startUp starting " + getID()); + + SpeculatorEvent event = new SpeculatorEvent(getID().taskID, -1); + dispatcher.getEventHandler().handle(event); + } + + @Override + public TaskAttemptID getID() { + return myAttemptID; + } + + @Override + public TaskAttemptReport getReport() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public List getDiagnostics() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Counters getCounters() { + throw new UnsupportedOperationException("Not supported yet."); + } + + private float getCodeRuntime() { + int taskIndex = myAttemptID.taskID.id; + int attemptIndex = myAttemptID.id; + + float result = 200.0F; + + switch (taskIndex % 4) { + case 0: + if (taskIndex % 40 == 0 && attemptIndex == 0) { + result = 600.0F; + break; + } + + break; + case 2: + break; + + case 1: + result = 150.0F; + break; + + case 3: + result = 250.0F; + break; + } + + return result; + } + + private float getMapProgress() { + float runtime = getCodeRuntime(); + + return Math.min + ((float) (clock.getTime() - startMockTime) / (runtime * 1000.0F), 1.0F); + } + + private float getReduceProgress() { + Job job = myAppContext.getJob(myAttemptID.taskID.jobID); + float runtime = getCodeRuntime(); + + Collection allMapTasks = job.getTasks(TaskType.MAP).values(); + + int numberMaps = 
allMapTasks.size(); + int numberDoneMaps = 0; + + for (Task mapTask : allMapTasks) { + if (mapTask.isFinished()) { + ++numberDoneMaps; + } + } + + if (numberMaps == numberDoneMaps) { + shuffleCompletedTime = Math.min(shuffleCompletedTime, clock.getTime()); + + return Math.min + ((float) (clock.getTime() - shuffleCompletedTime) + / (runtime * 2000.0F) + 0.5F, + 1.0F); + } else { + return ((float) numberDoneMaps) / numberMaps * 0.5F; + } + } + + // we compute progress from time and an algorithm now + @Override + public float getProgress() { + if (overridingState == TaskAttemptState.NEW) { + return 0.0F; + } + return myAttemptID.taskID.taskType == TaskType.MAP ? getMapProgress() : getReduceProgress(); + } + + @Override + public TaskAttemptState getState() { + if (overridingState != null) { + return overridingState; + } + TaskAttemptState result + = getProgress() < 1.0F ? TaskAttemptState.RUNNING : TaskAttemptState.SUCCEEDED; + + if (result == TaskAttemptState.SUCCEEDED) { + overridingState = TaskAttemptState.SUCCEEDED; + + System.out.println("MyTaskAttemptImpl.getState() -- attempt " + myAttemptID + " finished."); + + slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.taskID.taskType)); + + (myAttemptID.taskID.taskType == TaskType.MAP + ? completedMaps : completedReduces).getAndIncrement(); + + // check for a spectacularly successful speculation + TaskID taskID = myAttemptID.taskID; + Task undoneTask = null; + + Task task = myJob.getTask(taskID); + + for (TaskAttempt otherAttempt : task.getAttempts().values()) { + if (otherAttempt != this + && otherAttempt.getState() == TaskAttemptState.RUNNING) { + // we had two instances running. 
Try to determine how much + // we might have saved by speculation + if (getID().id > otherAttempt.getID().id) { + // the speculation won + successfulSpeculations.getAndIncrement(); + float hisProgress = otherAttempt.getProgress(); + long hisStartTime = ((MyTaskAttemptImpl)otherAttempt).startMockTime; + System.out.println("TLTRE:A speculation finished at time " + + clock.getTime() + + ". The stalled attempt is at " + (hisProgress * 100.0) + + "% progress, and it started at " + + hisStartTime + ", which is " + + (clock.getTime() - hisStartTime) + " ago."); + long originalTaskEndEstimate + = (hisStartTime + + estimator.estimatedRuntime(otherAttempt.getID())); + System.out.println( + "TLTRE: We would have expected the original attempt to take " + + estimator.estimatedRuntime(otherAttempt.getID()) + + ", finishing at " + originalTaskEndEstimate); + long estimatedSavings = originalTaskEndEstimate - clock.getTime(); + taskTimeSavedBySpeculation.addAndGet(estimatedSavings); + System.out.println("TLTRE: The task is " + task.getID()); + slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.taskID.taskType)); + ((MyTaskAttemptImpl)otherAttempt).overridingState + = TaskAttemptState.KILLED; + } else { + System.out.println( + "TLTRE: The normal attempt beat the speculation in " + + task.getID()); + } + } + } + } + + return result; + } + + @Override + public boolean isFinished() { + return getProgress() == 1.0F; + } + + @Override + public ContainerID getAssignedContainerID() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public String getAssignedContainerMgrAddress() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public long getLaunchTime() { + return startMockTime; + } + + @Override + public long getFinishTime() { + throw new UnsupportedOperationException("Not supported yet."); + } + } + + static class MockClock extends Clock { + private long currentTime = 0; + + @Override + long getMeasuredTime() { + 
return currentTime; + } + + void setMeasuredTime(long newTime) { + currentTime = newTime; + } + + void advanceTime(long increment) { + currentTime += increment; + } + } + + class MyAppMaster extends CompositeService { + final Clock clock; + public MyAppMaster(Clock clock) { + super(MyAppMaster.class.getName()); + if (clock == null) { + clock = new Clock(); + } + this.clock = clock; + LOG.info("Created MyAppMaster"); + } + } + + class MyAppContext implements AppContext { + // I'll be making Avro objects by hand. Please don't do that very often. + + private final ApplicationID myApplicationID; + private final JobID myJobID; + private final Map allJobs; + + MyAppContext(int numberMaps, int numberReduces) { + myApplicationID = new ApplicationID(); + myApplicationID.clusterTimeStamp = clock.getTime(); + myApplicationID.id = 1; + + myJobID = new JobID(); + myJobID.appID = myApplicationID; + + Job myJob + = new MyJobImpl(myJobID, numberMaps, numberReduces); + + allJobs = Collections.singletonMap(myJobID, myJob); + } + + @Override + public ApplicationID getApplicationID() { + return myApplicationID; + } + + @Override + public Job getJob(JobID jobID) { + return allJobs.get(jobID); + } + + @Override + public Map getAllJobs() { + return allJobs; + } + + @Override + public EventHandler getEventHandler() { + return dispatcher.getEventHandler(); + } + + @Override + public CharSequence getUser() { + throw new UnsupportedOperationException("Not supported yet."); + } + } +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java?rev=1082677&view=auto ============================================================================== --- 
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,117 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.mapreduce.v2.app.webapp; + +import com.google.inject.Injector; +import java.util.Map; + +import org.junit.Test; +import static org.junit.Assert.*; + +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MockJobs; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp; +import org.apache.hadoop.mapreduce.v2.app.webapp.AppController; +import org.apache.hadoop.mapreduce.v2.app.webapp.AppView; +import org.apache.hadoop.mapreduce.v2.app.webapp.JobPage; +import org.apache.hadoop.mapreduce.v2.app.webapp.TaskPage; +import org.apache.hadoop.mapreduce.v2.app.webapp.TasksPage; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.webapp.WebApps; +import org.apache.hadoop.yarn.webapp.test.WebAppTests; +import org.apache.hadoop.yarn.ApplicationID; +import org.apache.hadoop.mapreduce.v2.api.JobID; + +import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*; + +public class TestAMWebApp { + + static class TestAppContext implements AppContext { + final ApplicationID appID; + final String user = MockJobs.newUserName(); + final Map jobs; + + TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) { + appID = MockJobs.newAppID(appid); + jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts); + } + + TestAppContext() { + this(0, 1, 1, 1); + } + + @Override + public ApplicationID getApplicationID() { + return appID; + } + + @Override + public CharSequence getUser() { + return user; + } + + @Override + public Job getJob(JobID jobID) { + return jobs.get(jobID); + } + + @Override + public Map getAllJobs() { + return jobs; // OK + } + + @Override + public EventHandler getEventHandler() { + return null; + } + } + + @Test public void testAppControllerIndex() { + TestAppContext ctx = new TestAppContext(); + Injector injector = 
WebAppTests.createMockInjector(AppContext.class, ctx); + AppController controller = injector.getInstance(AppController.class); + controller.index(); + assertEquals(Apps.toString(ctx.appID), controller.get(APP_ID,"")); + } + + @Test public void testAppView() { + WebAppTests.testPage(AppView.class, AppContext.class, new TestAppContext()); + } + + @Test public void testJobView() { + WebAppTests.testPage(JobPage.class, AppContext.class, new TestAppContext()); + } + + @Test public void testTasksView() { + WebAppTests.testPage(TasksPage.class, AppContext.class, + new TestAppContext()); + } + + @Test public void testTaskView() { + WebAppTests.testPage(TaskPage.class, AppContext.class, + new TestAppContext()); + } + + public static void main(String[] args) { + WebApps.$for("yarn", AppContext.class, new TestAppContext(0, 8, 88, 4)). + at(58888).inDevMode().start(new AMWebApp()).joinThread(); + } +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml Thu Mar 17 20:21:13 2011 @@ -0,0 +1,63 @@ + + + hadoop-mapreduce-client + org.apache.hadoop + ${yarn.version} + + 4.0.0 + org.apache.hadoop + hadoop-mapreduce-client-common + hadoop-mapreduce-client-common + ${yarn.version} + http://maven.apache.org + + + + + org.apache.hadoop + yarn + ${yarn.version} + pom + + + org.apache.hadoop + yarn-api + ${yarn.version} + + + org.apache.hadoop + yarn-server + ${yarn.version} + pom + + + + org.apache.hadoop + yarn-common + ${yarn.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${yarn.version} + + + + + + + org.apache.avro + avro-maven-plugin + 
1.4.0-SNAPSHOT + + + generate-sources + + compile + + + + + + + Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro Thu Mar 17 20:21:13 2011 @@ -0,0 +1,151 @@ +@namespace("org.apache.hadoop.mapreduce.v2.api") +protocol MRClientProtocol { + + import idl "./yarn/yarn-api/src/main/avro/yarn-types.genavro"; + + enum TaskType { + MAP, + REDUCE + } + + record JobID { + org.apache.hadoop.yarn.ApplicationID appID; + int id; + } + + record TaskID { + JobID jobID; + TaskType taskType; + int id; + } + + record TaskAttemptID { + TaskID taskID; + int id; + } + + enum TaskState { + NEW, + SCHEDULED, + RUNNING, + SUCCEEDED, + FAILED, + KILL_WAIT, + KILLED + } + + enum Phase { + STARTING, + MAP, + SHUFFLE, + SORT, + REDUCE, + CLEANUP + } + + record Counter { + string name; + string displayName; + long value; + } + + record CounterGroup { + string name; + string displayname; + map counters; + } + + record Counters { + map groups; + } + + record TaskReport { + TaskID id; + TaskState state; + float progress; + long startTime; + long finishTime; + Counters counters; + array runningAttempts; + union{TaskAttemptID, null} successfulAttempt; + array diagnostics; + } + + enum TaskAttemptState { + NEW, + UNASSIGNED, + ASSIGNED, + RUNNING, + COMMIT_PENDING, + SUCCESS_CONTAINER_CLEANUP, + SUCCEEDED, + FAIL_CONTAINER_CLEANUP, + FAIL_TASK_CLEANUP, + FAILED, + KILL_CONTAINER_CLEANUP, + KILL_TASK_CLEANUP, + KILLED + } + + record TaskAttemptReport { 
+ TaskAttemptID id; + TaskAttemptState state; + float progress; + long startTime; + long finishTime; + Counters counters; + string diagnosticInfo; + string stateString; + Phase phase; + } + + enum JobState { + NEW, + RUNNING, + SUCCEEDED, + FAILED, + KILL_WAIT, + KILLED, + ERROR + } + + record JobReport { + JobID id; + JobState state; + float mapProgress; + float reduceProgress; + float cleanupProgress; + float setupProgress; + long startTime; + long finishTime; + } + + enum TaskAttemptCompletionEventStatus { + FAILED, + KILLED, + SUCCEEDED, + OBSOLETE, + TIPFAILED + } + + record TaskAttemptCompletionEvent { + TaskAttemptID attemptId; + TaskAttemptCompletionEventStatus status; + string mapOutputServerAddress; + int attemptRunTime; + int eventId; + } + + JobReport getJobReport(JobID jobID) throws org.apache.hadoop.yarn.YarnRemoteException; + TaskReport getTaskReport(TaskID taskID) throws org.apache.hadoop.yarn.YarnRemoteException; + TaskAttemptReport getTaskAttemptReport(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException; + Counters getCounters(JobID jobID) throws org.apache.hadoop.yarn.YarnRemoteException; + array getTaskAttemptCompletionEvents(JobID jobID, int fromEventId, int maxEvents) throws org.apache.hadoop.yarn.YarnRemoteException; + array getTaskReports(JobID jobID, TaskType taskType) throws org.apache.hadoop.yarn.YarnRemoteException; + array getDiagnostics(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException; + + void killJob(JobID jobID) throws org.apache.hadoop.yarn.YarnRemoteException; + void killTask(TaskID taskID) throws org.apache.hadoop.yarn.YarnRemoteException; + void killTaskAttempt(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException; + +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,38 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.mapreduce.v2; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class YarnMRJobConfig { + public static final String SPECULATOR_CLASS + = "yarn.mapreduce.job.speculator.class"; + public static final String TASK_RUNTIME_ESTIMATOR_CLASS + = "yarn.mapreduce.job.task.runtime.estimator.class"; + public static final String TASK_ATTEMPT_PROGRESS_RUNTIME_LINEARIZER_CLASS + = "yarn.mapreduce.job.task.runtime.linearizer.class"; + public static final String EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS + = "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.lambda"; + public static final String EXPONENTIAL_SMOOTHING_SMOOTH_RATE + = "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.smoothsrate"; +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,308 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.lib; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.mapred.JobPriority; +import org.apache.hadoop.mapred.TIPStatus; +import org.apache.hadoop.mapred.TaskCompletionEvent; +import org.apache.hadoop.mapreduce.TaskReport; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.ApplicationID; +import org.apache.hadoop.mapreduce.v2.api.Counter; +import org.apache.hadoop.mapreduce.v2.api.CounterGroup; +import org.apache.hadoop.mapreduce.v2.api.Counters; +import org.apache.hadoop.mapreduce.v2.api.JobID; +import org.apache.hadoop.mapreduce.v2.api.JobReport; +import org.apache.hadoop.mapreduce.v2.api.JobState; +import org.apache.hadoop.mapreduce.v2.api.Phase; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus; +import org.apache.hadoop.mapreduce.v2.api.TaskID; +import org.apache.hadoop.mapreduce.v2.api.TaskState; +import org.apache.hadoop.mapreduce.v2.api.TaskType; + +public class TypeConverter { + + public static org.apache.hadoop.mapred.JobID fromYarn(JobID id) { + String identifier = fromClusterTimeStamp(id.appID.clusterTimeStamp); + return new org.apache.hadoop.mapred.JobID(identifier, id.id); + } + + //currently there is 1-1 mapping between appid and 
jobid + public static org.apache.hadoop.mapreduce.JobID fromYarn(ApplicationID appID) { + String identifier = fromClusterTimeStamp(appID.clusterTimeStamp); + return new org.apache.hadoop.mapred.JobID(identifier, appID.id); + } + + public static JobID toYarn(org.apache.hadoop.mapreduce.JobID id) { + JobID jobID = new JobID(); + jobID.id = id.getId(); //currently there is 1-1 mapping between appid and jobid + jobID.appID = new ApplicationID(); + jobID.appID.id = id.getId(); + jobID.appID.clusterTimeStamp = toClusterTimeStamp(id.getJtIdentifier()); + return jobID; + } + + private static String fromClusterTimeStamp(long clusterTimeStamp) { + return Long.toString(clusterTimeStamp); + } + + private static long toClusterTimeStamp(String identifier) { + return Long.parseLong(identifier); + } + + public static org.apache.hadoop.mapreduce.TaskType fromYarn( + TaskType taskType) { + switch (taskType) { + case MAP: + return org.apache.hadoop.mapreduce.TaskType.MAP; + case REDUCE: + return org.apache.hadoop.mapreduce.TaskType.REDUCE; + default: + throw new YarnException("Unrecognized task type: " + taskType); + } + } + + public static TaskType + toYarn(org.apache.hadoop.mapreduce.TaskType taskType) { + switch (taskType) { + case MAP: + return TaskType.MAP; + case REDUCE: + return TaskType.REDUCE; + default: + throw new YarnException("Unrecognized task type: " + taskType); + } + } + + public static org.apache.hadoop.mapred.TaskID fromYarn(TaskID id) { + return new org.apache.hadoop.mapred.TaskID(fromYarn(id.jobID), fromYarn(id.taskType), + id.id); + } + + public static TaskID toYarn(org.apache.hadoop.mapreduce.TaskID id) { + TaskID taskID = new TaskID(); + taskID.id = id.getId(); + taskID.taskType = toYarn(id.getTaskType()); + taskID.jobID = toYarn(id.getJobID()); + return taskID; + } + + public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) { + switch (phase) { + case STARTING: + return Phase.STARTING; + case MAP: + return Phase.MAP; + case SHUFFLE: + 
return Phase.SHUFFLE; + case SORT: + return Phase.SORT; + case REDUCE: + return Phase.REDUCE; + case CLEANUP: + return Phase.CLEANUP; + } + throw new YarnException("Unrecognized Phase: " + phase); + } + + public static TaskCompletionEvent[] fromYarn( + org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[] newEvents) { + TaskCompletionEvent[] oldEvents = + new TaskCompletionEvent[newEvents.length]; + int i = 0; + for (org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent newEvent + : newEvents) { + oldEvents[i++] = fromYarn(newEvent); + } + return oldEvents; + } + + public static TaskCompletionEvent fromYarn( + org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent newEvent) { + return new TaskCompletionEvent(newEvent.eventId, + fromYarn(newEvent.attemptId), newEvent.attemptId.id, + newEvent.attemptId.taskID.taskType.equals(TaskType.MAP), + fromYarn(newEvent.status), + newEvent.mapOutputServerAddress.toString()); + } + + public static TaskCompletionEvent.Status fromYarn( + TaskAttemptCompletionEventStatus newStatus) { + switch (newStatus) { + case FAILED: + return TaskCompletionEvent.Status.FAILED; + case KILLED: + return TaskCompletionEvent.Status.KILLED; + case OBSOLETE: + return TaskCompletionEvent.Status.OBSOLETE; + case SUCCEEDED: + return TaskCompletionEvent.Status.SUCCEEDED; + case TIPFAILED: + return TaskCompletionEvent.Status.TIPFAILED; + } + throw new YarnException("Unrecognized status: " + newStatus); + } + + public static org.apache.hadoop.mapred.TaskAttemptID fromYarn( + TaskAttemptID id) { + return new org.apache.hadoop.mapred.TaskAttemptID(fromYarn(id.taskID), + id.id); + } + + public static TaskAttemptID toYarn( + org.apache.hadoop.mapred.TaskAttemptID id) { + TaskAttemptID taskAttemptID = new TaskAttemptID(); + taskAttemptID.taskID = toYarn(id.getTaskID()); + taskAttemptID.id = id.getId(); + return taskAttemptID; + } + + public static TaskAttemptID toYarn( + org.apache.hadoop.mapreduce.TaskAttemptID id) { + TaskAttemptID 
taskAttemptID = new TaskAttemptID(); + taskAttemptID.taskID = toYarn(id.getTaskID()); + taskAttemptID.id = id.getId(); + return taskAttemptID; + } + + public static org.apache.hadoop.mapreduce.Counters fromYarn( + Counters yCntrs) { + org.apache.hadoop.mapreduce.Counters counters = + new org.apache.hadoop.mapreduce.Counters(); + for (CounterGroup yGrp : yCntrs.groups.values()) { + for (Counter yCntr : yGrp.counters.values()) { + org.apache.hadoop.mapreduce.Counter c = + counters.findCounter(yGrp.displayname.toString(), + yCntr.displayName.toString()); + c.setValue(yCntr.value); + } + } + return counters; + } + + public static Counters toYarn(org.apache.hadoop.mapred.Counters counters) { + Counters yCntrs = new Counters(); + yCntrs.groups = new HashMap(); + for (org.apache.hadoop.mapred.Counters.Group grp : counters) { + CounterGroup yGrp = new CounterGroup(); + yGrp.name = grp.getName(); + yGrp.displayname = grp.getDisplayName(); + yGrp.counters = new HashMap(); + for (org.apache.hadoop.mapred.Counters.Counter cntr : grp) { + Counter yCntr = new Counter(); + yCntr.name = cntr.getName(); + yCntr.displayName = cntr.getDisplayName(); + yCntr.value = cntr.getValue(); + yGrp.counters.put(yCntr.name, yCntr); + } + yCntrs.groups.put(yGrp.name, yGrp); + } + return yCntrs; + } + + public static Counters toYarn(org.apache.hadoop.mapreduce.Counters counters) { + Counters yCntrs = new Counters(); + yCntrs.groups = new HashMap(); + for (org.apache.hadoop.mapreduce.CounterGroup grp : counters) { + CounterGroup yGrp = new CounterGroup(); + yGrp.name = grp.getName(); + yGrp.displayname = grp.getDisplayName(); + yGrp.counters = new HashMap(); + for (org.apache.hadoop.mapreduce.Counter cntr : grp) { + Counter yCntr = new Counter(); + yCntr.name = cntr.getName(); + yCntr.displayName = cntr.getDisplayName(); + yCntr.value = cntr.getValue(); + yGrp.counters.put(yCntr.name, yCntr); + } + yCntrs.groups.put(yGrp.name, yGrp); + } + return yCntrs; + } + + public static 
org.apache.hadoop.mapred.JobStatus fromYarn( + JobReport jobreport, String jobFile, String trackingUrl) { + String user = null, jobName = null; + JobPriority jobPriority = JobPriority.NORMAL; + return new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.id), + jobreport.setupProgress, jobreport.mapProgress, + jobreport.reduceProgress, jobreport.cleanupProgress, + fromYarn(jobreport.state), + jobPriority, user, jobName, jobFile, trackingUrl); + } + + public static int fromYarn(JobState state) { + switch (state) { + case NEW: + return org.apache.hadoop.mapred.JobStatus.PREP; + case RUNNING: + return org.apache.hadoop.mapred.JobStatus.RUNNING; + case KILL_WAIT: + case KILLED: + return org.apache.hadoop.mapred.JobStatus.KILLED; + case SUCCEEDED: + return org.apache.hadoop.mapred.JobStatus.SUCCEEDED; + case FAILED: + case ERROR: + return org.apache.hadoop.mapred.JobStatus.FAILED; + } + throw new YarnException("Unrecognized job state: " + state); + } + + public static org.apache.hadoop.mapred.TIPStatus fromYarn( + TaskState state) { + switch (state) { + case NEW: + case SCHEDULED: + return org.apache.hadoop.mapred.TIPStatus.PENDING; + case RUNNING: + return org.apache.hadoop.mapred.TIPStatus.RUNNING; + case KILL_WAIT: + case KILLED: + return org.apache.hadoop.mapred.TIPStatus.KILLED; + case SUCCEEDED: + return org.apache.hadoop.mapred.TIPStatus.COMPLETE; + case FAILED: + return org.apache.hadoop.mapred.TIPStatus.FAILED; + } + throw new YarnException("Unrecognized task state: " + state); + } + + public static TaskReport fromYarn(org.apache.hadoop.mapreduce.v2.api.TaskReport report) { + return new TaskReport(fromYarn(report.id), report.progress, report.state.toString(), + (String[]) report.diagnostics.toArray(), fromYarn(report.state), report.startTime, report.finishTime, + fromYarn(report.counters)); + } + + public static List fromYarn( + List taskReports) { + List reports = new ArrayList(); + for (org.apache.hadoop.mapreduce.v2.api.TaskReport r : taskReports) { + 
reports.add(fromYarn(r)); + } + return reports; + } +} + Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,165 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.mapreduce.v2.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.ApplicationID; +import org.apache.hadoop.mapreduce.v2.api.JobID; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID; +import org.apache.hadoop.mapreduce.v2.api.TaskID; +import org.apache.hadoop.mapreduce.v2.api.TaskType; + +import static org.apache.hadoop.yarn.util.StringHelper.*; + +/** + * Helper class for MR applications + */ +public class MRApps extends Apps { + public static final String JOB = "job"; + public static final String TASK = "task"; + public static final String ATTEMPT = "attempt"; + + public static String toString(JobID jid) { + return _join(JOB, jid.appID.clusterTimeStamp, jid.appID.id, jid.id); + } + + public static JobID toJobID(String jid) { + Iterator it = _split(jid).iterator(); + return toJobID(JOB, jid, it); + } + + // mostly useful for parsing task/attempt id like strings + public static JobID toJobID(String prefix, String s, Iterator it) { + ApplicationID appID = toAppID(prefix, s, it); + shouldHaveNext(prefix, s, it); + JobID jobID = new JobID(); + jobID.appID = appID; + jobID.id = Integer.parseInt(it.next()); + return jobID; + } + + public static String toString(TaskID tid) { + return _join("task", tid.jobID.appID.clusterTimeStamp, tid.jobID.appID.id, + tid.jobID.id, taskSymbol(tid.taskType), tid.id); + } + + public static TaskID toTaskID(String tid) { + Iterator it = _split(tid).iterator(); + return toTaskID(TASK, tid, it); + } + + public static TaskID toTaskID(String prefix, String s, Iterator it) { + JobID jid = toJobID(prefix, s, it); + shouldHaveNext(prefix, s, it); + TaskID tid = new TaskID(); + tid.jobID = jid; + 
tid.taskType = taskType(it.next()); + shouldHaveNext(prefix, s, it); + tid.id = Integer.parseInt(it.next()); + return tid; + } + + public static String toString(TaskAttemptID taid) { + return _join("attempt", taid.taskID.jobID.appID.clusterTimeStamp, + taid.taskID.jobID.appID.id, taid.taskID.jobID.id, + taskSymbol(taid.taskID.taskType), taid.taskID.id, taid.id); + } + + public static TaskAttemptID toTaskAttemptID(String taid) { + Iterator it = _split(taid).iterator(); + TaskID tid = toTaskID(ATTEMPT, taid, it); + shouldHaveNext(ATTEMPT, taid, it); + TaskAttemptID taID = new TaskAttemptID(); + taID.taskID = tid; + taID.id = Integer.parseInt(it.next()); + return taID; + } + + public static String taskSymbol(TaskType type) { + switch (type) { + case MAP: return "m"; + case REDUCE: return "r"; + } + throw new YarnException("Unknown task type: "+ type.toString()); + } + + public static TaskType taskType(String symbol) { + // JDK 7 supports switch on strings + if (symbol.equals("m")) return TaskType.MAP; + if (symbol.equals("r")) return TaskType.REDUCE; + throw new YarnException("Unknown task symbol: "+ symbol); + } + + public static void setInitialClasspath( + Map environment) throws IOException { + + // Get yarn mapreduce-app classpath from generated classpath + // Works if compile time env is same as runtime. For e.g. tests. + InputStream classpathFileStream = + Thread.currentThread().getContextClassLoader() + .getResourceAsStream("mrapp-generated-classpath"); + BufferedReader reader = + new BufferedReader(new InputStreamReader(classpathFileStream)); + addToClassPath(environment, reader.readLine().trim()); + + // If runtime env is different. 
+ if (System.getenv().get("YARN_HOME") != null) { + ShellCommandExecutor exec = + new ShellCommandExecutor(new String[] { + System.getenv().get("YARN_HOME") + "/bin/yarn", + "classpath" }); + exec.execute(); + addToClassPath(environment, exec.getOutput().trim()); + } + + // Get yarn mapreduce-app classpath + if (System.getenv().get("HADOOP_MAPRED_HOME")!= null) { + ShellCommandExecutor exec = + new ShellCommandExecutor(new String[] { + System.getenv().get("HADOOP_MAPRED_HOME") + "/bin/mapred", + "classpath" }); + exec.execute(); + addToClassPath(environment, exec.getOutput().trim()); + } + + // TODO: Remove duplicates. + } + + public static void addToClassPath( + Map environment, String fileName) { + CharSequence classpath = environment.get(CLASSPATH); + if (classpath == null) { + classpath = fileName; + } else { + classpath = classpath + ":" + fileName; + } + environment.put(CLASSPATH, classpath); + } + + public static final String CLASSPATH = "CLASSPATH"; +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,102 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.util; + +import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.ApplicationID; +import org.apache.hadoop.mapreduce.v2.api.JobID; +import org.apache.hadoop.mapreduce.v2.api.TaskID; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID; +import org.apache.hadoop.mapreduce.v2.api.TaskType; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class TestMRApps { + + @Test public void testJobIDtoString() { + JobID jid = new JobID(); + jid.appID = new ApplicationID(); + assertEquals("job_0_0_0", MRApps.toString(jid)); + } + + @Test public void testToJobID() { + JobID jid = MRApps.toJobID("job_1_1_1"); + assertEquals(1, jid.appID.clusterTimeStamp); + assertEquals(1, jid.appID.id); + assertEquals(1, jid.id); + } + + @Test(expected=YarnException.class) public void testJobIDShort() { + MRApps.toJobID("job_0_0"); + } + + @Test public void testTaskIDtoString() { + TaskID tid = new TaskID(); + tid.jobID = new JobID(); + tid.jobID.appID = new ApplicationID(); + tid.taskType = TaskType.MAP; + assertEquals("task_0_0_0_m_0", MRApps.toString(tid)); + tid.taskType = TaskType.REDUCE; + assertEquals("task_0_0_0_r_0", MRApps.toString(tid)); + } + + @Test public void testToTaskID() { + TaskID tid = MRApps.toTaskID("task_1_2_3_r_4"); + assertEquals(1, tid.jobID.appID.clusterTimeStamp); + 
assertEquals(2, tid.jobID.appID.id); + assertEquals(3, tid.jobID.id); + assertEquals(TaskType.REDUCE, tid.taskType); + assertEquals(4, tid.id); + + tid = MRApps.toTaskID("task_1_2_3_m_4"); + assertEquals(TaskType.MAP, tid.taskType); + } + + @Test(expected=YarnException.class) public void testTaskIDShort() { + MRApps.toTaskID("task_0_0_0_m"); + } + + @Test(expected=YarnException.class) public void testTaskIDBadType() { + MRApps.toTaskID("task_0_0_0_x_0"); + } + + @Test public void testTaskAttemptIDtoString() { + TaskAttemptID taid = new TaskAttemptID(); + taid.taskID = new TaskID(); + taid.taskID.taskType = TaskType.MAP; + taid.taskID.jobID = new JobID(); + taid.taskID.jobID.appID = new ApplicationID(); + assertEquals("attempt_0_0_0_m_0_0", MRApps.toString(taid)); + } + + @Test public void testToTaskAttemptID() { + TaskAttemptID taid = MRApps.toTaskAttemptID("attempt_0_1_2_m_3_4"); + assertEquals(0, taid.taskID.jobID.appID.clusterTimeStamp); + assertEquals(1, taid.taskID.jobID.appID.id); + assertEquals(2, taid.taskID.jobID.id); + assertEquals(3, taid.taskID.id); + assertEquals(4, taid.id); + } + + @Test(expected=YarnException.class) public void testTaskAttemptIDShort() { + MRApps.toTaskAttemptID("attempt_0_0_0_m_0"); + } +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml Thu Mar 17 20:21:13 2011 @@ -0,0 +1,36 @@ + + + hadoop-mapreduce-client + org.apache.hadoop + ${yarn.version} + + 4.0.0 + org.apache.hadoop + hadoop-mapreduce-client-core + hadoop-mapreduce-client + ${yarn.version} + http://maven.apache.org + + + 1.0-SNAPSHOT + + + + + + 
org.apache.avro + avro-maven-plugin + 1.4.0-SNAPSHOT + + + generate-sources + + compile + + + + + + + + Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Thu Mar 17 20:21:13 2011 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr:713112 +/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/jobhistory/Events.avpr:776175-785643 Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml Thu Mar 17 20:21:13 2011 @@ -0,0 +1,29 @@ + + + + + + default + + + running + + + + \ No newline at end of file Copied: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java (from r1082666, hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/filecache/DistributedCache.java) URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java?p2=hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java&p1=hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/filecache/DistributedCache.java&r1=1082666&r2=1082677&rev=1082677&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/filecache/DistributedCache.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java Thu Mar 17 20:21:13 2011 @@ -24,9 +24,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager; -import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager; - /** * Distribute application-specific large, read-only files efficiently. 
* Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Thu Mar 17 20:21:13 2011 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/filecache/DistributedCache.java:713112 +/hadoop/core/trunk/src/mapred/org/apache/hadoop/filecache/DistributedCache.java:776175-785643 Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/package-info.java ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Thu Mar 17 20:21:13 2011 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/filecache/package-info.java:713112 +/hadoop/core/trunk/src/mapred/org/apache/hadoop/filecache/package-info.java:776175-785643 Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AuditLogger.java ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Thu Mar 17 20:21:13 2011 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/AuditLogger.java:713112 +/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/AuditLogger.java:776175-785643 Copied: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java (from r1082666, hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/BackupStore.java) URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java?p2=hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java&p1=hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/BackupStore.java&r1=1082666&r2=1082677&rev=1082677&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/BackupStore.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java Thu Mar 17 20:21:13 2011 @@ -560,7 +560,7 @@ public class BackupStore { private Writer createSpillFile() throws IOException { Path tmp = - new Path(TaskTracker.OUTPUT + "/backup_" + tid.getId() + "_" + new Path(Constants.OUTPUT + "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out"); LOG.info("Created file: " + tmp); Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Thu Mar 17 20:21:13 2011 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/BackupStore.java:713112 +/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java:776175-785643 Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Thu Mar 17 20:21:13 2011 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java:713112 
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BasicTypeSorterBase.java:776175-785643 Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BufferSorter.java ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Thu Mar 17 20:21:13 2011 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/BufferSorter.java:713112 +/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BufferSorter.java:776175-785643 Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/CleanupQueue.java ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Thu Mar 17 20:21:13 2011 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/CleanupQueue.java:713112 +/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java:776175-785643 Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Clock.java ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Thu Mar 17 20:21:13 2011 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/Clock.java:713112 +/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Clock.java:776175-785643