hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1082677 [9/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapreduc...
Date Thu, 17 Mar 2011 20:21:54 GMT
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,417 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.AMRMProtocol;
+import org.apache.hadoop.yarn.AMResponse;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.junit.Test;
+
+public class TestRMContainerAllocator {
+  private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
+
+  @Test
+  public void testSimple() throws Exception {
+    FifoScheduler scheduler = createScheduler();
+    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
+        scheduler);
+
+    //add resources to scheduler
+    NodeInfo nodeManager1 = addNode(scheduler, "h1", 10240);
+    NodeInfo nodeManager2 = addNode(scheduler, "h2", 10240);
+    NodeInfo nodeManager3 = addNode(scheduler, "h3", 10240);
+
+    //create the container request
+    ContainerRequestEvent event1 = 
+      createReq(1, 1024, 1, new String[]{"h1"});
+    allocator.sendRequest(event1);
+
+    //send 1 more request with different resource req
+    ContainerRequestEvent event2 = createReq(2, 1024, 1, new String[]{"h2"});
+    allocator.sendRequest(event2);
+
+    //this tells the scheduler about the requests
+    //as nodes are not added, no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    //send another request with different resource and priority
+    ContainerRequestEvent event3 = createReq(3, 1024, 1, new String[]{"h3"});
+    allocator.sendRequest(event3);
+
+    //this tells the scheduler about the requests
+    //as nodes are not added, no allocations
+    assigned = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    //update resources in scheduler
+    scheduler.nodeUpdate(nodeManager1, null); // Node heartbeat
+    scheduler.nodeUpdate(nodeManager2, null); // Node heartbeat
+    scheduler.nodeUpdate(nodeManager3, null); // Node heartbeat
+
+
+    assigned = allocator.schedule();
+    checkAssignments(
+        new ContainerRequestEvent[]{event1, event2, event3}, assigned, false);
+  }
+
+  //TODO: Currently Scheduler seems to have bug where it does not work
+  //for Application asking for containers with different capabilities.
+  //@Test
+  public void testResource() throws Exception {
+    FifoScheduler scheduler = createScheduler();
+    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
+        scheduler);
+
+    //add resources to scheduler
+    NodeInfo nodeManager1 = addNode(scheduler, "h1", 10240);
+    NodeInfo nodeManager2 = addNode(scheduler, "h2", 10240);
+    NodeInfo nodeManager3 = addNode(scheduler, "h3", 10240);
+
+    //create the container request
+    ContainerRequestEvent event1 = 
+      createReq(1, 1024, 1, new String[]{"h1"});
+    allocator.sendRequest(event1);
+
+    //send 1 more request with different resource req
+    ContainerRequestEvent event2 = createReq(2, 2048, 1, new String[]{"h2"});
+    allocator.sendRequest(event2);
+
+    //this tells the scheduler about the requests
+    //as nodes are not added, no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    //update resources in scheduler
+    scheduler.nodeUpdate(nodeManager1, null); // Node heartbeat
+    scheduler.nodeUpdate(nodeManager2, null); // Node heartbeat
+    scheduler.nodeUpdate(nodeManager3, null); // Node heartbeat
+
+    assigned = allocator.schedule();
+    checkAssignments(
+        new ContainerRequestEvent[]{event1, event2}, assigned, false);
+  }
+
+  @Test
+  public void testPriority() throws Exception {
+    FifoScheduler scheduler = createScheduler();
+    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
+        scheduler);
+
+    //add resources to scheduler
+    NodeInfo nodeManager1 = addNode(scheduler, "h1", 1024);
+    NodeInfo nodeManager2 = addNode(scheduler, "h2", 10240);
+    NodeInfo nodeManager3 = addNode(scheduler, "h3", 10240);
+
+    //create the container request
+    ContainerRequestEvent event1 = 
+      createReq(1, 2048, 1, new String[]{"h1", "h2"});
+    allocator.sendRequest(event1);
+
+    //send 1 more request with different priority
+    ContainerRequestEvent event2 = createReq(2, 2048, 2, new String[]{"h1"});
+    allocator.sendRequest(event2);
+
+    //send 1 more request with different priority
+    ContainerRequestEvent event3 = createReq(3, 2048, 3, new String[]{"h3"});
+    allocator.sendRequest(event3);
+
+    //this tells the scheduler about the requests
+    //as nodes are not added, no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    //update resources in scheduler
+    scheduler.nodeUpdate(nodeManager1, null); // Node heartbeat
+    scheduler.nodeUpdate(nodeManager2, null); // Node heartbeat
+    scheduler.nodeUpdate(nodeManager3, null); // Node heartbeat
+
+    assigned = allocator.schedule();
+    checkAssignments(
+        new ContainerRequestEvent[]{event1, event2, event3}, assigned, false);
+
+    //validate that no container is assigned to h1 as it doesn't have 2048
+    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+      Assert.assertFalse("Assigned count not correct", 
+          "h1".equals(assig.getContainerManagerAddress()));
+    }
+  }
+
+
+
+  private NodeInfo addNode(FifoScheduler scheduler, 
+      String nodeName, int memory) {
+    NodeID nodeId = new NodeID();
+    nodeId.id = 0;
+    Resource resource = new Resource();
+    resource.memory = memory;
+    NodeInfo nodeManager = scheduler.addNode(nodeId, nodeName,
+        RMResourceTrackerImpl.resolve(nodeName), resource); // Node registration
+    return nodeManager;
+  }
+
+  private FifoScheduler createScheduler() throws AvroRemoteException {
+    FifoScheduler fsc = new FifoScheduler(new Configuration(),
+        new ContainerTokenSecretManager()) {
+      //override this to copy the objects
+      //otherwise FifoScheduler updates the numContainers in same objects as kept by
+      //RMContainerAllocator
+      @Override
+      public synchronized List<Container> allocate(ApplicationID applicationId,
+          List<ResourceRequest> ask, List<Container> release) 
+          throws IOException {
+        List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+        for (ResourceRequest req : ask) {
+          ResourceRequest reqCopy = new ResourceRequest();
+          reqCopy.priority = req.priority;
+          reqCopy.hostName = req.hostName;
+          reqCopy.capability = req.capability;
+          reqCopy.numContainers = req.numContainers;
+          askCopy.add(reqCopy);
+        }
+        //no need to copy release
+        return super.allocate(applicationId, askCopy, release);
+      }
+    };
+    try {
+      fsc.addApplication(new ApplicationID(), "test", null, null);
+    } catch(IOException ie) {
+      LOG.info("add application failed with ", ie);
+      assert(false);
+    }
+    return fsc;
+  }
+
+  private ContainerRequestEvent createReq(
+      int attemptid, int memory, int priority, String[] hosts) {
+    TaskAttemptID attemptId = new TaskAttemptID();
+    attemptId.id = attemptid;
+    Resource containerNeed = new Resource();
+    containerNeed.memory = memory;
+    return new ContainerRequestEvent(attemptId, 
+        containerNeed, priority,
+        hosts, new String[] {NetworkTopology.DEFAULT_RACK});
+  }
+
+  private void checkAssignments(ContainerRequestEvent[] requests, 
+      List<TaskAttemptContainerAssignedEvent> assignments, 
+      boolean checkHostMatch) {
+    Assert.assertNotNull("Container not assigned", assignments);
+    Assert.assertEquals("Assigned count not correct", 
+        requests.length, assignments.size());
+
+    //check for uniqueness of containerIDs
+    Set<ContainerID> containerIds = new HashSet<ContainerID>();
+    for (TaskAttemptContainerAssignedEvent assigned : assignments) {
+      containerIds.add(assigned.getContainerID());
+    }
+    Assert.assertEquals("Assigned containers must be different", 
+        assignments.size(), containerIds.size());
+
+    //check for all assignment
+    for (ContainerRequestEvent req : requests) {
+      TaskAttemptContainerAssignedEvent assigned = null;
+      for (TaskAttemptContainerAssignedEvent ass : assignments) {
+        if (ass.getTaskAttemptID().equals(req.getAttemptID())){
+          assigned = ass;
+          break;
+        }
+      }
+      checkAssignment(req, assigned, checkHostMatch);
+    }
+  }
+
+  private void checkAssignment(ContainerRequestEvent request, 
+      TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
+    Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(),
+        assigned);
+    Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
+        assigned.getTaskAttemptID());
+    if (checkHostMatch) {
+      Assert.assertTrue("Not assigned to requested host", Arrays.asList
+          (request.getHosts()).contains(assigned.getContainerManagerAddress()));
+    }
+
+  }
+
+  //Mock RMContainerAllocator
+  //Instead of talking to remote Scheduler,uses the local Scheduler
+  public static class LocalRMContainerAllocator extends RMContainerAllocator {
+    private static final List<TaskAttemptContainerAssignedEvent> events = 
+      new ArrayList<TaskAttemptContainerAssignedEvent>();
+
+    public static class AMRMProtocolImpl implements AMRMProtocol {
+
+      private ResourceScheduler resourceScheduler;
+
+      public AMRMProtocolImpl(ResourceScheduler resourceScheduler) {
+        this.resourceScheduler = resourceScheduler;
+      }
+
+      @Override
+      public Void registerApplicationMaster(
+          ApplicationMaster applicationMaster) throws AvroRemoteException {
+        return null;
+      }
+
+      @Override
+      public AMResponse allocate(ApplicationStatus status,
+          List<ResourceRequest> ask, List<Container> release)
+          throws AvroRemoteException {
+        try {
+          AMResponse response = new AMResponse();
+          response.containers = resourceScheduler.allocate(status.applicationId, ask, release);
+          return response;
+        } catch(IOException ie) {
+          throw RPCUtil.getRemoteException(ie);
+        }
+      }
+
+      @Override
+      public Void finishApplicationMaster(ApplicationMaster applicationMaster)
+      throws AvroRemoteException {
+        // TODO Auto-generated method stub
+        return null;
+      }
+
+    }
+
+    private ResourceScheduler scheduler;
+    LocalRMContainerAllocator(ResourceScheduler scheduler) {
+      super(null, new TestContext(events));
+      this.scheduler = scheduler;
+      super.init(new Configuration());
+      super.start();
+    }
+
+    protected AMRMProtocol createSchedulerProxy() {
+      return new AMRMProtocolImpl(scheduler);
+    }
+
+    @Override
+    protected void register() {}
+    @Override
+    protected void unregister() {}
+
+    public void sendRequest(ContainerRequestEvent req) {
+      sendRequests(Arrays.asList(new ContainerRequestEvent[]{req}));
+    }
+
+    public void sendRequests(List<ContainerRequestEvent> reqs) {
+      for (ContainerRequestEvent req : reqs) {
+        handle(req);
+      }
+    }
+
+    //API to be used by tests
+    public List<TaskAttemptContainerAssignedEvent> schedule() {
+      //run the scheduler
+      try {
+        allocate();
+      } catch (Exception e) {
+
+      }
+
+      List<TaskAttemptContainerAssignedEvent> result = new ArrayList(events);
+      events.clear();
+      return result;
+    }
+
+    protected void startAllocatorThread() {
+      //override to NOT start thread
+    }
+
+    static class TestContext implements AppContext {
+      private List<TaskAttemptContainerAssignedEvent> events;
+      TestContext(List<TaskAttemptContainerAssignedEvent> events) {
+        this.events = events;
+      }
+      @Override
+      public Map<JobID, Job> getAllJobs() {
+        return null;
+      }
+      @Override
+      public ApplicationID getApplicationID() {
+        return new ApplicationID();
+      }
+      @Override
+      public EventHandler getEventHandler() {
+        return new EventHandler() {
+          @Override
+          public void handle(Event event) {
+            events.add((TaskAttemptContainerAssignedEvent) event);
+          }
+        };
+      }
+      @Override
+      public Job getJob(JobID jobID) {
+        return null;
+      }
+
+      @Override
+      public CharSequence getUser() {
+        return null;
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestRMContainerAllocator t = new TestRMContainerAllocator();
+    t.testSimple();
+    //t.testResource();
+    t.testPriority();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,757 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.Clock;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRuntimeEstimators {
+
+  private static int INITIAL_NUMBER_FREE_SLOTS = 300;
+  private static int MAP_SLOT_REQUIREMENT = 3;
+  // this has to be at least as much as map slot requirement
+  private static int REDUCE_SLOT_REQUIREMENT = 4;
+  private static int MAP_TASKS = 200;
+  private static int REDUCE_TASKS = 150;
+
+  private Queue<TaskEvent> taskEvents;
+
+  Clock clock;
+
+  Job myJob;
+
+  AppContext myAppContext;
+
+  private static final Log LOG = LogFactory.getLog(TestRuntimeEstimators.class);
+
+  private final AtomicInteger slotsInUse = new AtomicInteger(0);
+
+  Dispatcher dispatcher;
+
+  DefaultSpeculator speculator;
+
+  TaskRuntimeEstimator estimator;
+
+  // This is a huge kluge.  The real implementations have a decent approach
+  private final AtomicInteger completedMaps = new AtomicInteger(0);
+  private final AtomicInteger completedReduces = new AtomicInteger(0);
+
+  private final AtomicInteger successfulSpeculations
+      = new AtomicInteger(0);
+  private final AtomicLong taskTimeSavedBySpeculation
+      = new AtomicLong(0L);
+
+  private void coreTestEstimator
+      (TaskRuntimeEstimator testedEstimator, int expectedSpeculations) {
+    estimator = testedEstimator;
+    taskEvents = new ConcurrentLinkedQueue<TaskEvent>();
+    myJob = null;
+    slotsInUse.set(0);
+    completedMaps.set(0);
+    completedReduces.set(0);
+    successfulSpeculations.set(0);
+    taskTimeSavedBySpeculation.set(0);
+
+    ((MockClock)clock).advanceTime(1000);
+
+    Configuration conf = new Configuration();
+
+    myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
+    myJob = myAppContext.getAllJobs().values().iterator().next();
+
+    estimator.contextualize(conf, myAppContext);
+
+    speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
+
+    dispatcher.register(Speculator.EventType.class, speculator);
+
+    dispatcher.register(TaskEventType.class, new SpeculationRequestEventHandler());
+
+    ((AsyncDispatcher)dispatcher).init(conf);
+    ((AsyncDispatcher)dispatcher).start();
+
+
+
+    speculator.init(conf);
+    speculator.start();
+
+    // Now that the plumbing is hooked up, we do the following:
+    //  do until all tasks are finished, ...
+    //  1: If we have spare capacity, assign as many map tasks as we can, then
+    //     assign as many reduce tasks as we can.  Note that an odd reduce
+    //     task might be started while there are still map tasks, because
+    //     map tasks take 3 slots and reduce tasks 2 slots.
+    //  2: Send a speculation event for every task attempt that's running
+    //  note that new attempts might get started by the speculator
+
+    // discover undone tasks
+    int undoneMaps = MAP_TASKS;
+    int undoneReduces = REDUCE_TASKS;
+
+    // build a task sequence where all the maps precede any of the reduces
+    List<Task> allTasksSequence = new LinkedList<Task>();
+
+    allTasksSequence.addAll(myJob.getTasks(TaskType.MAP).values());
+    allTasksSequence.addAll(myJob.getTasks(TaskType.REDUCE).values());
+
+    while (undoneMaps + undoneReduces > 0) {
+      undoneMaps = 0; undoneReduces = 0;
+      // start all attempts which are new but for which there is enough slots
+      for (Task task : allTasksSequence) {
+        if (!task.isFinished()) {
+          if (task.getType() == TaskType.MAP) {
+            ++undoneMaps;
+          } else {
+            ++undoneReduces;
+          }
+        }
+        for (TaskAttempt attempt : task.getAttempts().values()) {
+          if (attempt.getState() == TaskAttemptState.NEW
+              && INITIAL_NUMBER_FREE_SLOTS - slotsInUse.get()
+                    >= taskTypeSlots(task.getType())) {
+            MyTaskAttemptImpl attemptImpl = (MyTaskAttemptImpl)attempt;
+            SpeculatorEvent event
+                = new SpeculatorEvent(attempt.getID(), false, clock.getTime());
+            speculator.handle(event);
+            attemptImpl.startUp();
+          } else {
+            // If a task attempt is in progress we should send the news to
+            // the Speculator.
+            TaskAttemptStatus status = new TaskAttemptStatus();
+            status.id = attempt.getID();
+            status.progress = attempt.getProgress();
+            status.stateString = attempt.getState().name();
+            SpeculatorEvent event = new SpeculatorEvent(status, clock.getTime());
+            speculator.handle(event);
+          }
+        }
+      }
+
+      long startTime = System.currentTimeMillis();
+
+      // drain the speculator event queue
+      while (!speculator.eventQueueEmpty()) {
+        Thread.yield();
+        if (System.currentTimeMillis() > startTime + 130000) {
+          return;
+        }
+      }
+
+      ((MockClock) clock).advanceTime(1000L);
+
+      if (clock.getTime() % 10000L == 0L) {
+        speculator.scanForSpeculations();
+      }
+    }
+
+    Assert.assertEquals("We got the wrong number of successful speculations.",
+        expectedSpeculations, successfulSpeculations.get());
+  }
+
+  @Test
+  public void testLegacyEstimator() throws Exception {
+    clock = new MockClock();
+    TaskRuntimeEstimator specificEstimator = new LegacyTaskRuntimeEstimator();
+    Configuration conf = new Configuration();
+    dispatcher = new AsyncDispatcher();
+    myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
+
+    coreTestEstimator(specificEstimator, 3);
+  }
+
+  @Test
+  public void testExponentialEstimator() throws Exception {
+    clock = new MockClock();
+    TaskRuntimeEstimator specificEstimator
+        = new ExponentiallySmoothedTaskRuntimeEstimator();
+    Configuration conf = new Configuration();
+    dispatcher = new AsyncDispatcher();
+    myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
+
+    coreTestEstimator(new LegacyTaskRuntimeEstimator(), 3);
+  }
+
+  int taskTypeSlots(TaskType type) {
+    return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
+  }
+
+  private boolean jobComplete() {
+    for (Task task : myJob.getTasks().values()) {
+      if (!task.isFinished()) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private int slotsInUse(int mapSize, int reduceSize) {
+    return slotsInUse.get();
+  }
+
+  class SpeculationRequestEventHandler implements EventHandler<TaskEvent> {
+
+    @Override
+    public void handle(TaskEvent event) {
+      TaskID taskID = event.getTaskID();
+      Task task = myJob.getTask(taskID);
+
+      Assert.assertEquals
+          ("Wrong type event", TaskEventType.T_ADD_SPEC_ATTEMPT, event.getType());
+
+      System.out.println("SpeculationRequestEventHandler.handle adds a speculation task for " + taskID);
+
+      addAttempt(task);
+    }
+  }
+
+  void addAttempt(Task task) {
+    MyTaskImpl myTask = (MyTaskImpl) task;
+
+    myTask.addAttempt();
+  }
+
+  class MyTaskImpl implements Task {
+    private final TaskID taskID;
+    private final Map<TaskAttemptID, TaskAttempt> attempts
+        = new HashMap<TaskAttemptID, TaskAttempt>(4);
+
+    MyTaskImpl(JobID jobID, int index, TaskType type) {
+      taskID = new TaskID();
+      taskID.id = index;
+      taskID.taskType = type;
+      taskID.jobID = jobID;
+    }
+
+    void addAttempt() {
+      TaskAttempt taskAttempt
+          = new MyTaskAttemptImpl(taskID, attempts.size(), clock);
+      TaskAttemptID taskAttemptID = taskAttempt.getID();
+
+      attempts.put(taskAttemptID, taskAttempt);
+
+      System.out.println("TLTRE.MyTaskImpl.addAttempt " + getID());
+
+      SpeculatorEvent event = new SpeculatorEvent(taskID, +1);
+      dispatcher.getEventHandler().handle(event);
+    }
+
+    @Override
+    public TaskID getID() {
+      return taskID;
+    }
+
+    @Override
+    public TaskReport getReport() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Counters getCounters() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public float getProgress() {
+      float result = 0.0F;
+
+
+     for (TaskAttempt attempt : attempts.values()) {
+       result = Math.max(result, attempt.getProgress());
+     }
+
+     return result;
+    }
+
+    @Override
+    public TaskType getType() {
+      return taskID.taskType;
+    }
+
+    @Override
+    public Map<TaskAttemptID, TaskAttempt> getAttempts() {
+      Map<TaskAttemptID, TaskAttempt> result
+          = new HashMap<TaskAttemptID, TaskAttempt>(attempts.size());
+      result.putAll(attempts);
+      return result;
+    }
+
+    @Override
+    public TaskAttempt getAttempt(TaskAttemptID attemptID) {
+      return attempts.get(attemptID);
+    }
+
+    @Override
+    public boolean isFinished() {
+      for (TaskAttempt attempt : attempts.values()) {
+        if (attempt.getState() == TaskAttemptState.SUCCEEDED) {
+          return true;
+        }
+      }
+
+      return false;
+    }
+
+    @Override
+    public boolean canCommit(TaskAttemptID taskAttemptID) {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public TaskState getState() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+  }
+
+  class MyJobImpl implements Job {
+    private final JobID jobID;
+    private final Map<TaskID, Task> allTasks = new HashMap<TaskID, Task>();
+    private final Map<TaskID, Task> mapTasks = new HashMap<TaskID, Task>();
+    private final Map<TaskID, Task> reduceTasks = new HashMap<TaskID, Task>();
+
+    MyJobImpl(JobID jobID, int numMaps, int numReduces) {
+      this.jobID = jobID;
+      for (int i = 0; i < numMaps; ++i) {
+        Task newTask = new MyTaskImpl(jobID, i, TaskType.MAP);
+        mapTasks.put(newTask.getID(), newTask);
+        allTasks.put(newTask.getID(), newTask);
+      }
+      for (int i = 0; i < numReduces; ++i) {
+        Task newTask = new MyTaskImpl(jobID, i, TaskType.REDUCE);
+        reduceTasks.put(newTask.getID(), newTask);
+        allTasks.put(newTask.getID(), newTask);
+      }
+
+      // give every task an attempt
+      for (Task task : allTasks.values()) {
+        MyTaskImpl myTaskImpl = (MyTaskImpl) task;
+        myTaskImpl.addAttempt();
+      }
+    }
+
+    @Override
+    public JobID getID() {
+      return jobID;
+    }
+
+    @Override
+    public JobState getState() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public JobReport getReport() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Counters getCounters() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Map<TaskID, Task> getTasks() {
+      return allTasks;
+    }
+
+    @Override
+    public Map<TaskID, Task> getTasks(TaskType taskType) {
+      return taskType == TaskType.MAP ? mapTasks : reduceTasks;
+    }
+
+    @Override
+    public Task getTask(TaskID taskID) {
+      return allTasks.get(taskID);
+    }
+
+    @Override
+    public List<String> getDiagnostics() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int getCompletedMaps() {
+      return completedMaps.get();
+    }
+
+    @Override
+    public int getCompletedReduces() {
+      return completedReduces.get();
+    }
+
+    @Override
+    public TaskAttemptCompletionEvent[]
+            getTaskAttemptCompletionEvents(int fromEventId, int maxEvents) {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public CharSequence getName() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int getTotalMaps() {
+      return mapTasks.size();
+    }
+
+    @Override
+    public int getTotalReduces() {
+      return reduceTasks.size();
+    }
+  }
+
+  /*
+   * We follow the pattern of the real XxxImpl .  We create a job and initialize
+   * it with a full suite of tasks which in turn have one attempt each in the
+   * NEW state.  Attempts transition only from NEW to RUNNING to SUCCEEDED .
+   */
+  class MyTaskAttemptImpl implements TaskAttempt {
+    private final TaskAttemptID myAttemptID;
+
+    long startMockTime = Long.MIN_VALUE;
+
+    long shuffleCompletedTime = Long.MAX_VALUE;
+
+    TaskAttemptState overridingState = TaskAttemptState.NEW;
+
+    MyTaskAttemptImpl(TaskID taskID, int index, Clock clock) {
+      myAttemptID = new TaskAttemptID();
+      myAttemptID.id = index;
+      myAttemptID.taskID = taskID;
+    }
+
+    void startUp() {
+      startMockTime = clock.getTime();
+      overridingState = null;
+
+      slotsInUse.addAndGet(taskTypeSlots(myAttemptID.taskID.taskType));
+
+      System.out.println("TLTRE.MyTaskAttemptImpl.startUp starting " + getID());
+
+      SpeculatorEvent event = new SpeculatorEvent(getID().taskID, -1);
+      dispatcher.getEventHandler().handle(event);
+    }
+
+    @Override
+    public TaskAttemptID getID() {
+      return myAttemptID;
+    }
+
+    @Override
+    public TaskAttemptReport getReport() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public List<CharSequence> getDiagnostics() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Counters getCounters() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    private float getCodeRuntime() {
+      int taskIndex = myAttemptID.taskID.id;
+      int attemptIndex = myAttemptID.id;
+
+      float result = 200.0F;
+
+      switch (taskIndex % 4) {
+        case 0:
+          if (taskIndex % 40 == 0 && attemptIndex == 0) {
+            result = 600.0F;
+            break;
+          }
+
+          break;
+        case 2:
+          break;
+
+        case 1:
+          result = 150.0F;
+          break;
+
+        case 3:
+          result = 250.0F;
+          break;
+      }
+
+      return result;
+    }
+
+    private float getMapProgress() {
+      float runtime = getCodeRuntime();
+
+      return Math.min
+          ((float) (clock.getTime() - startMockTime) / (runtime * 1000.0F), 1.0F);
+    }
+
+    private float getReduceProgress() {
+      Job job = myAppContext.getJob(myAttemptID.taskID.jobID);
+      float runtime = getCodeRuntime();
+
+      Collection<Task> allMapTasks = job.getTasks(TaskType.MAP).values();
+
+      int numberMaps = allMapTasks.size();
+      int numberDoneMaps = 0;
+
+      for (Task mapTask : allMapTasks) {
+        if (mapTask.isFinished()) {
+          ++numberDoneMaps;
+        }
+      }
+
+      if (numberMaps == numberDoneMaps) {
+        shuffleCompletedTime = Math.min(shuffleCompletedTime, clock.getTime());
+
+        return Math.min
+            ((float) (clock.getTime() - shuffleCompletedTime)
+                        / (runtime * 2000.0F) + 0.5F,
+             1.0F);
+      } else {
+        return ((float) numberDoneMaps) / numberMaps * 0.5F;
+      }
+    }
+
+    // we compute progress from time and an algorithm now
+    @Override
+    public float getProgress() {
+      if (overridingState == TaskAttemptState.NEW) {
+        return 0.0F;
+      }
+      return myAttemptID.taskID.taskType == TaskType.MAP ? getMapProgress() : getReduceProgress();
+    }
+
+    @Override
+    public TaskAttemptState getState() {
+      if (overridingState != null) {
+        return overridingState;
+      }
+      TaskAttemptState result
+          = getProgress() < 1.0F ? TaskAttemptState.RUNNING : TaskAttemptState.SUCCEEDED;
+
+      if (result == TaskAttemptState.SUCCEEDED) {
+        overridingState = TaskAttemptState.SUCCEEDED;
+
+        System.out.println("MyTaskAttemptImpl.getState() -- attempt " + myAttemptID + " finished.");
+
+        slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.taskID.taskType));
+
+        (myAttemptID.taskID.taskType == TaskType.MAP
+            ? completedMaps : completedReduces).getAndIncrement();
+
+        // check for a spectacularly successful speculation
+        TaskID taskID = myAttemptID.taskID;
+        Task undoneTask = null;
+
+        Task task = myJob.getTask(taskID);
+
+        for (TaskAttempt otherAttempt : task.getAttempts().values()) {
+          if (otherAttempt != this
+              && otherAttempt.getState() == TaskAttemptState.RUNNING) {
+            // we had two instances running.  Try to determine how much
+            //  we might have saved by speculation
+            if (getID().id > otherAttempt.getID().id) {
+              // the speculation won
+              successfulSpeculations.getAndIncrement();
+              float hisProgress = otherAttempt.getProgress();
+              long hisStartTime = ((MyTaskAttemptImpl)otherAttempt).startMockTime;
+              System.out.println("TLTRE:A speculation finished at time "
+                  + clock.getTime()
+                  + ".  The stalled attempt is at " + (hisProgress * 100.0)
+                  + "% progress, and it started at "
+                  + hisStartTime + ", which is "
+                  + (clock.getTime() - hisStartTime) + " ago.");
+              long originalTaskEndEstimate
+                  = (hisStartTime
+                      + estimator.estimatedRuntime(otherAttempt.getID()));
+              System.out.println(
+                  "TLTRE: We would have expected the original attempt to take "
+                  + estimator.estimatedRuntime(otherAttempt.getID())
+                  + ", finishing at " + originalTaskEndEstimate);
+              long estimatedSavings = originalTaskEndEstimate - clock.getTime();
+              taskTimeSavedBySpeculation.addAndGet(estimatedSavings);
+              System.out.println("TLTRE: The task is " + task.getID());
+              slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.taskID.taskType));
+              ((MyTaskAttemptImpl)otherAttempt).overridingState
+                  = TaskAttemptState.KILLED;
+            } else {
+              System.out.println(
+                  "TLTRE: The normal attempt beat the speculation in "
+                  + task.getID());
+            }
+          }
+        }
+      }
+
+      return result;
+    }
+
+    @Override
+    public boolean isFinished() {
+      return getProgress() == 1.0F;
+    }
+
+    @Override
+    public ContainerID getAssignedContainerID() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public String getAssignedContainerMgrAddress() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public long getLaunchTime() {
+      return startMockTime;
+    }
+
+    @Override
+    public long getFinishTime() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+
+  static class MockClock extends Clock {
+    private long currentTime = 0;
+
+    @Override
+    long getMeasuredTime() {
+      return currentTime;
+    }
+
+    void setMeasuredTime(long newTime) {
+      currentTime = newTime;
+    }
+
+    void advanceTime(long increment) {
+      currentTime += increment;
+    }
+  }
+
+  class MyAppMaster extends CompositeService {
+    final Clock clock;
+      public MyAppMaster(Clock clock) {
+        super(MyAppMaster.class.getName());
+        if (clock == null) {
+          clock = new Clock();
+        }
+      this.clock = clock;
+      LOG.info("Created MyAppMaster");
+    }
+  }
+
+  class MyAppContext implements AppContext {
+    // I'll be making Avro objects by hand.  Please don't do that very often.
+
+    private final ApplicationID myApplicationID;
+    private final JobID myJobID;
+    private final Map<JobID, Job> allJobs;
+
+    MyAppContext(int numberMaps, int numberReduces) {
+      myApplicationID = new ApplicationID();
+      myApplicationID.clusterTimeStamp = clock.getTime();
+      myApplicationID.id = 1;
+
+      myJobID = new JobID();
+      myJobID.appID = myApplicationID;
+
+      Job myJob
+          = new MyJobImpl(myJobID, numberMaps, numberReduces);
+
+      allJobs = Collections.singletonMap(myJobID, myJob);
+    }
+
+    @Override
+    public ApplicationID getApplicationID() {
+      return myApplicationID;
+    }
+
+    @Override
+    public Job getJob(JobID jobID) {
+      return allJobs.get(jobID);
+    }
+
+    @Override
+    public Map<JobID, Job> getAllJobs() {
+      return allJobs;
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return dispatcher.getEventHandler();
+    }
+
+    @Override
+    public CharSequence getUser() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.webapp;
+
+import com.google.inject.Injector;
+import java.util.Map;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MockJobs;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AppController;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AppView;
+import org.apache.hadoop.mapreduce.v2.app.webapp.JobPage;
+import org.apache.hadoop.mapreduce.v2.app.webapp.TaskPage;
+import org.apache.hadoop.mapreduce.v2.app.webapp.TasksPage;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.test.WebAppTests;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+
+import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
+
+public class TestAMWebApp {
+
+  static class TestAppContext implements AppContext {
+    final ApplicationID appID;
+    final String user = MockJobs.newUserName();
+    final Map<JobID, Job> jobs;
+
+    TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
+      appID = MockJobs.newAppID(appid);
+      jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
+    }
+
+    TestAppContext() {
+      this(0, 1, 1, 1);
+    }
+
+    @Override
+    public ApplicationID getApplicationID() {
+      return appID;
+    }
+
+    @Override
+    public CharSequence getUser() {
+      return user;
+    }
+
+    @Override
+    public Job getJob(JobID jobID) {
+      return jobs.get(jobID);
+    }
+
+    @Override
+    public Map<JobID, Job> getAllJobs() {
+      return jobs; // OK
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return null;
+    }
+  }
+
+  @Test public void testAppControllerIndex() {
+    TestAppContext ctx = new TestAppContext();
+    Injector injector = WebAppTests.createMockInjector(AppContext.class, ctx);
+    AppController controller = injector.getInstance(AppController.class);
+    controller.index();
+    assertEquals(Apps.toString(ctx.appID), controller.get(APP_ID,""));
+  }
+
+  @Test public void testAppView() {
+    WebAppTests.testPage(AppView.class, AppContext.class, new TestAppContext());
+  }
+
+  @Test public void testJobView() {
+    WebAppTests.testPage(JobPage.class, AppContext.class, new TestAppContext());
+  }
+
+  @Test public void testTasksView() {
+    WebAppTests.testPage(TasksPage.class, AppContext.class,
+                         new TestAppContext());
+  }
+
+  @Test public void testTaskView() {
+    WebAppTests.testPage(TaskPage.class, AppContext.class,
+                         new TestAppContext());
+  }
+
+  public static void main(String[] args) {
+    WebApps.$for("yarn", AppContext.class, new TestAppContext(0, 8, 88, 4)).
+        at(58888).inDevMode().start(new AMWebApp()).joinThread();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,63 @@
+<?xml version="1.0"?><project>
+  <parent>
+    <artifactId>hadoop-mapreduce-client</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>${yarn.version}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-mapreduce-client-common</artifactId>
+  <name>hadoop-mapreduce-client-common</name>
+  <version>${yarn.version}</version>
+  <url>http://maven.apache.org</url>
+
+  <dependencies>
+    <!-- begin MNG-4223 workaround -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn</artifactId>
+      <version>${yarn.version}</version>
+      <type>pom</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-api</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-server</artifactId>
+      <version>${yarn.version}</version>
+      <type>pom</type>
+    </dependency>
+    <!-- end MNG-4223 workaround -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-common</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro Thu Mar 17 20:21:13 2011
@@ -0,0 +1,151 @@
+@namespace("org.apache.hadoop.mapreduce.v2.api")
+protocol MRClientProtocol {
+
+  import idl "./yarn/yarn-api/src/main/avro/yarn-types.genavro";
+
+  enum TaskType {
+    MAP,
+    REDUCE
+  }
+
+  record JobID {
+    org.apache.hadoop.yarn.ApplicationID appID;
+    int id;
+  }
+
+  record TaskID {
+    JobID jobID;
+    TaskType taskType;
+    int id;
+  }
+
+  record TaskAttemptID {
+    TaskID taskID; 
+    int id;
+  }
+
+  enum TaskState {
+    NEW,
+    SCHEDULED,
+    RUNNING,
+    SUCCEEDED,
+    FAILED,
+    KILL_WAIT,
+    KILLED
+  }
+
+  enum Phase {
+    STARTING,
+    MAP,
+    SHUFFLE,
+    SORT,
+    REDUCE,
+    CLEANUP
+  }
+
+  record Counter {
+    string name;
+    string displayName;
+    long value; 
+  }
+
+  record CounterGroup {
+    string name;
+    string displayname;
+    map<Counter> counters;
+  }
+
+  record Counters {
+    map<CounterGroup> groups;
+  }
+
+  record TaskReport {
+    TaskID id;
+    TaskState state;
+    float progress;
+    long startTime;
+    long finishTime;
+    Counters counters;
+    array<TaskAttemptID> runningAttempts;
+    union{TaskAttemptID, null} successfulAttempt;
+    array<string> diagnostics;
+  }
+
+  enum TaskAttemptState {
+    NEW,
+    UNASSIGNED,
+    ASSIGNED,
+    RUNNING,
+    COMMIT_PENDING,
+    SUCCESS_CONTAINER_CLEANUP,
+    SUCCEEDED,
+    FAIL_CONTAINER_CLEANUP,
+    FAIL_TASK_CLEANUP,
+    FAILED,
+    KILL_CONTAINER_CLEANUP,
+    KILL_TASK_CLEANUP,
+    KILLED
+  }
+
+  record TaskAttemptReport {
+    TaskAttemptID id;
+    TaskAttemptState state;
+    float progress;
+    long startTime;
+    long finishTime;
+    Counters counters;
+    string diagnosticInfo;
+    string stateString;
+    Phase phase;
+  }
+
+  enum JobState {
+    NEW,
+    RUNNING,
+    SUCCEEDED,
+    FAILED,
+    KILL_WAIT,
+    KILLED,
+    ERROR
+  }
+
+  record JobReport {
+    JobID id;
+    JobState state;
+    float mapProgress;
+    float reduceProgress;
+    float cleanupProgress;
+    float setupProgress;
+    long startTime;
+    long finishTime;
+  }
+
+  enum TaskAttemptCompletionEventStatus {
+    FAILED,
+    KILLED,
+    SUCCEEDED,
+    OBSOLETE,
+    TIPFAILED
+  }
+
+  record TaskAttemptCompletionEvent {
+    TaskAttemptID attemptId;
+    TaskAttemptCompletionEventStatus status;
+    string mapOutputServerAddress;
+    int attemptRunTime;
+    int eventId;
+  }
+
+  JobReport getJobReport(JobID jobID) throws org.apache.hadoop.yarn.YarnRemoteException;
+  TaskReport getTaskReport(TaskID taskID) throws org.apache.hadoop.yarn.YarnRemoteException;
+  TaskAttemptReport getTaskAttemptReport(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException;
+  Counters getCounters(JobID jobID) throws org.apache.hadoop.yarn.YarnRemoteException;
+  array<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(JobID jobID, int fromEventId, int maxEvents) throws org.apache.hadoop.yarn.YarnRemoteException;
+  array<TaskReport> getTaskReports(JobID jobID, TaskType taskType) throws org.apache.hadoop.yarn.YarnRemoteException;
+  array<string> getDiagnostics(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException;
+
+  void killJob(JobID jobID) throws org.apache.hadoop.yarn.YarnRemoteException;
+  void killTask(TaskID taskID) throws org.apache.hadoop.yarn.YarnRemoteException;
+  void killTaskAttempt(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException;
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class YarnMRJobConfig {
+  public static final String SPECULATOR_CLASS
+      = "yarn.mapreduce.job.speculator.class";
+  public static final String TASK_RUNTIME_ESTIMATOR_CLASS
+      = "yarn.mapreduce.job.task.runtime.estimator.class";
+  public static final String TASK_ATTEMPT_PROGRESS_RUNTIME_LINEARIZER_CLASS
+      = "yarn.mapreduce.job.task.runtime.linearizer.class";
+  public static final String EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS
+      = "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.lambda";
+  public static final String EXPONENTIAL_SMOOTHING_SMOOTH_RATE
+      = "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.smoothsrate";
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,308 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.lib;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapred.TIPStatus;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.Counter;
+import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.Phase;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+public class TypeConverter {
+
+  public static org.apache.hadoop.mapred.JobID fromYarn(JobID id) {
+    String identifier = fromClusterTimeStamp(id.appID.clusterTimeStamp);
+    return new org.apache.hadoop.mapred.JobID(identifier, id.id);
+  }
+
+  //currently there is 1-1 mapping between appid and jobid
+  public static org.apache.hadoop.mapreduce.JobID fromYarn(ApplicationID appID) {
+    String identifier = fromClusterTimeStamp(appID.clusterTimeStamp);
+    return new org.apache.hadoop.mapred.JobID(identifier, appID.id);
+  }
+
+  public static JobID toYarn(org.apache.hadoop.mapreduce.JobID id) {
+    JobID jobID = new JobID();
+    jobID.id = id.getId(); //currently there is 1-1 mapping between appid and jobid
+    jobID.appID = new ApplicationID();
+    jobID.appID.id = id.getId();
+    jobID.appID.clusterTimeStamp = toClusterTimeStamp(id.getJtIdentifier());
+    return jobID;
+  }
+
+  private static String fromClusterTimeStamp(long clusterTimeStamp) {
+    return Long.toString(clusterTimeStamp);
+  }
+
+  private static long toClusterTimeStamp(String identifier) {
+    return Long.parseLong(identifier);
+  }
+
+  public static org.apache.hadoop.mapreduce.TaskType fromYarn(
+      TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return org.apache.hadoop.mapreduce.TaskType.MAP;
+    case REDUCE:
+      return org.apache.hadoop.mapreduce.TaskType.REDUCE;
+    default:
+      throw new YarnException("Unrecognized task type: " + taskType);
+    }
+  }
+
+  public static TaskType
+      toYarn(org.apache.hadoop.mapreduce.TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return TaskType.MAP;
+    case REDUCE:
+      return TaskType.REDUCE;
+    default:
+      throw new YarnException("Unrecognized task type: " + taskType);
+    }
+  }
+
+  public static org.apache.hadoop.mapred.TaskID fromYarn(TaskID id) {
+    return new org.apache.hadoop.mapred.TaskID(fromYarn(id.jobID), fromYarn(id.taskType),
+        id.id);
+  }
+
+  public static TaskID toYarn(org.apache.hadoop.mapreduce.TaskID id) {
+    TaskID taskID = new TaskID();
+    taskID.id = id.getId();
+    taskID.taskType = toYarn(id.getTaskType());
+    taskID.jobID = toYarn(id.getJobID());
+    return taskID;
+  }
+
+  public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) {
+    switch (phase) {
+    case STARTING:
+      return Phase.STARTING;
+    case MAP:
+      return Phase.MAP;
+    case SHUFFLE:
+      return Phase.SHUFFLE;
+    case SORT:
+      return Phase.SORT;
+    case REDUCE:
+      return Phase.REDUCE;
+    case CLEANUP:
+      return Phase.CLEANUP;
+    }
+    throw new YarnException("Unrecognized Phase: " + phase);
+  }
+
+  public static TaskCompletionEvent[] fromYarn(
+      org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[] newEvents) {
+    TaskCompletionEvent[] oldEvents =
+        new TaskCompletionEvent[newEvents.length];
+    int i = 0;
+    for (org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent newEvent 
+        : newEvents) {
+      oldEvents[i++] = fromYarn(newEvent);
+    }
+    return oldEvents;
+  }
+
+  public static TaskCompletionEvent fromYarn(
+      org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent newEvent) {
+    return new TaskCompletionEvent(newEvent.eventId,
+              fromYarn(newEvent.attemptId), newEvent.attemptId.id,
+              newEvent.attemptId.taskID.taskType.equals(TaskType.MAP),
+              fromYarn(newEvent.status),
+              newEvent.mapOutputServerAddress.toString());
+  }
+
+  public static TaskCompletionEvent.Status fromYarn(
+      TaskAttemptCompletionEventStatus newStatus) {
+    switch (newStatus) {
+    case FAILED:
+      return TaskCompletionEvent.Status.FAILED;
+    case KILLED:
+      return TaskCompletionEvent.Status.KILLED;
+    case OBSOLETE:
+      return TaskCompletionEvent.Status.OBSOLETE;
+    case SUCCEEDED:
+      return TaskCompletionEvent.Status.SUCCEEDED;
+    case TIPFAILED:
+      return TaskCompletionEvent.Status.TIPFAILED;
+    }
+    throw new YarnException("Unrecognized status: " + newStatus);
+  }
+
+  public static org.apache.hadoop.mapred.TaskAttemptID fromYarn(
+      TaskAttemptID id) {
+    return new org.apache.hadoop.mapred.TaskAttemptID(fromYarn(id.taskID),
+        id.id);
+  }
+
+  public static TaskAttemptID toYarn(
+      org.apache.hadoop.mapred.TaskAttemptID id) {
+    TaskAttemptID taskAttemptID = new TaskAttemptID();
+    taskAttemptID.taskID = toYarn(id.getTaskID());
+    taskAttemptID.id = id.getId();
+    return taskAttemptID;
+  }
+
+  public static TaskAttemptID toYarn(
+      org.apache.hadoop.mapreduce.TaskAttemptID id) {
+    TaskAttemptID taskAttemptID = new TaskAttemptID();
+    taskAttemptID.taskID = toYarn(id.getTaskID());
+    taskAttemptID.id = id.getId();
+    return taskAttemptID;
+  }
+  
+  public static org.apache.hadoop.mapreduce.Counters fromYarn(
+      Counters yCntrs) {
+    org.apache.hadoop.mapreduce.Counters counters = 
+      new org.apache.hadoop.mapreduce.Counters();
+    for (CounterGroup yGrp : yCntrs.groups.values()) {
+      for (Counter yCntr : yGrp.counters.values()) {
+        org.apache.hadoop.mapreduce.Counter c = 
+          counters.findCounter(yGrp.displayname.toString(), 
+              yCntr.displayName.toString());
+        c.setValue(yCntr.value);
+      }
+    }
+    return counters;
+  }
+
+  public static Counters toYarn(org.apache.hadoop.mapred.Counters counters) {
+    Counters yCntrs = new Counters();
+    yCntrs.groups = new HashMap<CharSequence, CounterGroup>();
+    for (org.apache.hadoop.mapred.Counters.Group grp : counters) {
+      CounterGroup yGrp = new CounterGroup();
+      yGrp.name = grp.getName();
+      yGrp.displayname = grp.getDisplayName();
+      yGrp.counters = new HashMap<CharSequence, Counter>();
+      for (org.apache.hadoop.mapred.Counters.Counter cntr : grp) {
+        Counter yCntr = new Counter();
+        yCntr.name = cntr.getName();
+        yCntr.displayName = cntr.getDisplayName();
+        yCntr.value = cntr.getValue();
+        yGrp.counters.put(yCntr.name, yCntr);
+      }
+      yCntrs.groups.put(yGrp.name, yGrp);
+    }
+    return yCntrs;
+  }
+
+  public static Counters toYarn(org.apache.hadoop.mapreduce.Counters counters) {
+    Counters yCntrs = new Counters();
+    yCntrs.groups = new HashMap<CharSequence, CounterGroup>();
+    for (org.apache.hadoop.mapreduce.CounterGroup grp : counters) {
+      CounterGroup yGrp = new CounterGroup();
+      yGrp.name = grp.getName();
+      yGrp.displayname = grp.getDisplayName();
+      yGrp.counters = new HashMap<CharSequence, Counter>();
+      for (org.apache.hadoop.mapreduce.Counter cntr : grp) {
+        Counter yCntr = new Counter();
+        yCntr.name = cntr.getName();
+        yCntr.displayName = cntr.getDisplayName();
+        yCntr.value = cntr.getValue();
+        yGrp.counters.put(yCntr.name, yCntr);
+      }
+      yCntrs.groups.put(yGrp.name, yGrp);
+    }
+    return yCntrs;
+  }
+  
+  public static org.apache.hadoop.mapred.JobStatus fromYarn(
+      JobReport jobreport, String jobFile, String trackingUrl) {
+    String user = null,  jobName = null;
+    JobPriority jobPriority = JobPriority.NORMAL;
+    return new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.id),
+        jobreport.setupProgress, jobreport.mapProgress,
+        jobreport.reduceProgress, jobreport.cleanupProgress,
+        fromYarn(jobreport.state),
+        jobPriority, user, jobName, jobFile, trackingUrl);
+  }
+  
+  public static int fromYarn(JobState state) {
+    switch (state) {
+    case NEW:
+      return org.apache.hadoop.mapred.JobStatus.PREP;
+    case RUNNING:
+      return org.apache.hadoop.mapred.JobStatus.RUNNING;
+    case KILL_WAIT:
+    case KILLED:
+      return org.apache.hadoop.mapred.JobStatus.KILLED;
+    case SUCCEEDED:
+      return org.apache.hadoop.mapred.JobStatus.SUCCEEDED;
+    case FAILED:
+    case ERROR:
+      return org.apache.hadoop.mapred.JobStatus.FAILED;
+    }
+    throw new YarnException("Unrecognized job state: " + state);
+  }
+
+  public static org.apache.hadoop.mapred.TIPStatus fromYarn(
+      TaskState state) {
+    switch (state) {
+    case NEW:
+    case SCHEDULED:
+      return org.apache.hadoop.mapred.TIPStatus.PENDING;
+    case RUNNING:
+      return org.apache.hadoop.mapred.TIPStatus.RUNNING;
+    case KILL_WAIT:
+    case KILLED:
+      return org.apache.hadoop.mapred.TIPStatus.KILLED;
+    case SUCCEEDED:
+      return org.apache.hadoop.mapred.TIPStatus.COMPLETE;
+    case FAILED:
+      return org.apache.hadoop.mapred.TIPStatus.FAILED;
+    }
+    throw new YarnException("Unrecognized task state: " + state);
+  }
+  
+  public static TaskReport fromYarn(org.apache.hadoop.mapreduce.v2.api.TaskReport report) {
+      return new TaskReport(fromYarn(report.id), report.progress, report.state.toString(),
+      (String[]) report.diagnostics.toArray(), fromYarn(report.state), report.startTime, report.finishTime,
+      fromYarn(report.counters));
+  }
+  
+  public static List<TaskReport> fromYarn(
+      List<org.apache.hadoop.mapreduce.v2.api.TaskReport> taskReports) {
+    List<TaskReport> reports = new ArrayList<TaskReport>();
+    for (org.apache.hadoop.mapreduce.v2.api.TaskReport r : taskReports) {
+      reports.add(fromYarn(r));
+    }
+    return reports;
+  }
+}
+

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,165 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+import static org.apache.hadoop.yarn.util.StringHelper.*;
+
+/**
+ * Helper class for MR applications
+ */
+public class MRApps extends Apps {
+  public static final String JOB = "job";
+  public static final String TASK = "task";
+  public static final String ATTEMPT = "attempt";
+
+  public static String toString(JobID jid) {
+    return _join(JOB, jid.appID.clusterTimeStamp, jid.appID.id, jid.id);
+  }
+
+  public static JobID toJobID(String jid) {
+    Iterator<String> it = _split(jid).iterator();
+    return toJobID(JOB, jid, it);
+  }
+
+  // mostly useful for parsing task/attempt id like strings
+  public static JobID toJobID(String prefix, String s, Iterator<String> it) {
+    ApplicationID appID = toAppID(prefix, s, it);
+    shouldHaveNext(prefix, s, it);
+    JobID jobID = new JobID();
+    jobID.appID = appID;
+    jobID.id = Integer.parseInt(it.next());
+    return jobID;
+  }
+
+  public static String toString(TaskID tid) {
+    return _join("task", tid.jobID.appID.clusterTimeStamp, tid.jobID.appID.id,
+                 tid.jobID.id, taskSymbol(tid.taskType), tid.id);
+  }
+
+  public static TaskID toTaskID(String tid) {
+    Iterator<String> it = _split(tid).iterator();
+    return toTaskID(TASK, tid, it);
+  }
+
+  public static TaskID toTaskID(String prefix, String s, Iterator<String> it) {
+    JobID jid = toJobID(prefix, s, it);
+    shouldHaveNext(prefix, s, it);
+    TaskID tid = new TaskID();
+    tid.jobID = jid;
+    tid.taskType = taskType(it.next());
+    shouldHaveNext(prefix, s, it);
+    tid.id = Integer.parseInt(it.next());
+    return tid;
+  }
+
+  public static String toString(TaskAttemptID taid) {
+    return _join("attempt", taid.taskID.jobID.appID.clusterTimeStamp,
+                 taid.taskID.jobID.appID.id, taid.taskID.jobID.id,
+                 taskSymbol(taid.taskID.taskType), taid.taskID.id, taid.id);
+  }
+
+  public static TaskAttemptID toTaskAttemptID(String taid) {
+    Iterator<String> it = _split(taid).iterator();
+    TaskID tid = toTaskID(ATTEMPT, taid, it);
+    shouldHaveNext(ATTEMPT, taid, it);
+    TaskAttemptID taID = new TaskAttemptID();
+    taID.taskID = tid;
+    taID.id = Integer.parseInt(it.next());
+    return taID;
+  }
+
+  public static String taskSymbol(TaskType type) {
+    switch (type) {
+      case MAP:           return "m";
+      case REDUCE:        return "r";
+    }
+    throw new YarnException("Unknown task type: "+ type.toString());
+  }
+
+  public static TaskType taskType(String symbol) {
+    // JDK 7 supports switch on strings
+    if (symbol.equals("m")) return TaskType.MAP;
+    if (symbol.equals("r")) return TaskType.REDUCE;
+    throw new YarnException("Unknown task symbol: "+ symbol);
+  }
+
+  public static void setInitialClasspath(
+      Map<CharSequence, CharSequence> environment) throws IOException {
+
+    // Get yarn mapreduce-app classpath from generated classpath
+    // Works if compile time env is same as runtime. For e.g. tests.
+    InputStream classpathFileStream =
+        Thread.currentThread().getContextClassLoader()
+            .getResourceAsStream("mrapp-generated-classpath");
+    BufferedReader reader =
+        new BufferedReader(new InputStreamReader(classpathFileStream));
+    addToClassPath(environment, reader.readLine().trim());
+
+    // If runtime env is different.
+    if (System.getenv().get("YARN_HOME") != null) {
+      ShellCommandExecutor exec =
+         new ShellCommandExecutor(new String[] {
+              System.getenv().get("YARN_HOME") + "/bin/yarn",
+              "classpath" });
+      exec.execute();
+      addToClassPath(environment, exec.getOutput().trim());
+    }
+
+    // Get yarn mapreduce-app classpath
+    if (System.getenv().get("HADOOP_MAPRED_HOME")!= null) {
+      ShellCommandExecutor exec =
+          new ShellCommandExecutor(new String[] {
+              System.getenv().get("HADOOP_MAPRED_HOME") + "/bin/mapred",
+              "classpath" });
+      exec.execute();
+      addToClassPath(environment, exec.getOutput().trim());
+    }
+
+    // TODO: Remove duplicates.
+  }
+
+  public static void addToClassPath(
+      Map<CharSequence, CharSequence> environment, String fileName) {
+    CharSequence classpath = environment.get(CLASSPATH);
+    if (classpath == null) {
+      classpath = fileName;
+    } else {
+      classpath = classpath + ":" + fileName;
+    }
+    environment.put(CLASSPATH, classpath);
+  }
+
+  public static final String CLASSPATH = "CLASSPATH";
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,102 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.util;
+
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestMRApps {
+
+  @Test public void testJobIDtoString() {
+    JobID jid = new JobID();
+    jid.appID = new ApplicationID();
+    assertEquals("job_0_0_0", MRApps.toString(jid));
+  }
+
+  @Test public void testToJobID() {
+    JobID jid = MRApps.toJobID("job_1_1_1");
+    assertEquals(1, jid.appID.clusterTimeStamp);
+    assertEquals(1, jid.appID.id);
+    assertEquals(1, jid.id);
+  }
+
+  @Test(expected=YarnException.class) public void testJobIDShort() {
+    MRApps.toJobID("job_0_0");
+  }
+
+  @Test public void testTaskIDtoString() {
+    TaskID tid = new TaskID();
+    tid.jobID = new JobID();
+    tid.jobID.appID = new ApplicationID();
+    tid.taskType = TaskType.MAP;
+    assertEquals("task_0_0_0_m_0", MRApps.toString(tid));
+    tid.taskType = TaskType.REDUCE;
+    assertEquals("task_0_0_0_r_0", MRApps.toString(tid));
+  }
+
+  @Test public void testToTaskID() {
+    TaskID tid = MRApps.toTaskID("task_1_2_3_r_4");
+    assertEquals(1, tid.jobID.appID.clusterTimeStamp);
+    assertEquals(2, tid.jobID.appID.id);
+    assertEquals(3, tid.jobID.id);
+    assertEquals(TaskType.REDUCE, tid.taskType);
+    assertEquals(4, tid.id);
+
+    tid = MRApps.toTaskID("task_1_2_3_m_4");
+    assertEquals(TaskType.MAP, tid.taskType);
+  }
+
+  @Test(expected=YarnException.class) public void testTaskIDShort() {
+    MRApps.toTaskID("task_0_0_0_m");
+  }
+
+  @Test(expected=YarnException.class) public void testTaskIDBadType() {
+    MRApps.toTaskID("task_0_0_0_x_0");
+  }
+
+  @Test public void testTaskAttemptIDtoString() {
+    TaskAttemptID taid = new TaskAttemptID();
+    taid.taskID = new TaskID();
+    taid.taskID.taskType = TaskType.MAP;
+    taid.taskID.jobID = new JobID();
+    taid.taskID.jobID.appID = new ApplicationID();
+    assertEquals("attempt_0_0_0_m_0_0", MRApps.toString(taid));
+  }
+
+  @Test public void testToTaskAttemptID() {
+    TaskAttemptID taid = MRApps.toTaskAttemptID("attempt_0_1_2_m_3_4");
+    assertEquals(0, taid.taskID.jobID.appID.clusterTimeStamp);
+    assertEquals(1, taid.taskID.jobID.appID.id);
+    assertEquals(2, taid.taskID.jobID.id);
+    assertEquals(3, taid.taskID.id);
+    assertEquals(4, taid.id);
+  }
+
+  @Test(expected=YarnException.class) public void testTaskAttemptIDShort() {
+    MRApps.toTaskAttemptID("attempt_0_0_0_m_0");
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,36 @@
+<?xml version="1.0"?><project>
+  <parent>
+    <artifactId>hadoop-mapreduce-client</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>${yarn.version}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-mapreduce-client-core</artifactId>
+  <name>hadoop-mapreduce-client</name>
+  <version>${yarn.version}</version>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <yarn.version>1.0-SNAPSHOT</yarn.version>
+  </properties>
+  
+<build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/jobhistory/Events.avpr:776175-785643

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- This is the default mapred-queues.xml file that is loaded in the case
+     that the user does not have such a file on their classpath. -->
+<queues>
+  <queue>
+    <name>default</name>
+    <properties>
+    </properties>
+    <state>running</state>
+    <acl-submit-job> </acl-submit-job>
+    <acl-administer-jobs> </acl-administer-jobs>
+  </queue>
+</queues>
\ No newline at end of file

Copied: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java (from r1082666, hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/filecache/DistributedCache.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java?p2=hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java&p1=hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/filecache/DistributedCache.java&r1=1082666&r2=1082677&rev=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java Thu Mar 17 20:21:13 2011
@@ -24,9 +24,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
-import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
-
 /**
  * Distribute application-specific large, read-only files efficiently.
  * 

Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/filecache/DistributedCache.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/filecache/DistributedCache.java:776175-785643

Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/package-info.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/filecache/package-info.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/filecache/package-info.java:776175-785643

Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AuditLogger.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/AuditLogger.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/AuditLogger.java:776175-785643

Copied: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java (from r1082666, hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/BackupStore.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java?p2=hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java&p1=hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/BackupStore.java&r1=1082666&r2=1082677&rev=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java Thu Mar 17 20:21:13 2011
@@ -560,7 +560,7 @@ public class BackupStore<K,V> {
 
     private Writer<K,V> createSpillFile() throws IOException {
       Path tmp =
-          new Path(TaskTracker.OUTPUT + "/backup_" + tid.getId() + "_"
+          new Path(Constants.OUTPUT + "/backup_" + tid.getId() + "_"
               + (spillNumber++) + ".out");
 
       LOG.info("Created file: " + tmp);

Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/BackupStore.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java:776175-785643

Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BasicTypeSorterBase.java:776175-785643

Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BufferSorter.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/BufferSorter.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BufferSorter.java:776175-785643

Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/CleanupQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/CleanupQueue.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java:776175-785643

Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Clock.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/Clock.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Clock.java:776175-785643



Mime
View raw message