hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r1097605 [2/3] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ yarn/yarn-...
Date Thu, 28 Apr 2011 20:51:23 GMT
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java?rev=1097605&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java Thu Apr 28 20:51:21 2011
@@ -0,0 +1,326 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeManagerInfo;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationMasterPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeManagerInfoPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeManagerInfoProto;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class ZKStore implements Store {
+  private final Configuration conf;
+  private final ZooKeeper zkClient;
+  private static final Log LOG = LogFactory.getLog(ZKStore.class);
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private static final String NODES = "nodes/";
+  private static final String APPS = "apps/";
+  private static final String ZK_PATH_SEPARATOR = "/";
+  private static final String NODE_ID = "nodeid";
+  private static final String APP_MASTER = "master";
+  private static final String LAST_CONTAINER_ID = "last_containerid";
+  private final String ZK_ADDRESS;
+  private final int ZK_TIMEOUT;
+  
+  /** TODO make this generic **/
+  private NodeIdPBImpl nodeId = new NodeIdPBImpl();
+  
+  /**
+   * TODO fix this for later to handle all kinds of events 
+   * of connection and session events.
+   *
+   */
+  private class ZKWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent arg0) {
+    }
+  }
+
+  public ZKStore(Configuration conf) throws IOException {
+    this.conf = conf;
+    this.ZK_ADDRESS = conf.get(YarnConfiguration.ZK_ADDRESS);
+    this.ZK_TIMEOUT = conf.getInt(YarnConfiguration.ZK_SESSION_TIMEOUT,
+        YarnConfiguration.DEFAULT_ZK_TIMEOUT);
+    zkClient = new ZooKeeper(this.ZK_ADDRESS, 
+        this.ZK_TIMEOUT,
+        createZKWatcher() 
+    );
+    this.nodeId.setId(0);
+  }
+  
+  protected Watcher createZKWatcher() {
+    return new ZKWatcher();   
+  }
+  
+  private NodeManagerInfoPBImpl createNodeManagerInfo(NodeManager nodeInfo) {
+    NodeManagerInfo node = 
+      recordFactory.newRecordInstance(NodeManagerInfo.class);
+      node.setNodeAddress(nodeInfo.getNodeAddress());
+      node.setRackName(nodeInfo.getRackName());
+      node.setCapability(nodeInfo.getTotalCapability());
+      node.setUsed(nodeInfo.getUsedResource());
+      node.setNumContainers(nodeInfo.getNumContainers());
+      return (NodeManagerInfoPBImpl)node;
+  }
+  
+  @Override
+  public synchronized void storeNode(NodeManager node) throws IOException {
+    /** create a storage node and store it in zk **/
+    NodeManagerInfoPBImpl nodeManagerInfo = createNodeManagerInfo(node);
+    byte[] bytes = nodeManagerInfo.getProto().toByteArray();
+    try {
+      zkClient.create(NODES + Integer.toString(node.getNodeID().getId()), bytes, null,
+          CreateMode.PERSISTENT);
+    } catch(InterruptedException ie) {
+      LOG.info("Interrupted", ie);
+      throw new InterruptedIOException("Interrupted");
+    } catch(KeeperException ke) {
+      LOG.info("Keeper exception", ke);
+      throw convertToIOException(ke);
+    }
+  }
+
+  @Override
+  public synchronized void removeNode(NodeManager node) throws IOException {
+    /** remove a storage node **/
+    try {
+      zkClient.delete(NODES + Integer.toString(node.getNodeID().getId()), -1);
+    } catch(InterruptedException ie) {
+      LOG.info("Interrupted", ie);
+      throw new InterruptedIOException("Interrupted");
+    } catch(KeeperException ke) {
+      LOG.info("Keeper exception", ke);
+      throw convertToIOException(ke);
+    }
+    
+  }
+  
+  private static IOException convertToIOException(KeeperException ke) {
+    IOException io = new IOException();
+    io.setStackTrace(ke.getStackTrace());
+    return io;
+  }
+
+  @Override
+  public synchronized NodeId getNextNodeId() throws IOException {
+    int num = nodeId.getId();
+    num++;
+    nodeId.setId(num);
+    try {
+      zkClient.setData(NODES + NODE_ID, nodeId.getProto().toByteArray() , -1);
+    } catch(InterruptedException ie) {
+      LOG.info("Interrupted", ie);
+      throw new InterruptedIOException(ie.getMessage());
+    } catch(KeeperException ke) {
+      throw convertToIOException(ke);
+    }
+    return nodeId;
+  }
+
+  private String containerPathFromContainerId(ContainerId containerId) {
+    String appString = ConverterUtils.toString(containerId.getAppId());
+    return appString + "/" + containerId.getId();
+  }
+  
+  @Override
+  public synchronized void storeContainer(Container container) throws IOException {
+    ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
+    try {
+      zkClient.create(APPS + containerPathFromContainerId(container.getId())
+          , containerPBImpl.getProto().toByteArray(), null, CreateMode.PERSISTENT);
+    } catch(InterruptedException ie) {
+      LOG.info("Interrupted", ie);
+      throw new InterruptedIOException(ie.getMessage());
+    } catch(KeeperException ke) {
+      LOG.info("Keeper exception", ke);
+      throw convertToIOException(ke);
+    }
+  }
+
+  @Override
+  public synchronized void removeContainer(Container container) throws IOException {
+    ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
+    try { 
+      zkClient.delete(APPS + containerPathFromContainerId(container.getId()),
+          -1);
+    } catch(InterruptedException ie) {
+      throw new InterruptedIOException(ie.getMessage());
+    } catch(KeeperException ke) {
+      LOG.info("Keeper exception", ke);
+      throw convertToIOException(ke);
+    }
+  }
+
+  @Override
+  public synchronized void storeApplication(ApplicationId application, ApplicationSubmissionContext 
+      context, ApplicationMaster master) throws IOException {
+    ApplicationSubmissionContextPBImpl contextPBImpl = (ApplicationSubmissionContextPBImpl) context;
+    String appString = APPS + ConverterUtils.toString(application);
+    ApplicationMasterPBImpl masterPBImpl = (ApplicationMasterPBImpl) master;
+    
+    try {
+      zkClient.create(appString, contextPBImpl.getProto()
+          .toByteArray(), null, CreateMode.PERSISTENT);
+      zkClient.create(appString + ZK_PATH_SEPARATOR + APP_MASTER, 
+          masterPBImpl.getProto().toByteArray(), null, CreateMode.PERSISTENT);
+    } catch(InterruptedException ie) {
+      LOG.info("Interrupted", ie);
+      throw new InterruptedIOException(ie.getMessage());
+    } catch(KeeperException ke) {
+      LOG.info("Keeper exception", ke);
+      throw convertToIOException(ke);
+    }
+  }
+
+  @Override
+  public synchronized void removeApplication(ApplicationId application) throws IOException {
+    try {
+      zkClient.delete(APPS + ConverterUtils.toString(application), -1);
+    } catch(InterruptedException ie) {
+      LOG.info("Interrupted", ie);
+      throw new InterruptedIOException(ie.getMessage());
+    } catch(KeeperException ke) {
+      LOG.info("Keeper Exception", ke);
+      throw convertToIOException(ke);
+    }
+  }
+  
+  @Override
+  public synchronized RMState restore() throws IOException {
+    ZKRMState rmState = new ZKRMState();
+    rmState.load();
+    return rmState;
+  }  
+  
+  private class ZKRMState implements RMState {
+    List<NodeManager> nodeManagers = new ArrayList<NodeManager>();
+    List<ApplicationSubmissionContext> appSubmissionContexts = new 
+      ArrayList<ApplicationSubmissionContext>();
+    List<ApplicationMaster> masters = 
+      new ArrayList<ApplicationMaster>();
+    List<NodeManagerInfo> nodes = new ArrayList<NodeManagerInfo>();
+    
+    public ZKRMState() {
+      LOG.info("Restoring RM state from ZK");
+    }
+    
+    private synchronized List<NodeManagerInfo> listStoredNodes() throws IOException {
+      /** get the list of nodes stored in zk **/
+      //TODO PB
+      Stat stat = new Stat();
+      try {
+        List<String> children = zkClient.getChildren(NODES, false);
+        for (String child: children) {
+          byte[] data = zkClient.getData(NODES + child, false, stat);
+          NodeManagerInfoPBImpl nmImpl = new NodeManagerInfoPBImpl(
+              NodeManagerInfoProto.parseFrom(data));
+          nodes.add(nmImpl);
+        }
+      } catch (InterruptedException ie) {
+        LOG.info("Interrupted" , ie);
+        throw new InterruptedIOException("Interrupted");
+      } catch(KeeperException ke) {
+        LOG.error("Failed to list nodes", ke);
+        throw convertToIOException(ke);
+      }
+      return nodes;
+    }
+    
+    @Override
+    public List<NodeManager> getStoredNodeManagers()  throws IOException {
+      return nodeManagers;
+    }
+
+    @Override
+    public List<ApplicationSubmissionContext> getStoredSubmissionContexts() {
+      return appSubmissionContexts;
+    }
+
+    @Override
+    public NodeId getLastLoggedNodeId() {
+      return null;
+    }
+    
+    private void readLastNodeId() throws IOException {
+      Stat stat = new Stat();
+      try {
+        byte[] data = zkClient.getData(NODES + NODE_ID, false, stat);
+        nodeId = new NodeIdPBImpl(NodeIdProto.parseFrom(data));
+      } catch(InterruptedException ie) {
+        LOG.info("Interrupted", ie);
+        throw new InterruptedIOException(ie.getMessage());
+      } catch(KeeperException ke) {
+        LOG.info("Keeper Exception", ke);
+        throw convertToIOException(ke);
+      }
+    }
+    
+    private void load() throws IOException {
+      List<NodeManagerInfo> nodeInfos = listStoredNodes();
+      for (NodeManagerInfo node: nodeInfos) {
+        NodeManager nm = new NodeManagerImpl(node.getNodeId(),
+            node.getNodeAddress(), node.getHttpAddress(), RMResourceTrackerImpl
+            .resolve(node.getNodeAddress()), node.getCapability());
+        nodeManagers.add(nm);
+      }
+      readLastNodeId();
+      /* make sure we get all the containers */
+      
+    }
+    @Override
+    public List<ApplicationMaster> getStoredAMs() throws IOException {
+      return masters;
+    }
+  }
+
+  @Override
+  public void updateApplicationState(ApplicationId applicationId,
+      ApplicationMaster master) throws IOException {
+    
+  }
+}
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java Thu Apr 28 20:51:21 2011
@@ -19,8 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
 
 /**
  * Node managers information on available resources 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java Thu Apr 28 20:51:21 2011
@@ -22,10 +22,10 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
 
 
 public class NodeStatus {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Thu Apr 28 20:51:21 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -57,7 +58,6 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Thu Apr 28 20:51:21 2011
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -53,9 +54,6 @@ import org.apache.hadoop.yarn.server.res
 @Evolving
 public class Application {
   private static final Log LOG = LogFactory.getLog(Application.class);
-
-  private AtomicInteger containerCtr = new AtomicInteger(0);
-
   final ApplicationId applicationId;
   final Queue queue;
   final String user;
@@ -75,11 +73,14 @@ public class Application {
   /* Allocated by scheduler */
   List<Container> allocated = new ArrayList<Container>(); 
   Set<NodeInfo> applicationOnNodes = new HashSet<NodeInfo>();
+  ApplicationMaster master;
   
-  public Application(ApplicationId applicationId, Queue queue, String user) {
+  public Application(ApplicationId applicationId, ApplicationMaster master,
+      Queue queue, String user) {
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user; 
+    this.master = master;
   }
 
   public ApplicationId getApplicationId() {
@@ -99,7 +100,9 @@ public class Application {
   }
 
   public int getNewContainerId() {
-    return containerCtr.incrementAndGet();
+    int i = master.getContainerCount();
+    master.setContainerCount(++i);
+    return master.getContainerCount();
   }
 
   /**

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java Thu Apr 28 20:51:21 2011
@@ -35,15 +35,15 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 
 /**
- * This class is used by ClusterInfo to keep track of all the applications/containers
+ * This class is used to keep track of all the applications/containers
  * running on a node.
  *
  */

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java Thu Apr 28 20:51:21 2011
@@ -25,8 +25,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 
 /**

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Thu Apr 28 20:51:21 2011
@@ -23,6 +23,7 @@ import java.util.List;
 
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -50,12 +51,13 @@ public interface YarnScheduler {
   /**
    * A new application has been submitted to the ResourceManager
    * @param applicationId application which has been submitted
+   * @param master the application master
    * @param user application user
    * @param queue queue to which the applications is being submitted
    * @param priority application priority
    */
-  public void addApplication(ApplicationId applicationId, String user, 
-      String queue, Priority priority) 
+  public void addApplication(ApplicationId applicationId, ApplicationMaster master,
+      String user, String queue, Priority priority) 
   throws IOException;
   
   /**

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Apr 28 20:51:21 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -213,7 +214,7 @@ implements ResourceScheduler, CapacitySc
   }
   
   @Override
-  public void addApplication(ApplicationId applicationId, 
+  public void addApplication(ApplicationId applicationId, ApplicationMaster master,
       String user, String queueName, Priority priority)
   throws IOException {
     Queue queue = queues.get(queueName);
@@ -228,7 +229,7 @@ implements ResourceScheduler, CapacitySc
           " submitted by user " + user + " to non-leaf queue: " + queueName);
     }
 
-    Application application = new Application(applicationId, queue, user); 
+    Application application = new Application(applicationId, master, queue, user); 
     try {
       queue.submitApplication(application, user, queueName, priority);
     } catch (AccessControlException ace) {
@@ -410,7 +411,7 @@ implements ResourceScheduler, CapacitySc
     switch(event.getType()) {
     case ADD:
       try {
-        addApplication(event.getAppContext().getApplicationID(), 
+        addApplication(event.getAppContext().getApplicationID(), event.getAppContext().getMaster(),
             event.getAppContext().getUser(), event.getAppContext().getQueue(),
             event.getAppContext().getSubmissionContext().getPriority());
       } catch(IOException ie) {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Thu Apr 28 20:51:21 2011
@@ -218,13 +218,11 @@ public class ParentQueue implements Queu
 
   @Override
   public float getAbsoluteMaximumCapacity() {
-    // TODO Auto-generated method stub
     return 0;
   }
 
   @Override
   public float getMaximumCapacity() {
-    // TODO Auto-generated method stub
     return 0;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Apr 28 20:51:21 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -204,11 +205,11 @@ public class FifoScheduler implements Re
   }
 
   @Override
-  public synchronized void addApplication(ApplicationId applicationId, 
+  public synchronized void addApplication(ApplicationId applicationId, ApplicationMaster master,
       String user, String unusedQueue, Priority unusedPriority) 
   throws IOException {
     applications.put(applicationId, 
-        new Application(applicationId, DEFAULT_QUEUE, user));
+        new Application(applicationId, master, DEFAULT_QUEUE, user));
     LOG.info("Application Submission: " + applicationId.getId() + " from " + user + 
         ", currently active: " + applications.size());
   }
@@ -473,7 +474,8 @@ public class FifoScheduler implements Re
     switch(event.getType()) {
     case ADD:
       try {
-        addApplication(event.getAppContext().getApplicationID(), event.getAppContext().getUser(),
+        addApplication(event.getAppContext().getApplicationID(), event.getAppContext().getMaster(), 
+            event.getAppContext().getUser(),
             event.getAppContext().getQueue(), event.getAppContext().getSubmissionContext().getPriority());
       } catch(IOException ie) {
         LOG.error("Unable to add application " + event.getAppContext().getApplicationID(), ie);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Thu Apr 28 20:51:21 2011
@@ -23,11 +23,11 @@ import com.google.common.collect.Lists;
 import java.util.List;
 
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 
 /**

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Thu Apr 28 20:51:21 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java Thu Apr 28 20:51:21 2011
@@ -43,11 +43,12 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.junit.After;
 import org.junit.Before;
@@ -62,7 +63,7 @@ public class TestAMLaunchFailure extends
   ApplicationTokenSecretManager applicationTokenSecretManager = 
     new ApplicationTokenSecretManager();
 
-  private ASMContext context;
+  private RMContext context;
 
   private static class DummyYarnScheduler implements YarnScheduler {
     private Container container = recordFactory.newRecordInstance(Container.class);
@@ -74,11 +75,6 @@ public class TestAMLaunchFailure extends
     }
 
     @Override
-    public void addApplication(ApplicationId applicationId, String user,
-        String queue, Priority priority) throws IOException {
-    }
-
-    @Override
     public void removeApplication(ApplicationId applicationId)
         throws IOException {
     }
@@ -94,6 +90,14 @@ public class TestAMLaunchFailure extends
     public List<QueueUserACLInfo> getQueueUserAclInfo() {
       return null;
     }
+
+    @Override
+    public void addApplication(ApplicationId applicationId,
+        ApplicationMaster master, String user, String queue, Priority priority)
+        throws IOException {
+      // TODO Auto-generated method stub
+      
+    }
   }
 
   private class DummyApplicationTracker implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
@@ -111,7 +115,7 @@ public class TestAMLaunchFailure extends
       private AtomicInteger notify = new AtomicInteger();
       private AppContext app;
 
-      public DummyApplicationMasterLauncher(ASMContext context) {
+      public DummyApplicationMasterLauncher(RMContext context) {
         context.getDispatcher().register(AMLauncherEventType.class, this);
         new TestThread().start();
       }
@@ -163,9 +167,11 @@ public class TestAMLaunchFailure extends
 
   @Before
   public void setUp() {
-    context = new ResourceManager.ASMContextImpl();
-    asmImpl = new ExtApplicationsManagerImpl(applicationTokenSecretManager, scheduler);
+    context = new ResourceManager.RMContextImpl(new MemStore());
     Configuration conf = new Configuration();
+    context.getDispatcher().init(conf);
+    context.getDispatcher().start();
+    asmImpl = new ExtApplicationsManagerImpl(applicationTokenSecretManager, scheduler);
     new DummyApplicationTracker();
     conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 3000L);
     conf.setInt(YarnConfiguration.AM_MAX_RETRIES, 1);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java Thu Apr 28 20:51:21 2011
@@ -41,8 +41,9 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.junit.After;
 import org.junit.Before;
@@ -54,11 +55,11 @@ public class TestAMRMRPCResponseId exten
   DummyApplicationsManager applicationsManager;
   DummyScheduler scheduler;
 
-  private ASMContext context;
+  private RMContext context;
   private class DummyApplicationsManager extends ApplicationsManagerImpl {
     public DummyApplicationsManager(
         ApplicationTokenSecretManager applicationTokenSecretManager,
-        YarnScheduler scheduler, ASMContext asmContext) {
+        YarnScheduler scheduler, RMContext asmContext) {
       super(applicationTokenSecretManager, scheduler, asmContext);      
     }
     @Override
@@ -84,11 +85,6 @@ public class TestAMRMRPCResponseId exten
     }
 
     @Override
-    public void addApplication(ApplicationId applicationId, String user,
-        String queue, Priority priority) throws IOException {
-    }
-
-    @Override
     public void removeApplication(ApplicationId applicationId)
         throws IOException {
     }
@@ -104,11 +100,19 @@ public class TestAMRMRPCResponseId exten
     public List<QueueUserACLInfo> getQueueUserAclInfo() {
       return null;
     }
+
+    @Override
+    public void addApplication(ApplicationId applicationId,
+        ApplicationMaster master, String user, String queue, Priority priority)
+        throws IOException {
+      // TODO Auto-generated method stub
+      
+    }
   }
   
   @Before
   public void setUp() {
-    context = new ResourceManager.ASMContextImpl();
+    context = new ResourceManager.RMContextImpl(new MemStore());
     scheduler = new DummyScheduler();
     applicationsManager = new DummyApplicationsManager(new 
         ApplicationTokenSecretManager(), scheduler, context);
@@ -117,6 +121,8 @@ public class TestAMRMRPCResponseId exten
     Configuration conf = new Configuration();
     applicationsManager.init(conf);
     amService.init(conf);
+    context.getDispatcher().init(conf);
+    context.getDispatcher().start();
   }
   
   @After

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Thu Apr 28 20:51:21 2011
@@ -12,7 +12,6 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
@@ -31,13 +30,13 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
@@ -55,7 +54,7 @@ import org.junit.Test;
 public class TestAMRestart extends TestCase {
   private static final Log LOG = LogFactory.getLog(TestAMRestart.class);
   ApplicationsManagerImpl appImpl;
-  ASMContext asmContext = new ResourceManager.ASMContextImpl();
+  RMContext asmContext = new ResourceManager.RMContextImpl(new MemStore());
   ApplicationTokenSecretManager appTokenSecretManager = 
     new ApplicationTokenSecretManager();
   DummyResourceScheduler scheduler;
@@ -75,7 +74,7 @@ public class TestAMRestart extends TestC
   private class ExtApplicationsManagerImpl extends ApplicationsManagerImpl {
     public ExtApplicationsManagerImpl(
         ApplicationTokenSecretManager applicationTokenSecretManager,
-        YarnScheduler scheduler, ASMContext asmContext) {
+        YarnScheduler scheduler, RMContext asmContext) {
       super(applicationTokenSecretManager, scheduler, asmContext);
     }
 
@@ -183,10 +182,7 @@ public class TestAMRestart extends TestC
     public void reinitialize(Configuration conf,
         ContainerTokenSecretManager secretManager) {
     }
-    @Override
-    public void addApplication(ApplicationId applicationId, String user,
-        String queue, Priority priority) throws IOException {
-    }
+
     @Override
     public void removeApplication(ApplicationId applicationId)
         throws IOException {
@@ -201,6 +197,13 @@ public class TestAMRestart extends TestC
     public List<QueueUserACLInfo> getQueueUserAclInfo() {
       return null;
     }
+    @Override
+    public void addApplication(ApplicationId applicationId,
+        ApplicationMaster master, String user, String queue, Priority priority)
+        throws IOException {
+      // TODO Auto-generated method stub
+      
+    }
   }
 
   @Before
@@ -208,10 +211,13 @@ public class TestAMRestart extends TestC
     appID = recordFactory.newRecordInstance(ApplicationId.class);
     appID.setClusterTimestamp(System.currentTimeMillis());
     appID.setId(1);
+    Configuration conf = new Configuration();
     scheduler = new DummyResourceScheduler();
+    asmContext.getDispatcher().init(conf);
+    asmContext.getDispatcher().start();
     asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
     appImpl = new ExtApplicationsManagerImpl(appTokenSecretManager, scheduler, asmContext);
-    Configuration conf = new Configuration();
+    
     conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L);
     conf.setInt(YarnConfiguration.AM_MAX_RETRIES, maxFailures);
     appImpl.init(conf);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java Thu Apr 28 20:51:21 2011
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
@@ -25,6 +25,7 @@ import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
@@ -36,12 +37,13 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,7 +51,7 @@ import org.junit.Test;
 public class TestASMStateMachine extends TestCase {
   private static final Log LOG = LogFactory.getLog(TestASMStateMachine.class);
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  ASMContext context = new ResourceManager.ASMContextImpl();
+  RMContext context = new ResourceManager.RMContextImpl(new MemStore());
   EventHandler handler;
   private boolean snreceivedCleanUp = false;
   private boolean snAllocateReceived = false;
@@ -61,6 +63,8 @@ public class TestASMStateMachine extends
 
   @Before
   public void setUp() {
+    context.getDispatcher().init(new Configuration());
+    context.getDispatcher().start();
     handler = context.getDispatcher().getEventHandler();
     new DummyAMLaunchEventHandler();
     new DummySNEventHandler();
@@ -79,237 +83,187 @@ public class TestASMStateMachine extends
 
     public DummyAMLaunchEventHandler() {
       context.getDispatcher().register(AMLauncherEventType.class, this);
-      new Responder().start();
-    }
-
-    private class Responder extends Thread {
-      public void run() {
-        try {
-          synchronized (amsync) {
-            while(amsync.get() == 0) {
-              amsync.wait();
-            }
-          }
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-      
-      context.getDispatcher().getEventHandler().handle(
-      new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,
-      appcontext));
-      synchronized(waitForState) {
-        waitForState.addAndGet(1);
-        waitForState.notify();
-      }
     }
-  }
 
-  @Override
-  public void handle(ASMEvent<AMLauncherEventType> event) {
-    switch(event.getType()) {
-    case LAUNCH:
-      launchCalled = true;
-      appcontext = event.getAppContext();
-      synchronized(amsync) {
-        amsync.addAndGet(1);
-        amsync.notify();
+    @Override
+    public void handle(ASMEvent<AMLauncherEventType> event) {
+      switch(event.getType()) {
+      case LAUNCH:
+        launchCalled = true;
+        appcontext = event.getAppContext();
+        context.getDispatcher().getEventHandler().handle(
+            new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,
+                appcontext));
+        break;
+      case CLEANUP:
+        launchCleanupCalled = true;
+        break;
       }
-      break;
-    case CLEANUP:
-      launchCleanupCalled = true;
-      break;
     }
   }
-}
 
-private class DummySNEventHandler implements EventHandler<ASMEvent<SNEventType>> {
-  AppContext appContext;
-  AtomicInteger snsync = new AtomicInteger(0);
-
-  public DummySNEventHandler() {
-    context.getDispatcher().register(SNEventType.class, this);
-    new Responder().start();
-  }
+  private class DummySNEventHandler implements EventHandler<ASMEvent<SNEventType>> {
+    AppContext appContext;
+    AtomicInteger snsync = new AtomicInteger(0);
 
-  private class Responder extends Thread {
-    public void run() {
-      synchronized (snsync) {
-        try {
-          while (snsync.get() == 0) {
-            snsync.wait();
-          }
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-      }
-      context.getDispatcher().getEventHandler().handle(
-      new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATED,
-      appContext));
-      synchronized(waitForState) {
-        waitForState.addAndGet(1);
-        waitForState.notify();
-      }
+    public DummySNEventHandler() {
+      context.getDispatcher().register(SNEventType.class, this);
     }
-  }
 
-  @Override
-  public void handle(ASMEvent<SNEventType> event) {
-    switch(event.getType()) {
-    case CLEANUP:
-      snreceivedCleanUp = true;
-      break;
-    case SCHEDULE:
-      snAllocateReceived = true;
-      appContext = event.getAppContext();
-      synchronized (snsync) {
-        snsync.addAndGet(1);
-        snsync.notify();
+    @Override
+    public void handle(ASMEvent<SNEventType> event) {
+      switch(event.getType()) {
+      case CLEANUP:
+        snreceivedCleanUp = true;
+        break;
+      case SCHEDULE:
+        snAllocateReceived = true;
+        appContext = event.getAppContext();
+        context.getDispatcher().getEventHandler().handle(
+            new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATED,
+                appContext));
+        break;
       }
-      break;
     }
-  }
-
-}
 
-private static class StatusContext implements AppContext {
-  @Override
-  public ApplicationSubmissionContext getSubmissionContext() {
-    return null;
-  }
-  @Override
-  public Resource getResource() {
-    return null;
-  }
-  @Override
-  public ApplicationId getApplicationID() {
-    return null;
-  }
-  @Override
-  public ApplicationStatus getStatus() {
-    ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class);
-    status.setLastSeen(-99);
-    return status;
-  }
-  @Override
-  public ApplicationMaster getMaster() {
-    return null;
-  }
-  @Override
-  public Container getMasterContainer() {
-    return null;
   }
-  @Override
-  public String getUser() {
-    return null;
-  }
-  @Override
-  public long getLastSeen() {
-    return 0;
-  }
-  @Override
-  public String getName() {
-    return null;
-  }
-  @Override
-  public String getQueue() {
-    return null;
-  }
-  @Override
-  public int getFailedCount() {
-    return 0;
-  }
-}
 
-private class ApplicationTracker implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
-  public ApplicationTracker() {
-    context.getDispatcher().register(ApplicationTrackerEventType.class, this);
+  private static class StatusContext implements AppContext {
+    @Override
+    public ApplicationSubmissionContext getSubmissionContext() {
+      return null;
+    }
+    @Override
+    public Resource getResource() {
+      return null;
+    }
+    @Override
+    public ApplicationId getApplicationID() {
+      return null;
+    }
+    @Override
+    public ApplicationStatus getStatus() {
+      ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class);
+      status.setLastSeen(-99);
+      return status;
+    }
+    @Override
+    public ApplicationMaster getMaster() {
+      return null;
+    }
+    @Override
+    public Container getMasterContainer() {
+      return null;
+    }
+    @Override
+    public String getUser() {
+      return null;
+    }
+    @Override
+    public long getLastSeen() {
+      return 0;
+    }
+    @Override
+    public String getName() {
+      return null;
+    }
+    @Override
+    public String getQueue() {
+      return null;
+    }
+    @Override
+    public int getFailedCount() {
+      return 0;
+    }
   }
 
-  @Override
-  public void handle(ASMEvent<ApplicationTrackerEventType> event) {
-    switch (event.getType()) {
-    case ADD:
-      addedApplication = true;
-      break;
-    case REMOVE:
-      removedApplication = true;
-      break;
+  private class ApplicationTracker implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
+    public ApplicationTracker() {
+      context.getDispatcher().register(ApplicationTrackerEventType.class, this);
+    }
+
+    @Override
+    public void handle(ASMEvent<ApplicationTrackerEventType> event) {
+      switch (event.getType()) {
+      case ADD:
+        addedApplication = true;
+        break;
+      case REMOVE:
+        removedApplication = true;
+        break;
+      }
     }
   }
-}
 
-private class MockAppplicationMasterInfo implements EventHandler<ASMEvent<ApplicationEventType>> {
+  private class MockAppplicationMasterInfo implements EventHandler<ASMEvent<ApplicationEventType>> {
 
-  MockAppplicationMasterInfo() {
-    context.getDispatcher().register(ApplicationEventType.class, this);
-  }
-  @Override
-  public void handle(ASMEvent<ApplicationEventType> event) {
-    LOG.info("The event type is " + event.getType());
+    MockAppplicationMasterInfo() {
+      context.getDispatcher().register(ApplicationEventType.class, this);
+    }
+    @Override
+    public void handle(ASMEvent<ApplicationEventType> event) {
+      LOG.info("The event type is " + event.getType());
+    }
   }
-}
 
-public void waitForState(ApplicationState state, ApplicationMasterInfo info) {
-  synchronized(waitForState) {
-    try {
-      while (waitForState.get() == 0) {   
-        waitForState.wait(10000L);
-      }
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+  private void waitForState( ApplicationState 
+      finalState, ApplicationMasterInfo masterInfo) throws Exception {
+    int count = 0;
+    while(masterInfo.getState() != finalState && count < 10) {
+      Thread.sleep(500);
+      count++;
     }
+    assertTrue(masterInfo.getState() == finalState);
+  } 
+  
+  /* Test the state machine. 
+   * 
+   */
+  @Test
+  public void testStateMachine() throws Exception {
+    ApplicationSubmissionContext submissioncontext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    submissioncontext.setApplicationId(recordFactory.newRecordInstance(ApplicationId.class));
+    submissioncontext.getApplicationId().setId(1);
+    submissioncontext.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
+
+    ApplicationMasterInfo masterInfo 
+    = new ApplicationMasterInfo(context, "dummyuser", submissioncontext, "dummyToken");
+
+    context.getDispatcher().register(ApplicationEventType.class, masterInfo);
+    handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
+        ALLOCATE, masterInfo));
+
+    waitForState(ApplicationState.ALLOCATED, masterInfo);
+    handler.handle(new ASMEvent<ApplicationEventType>(
+        ApplicationEventType.LAUNCH, masterInfo));
+
+    waitForState(ApplicationState.LAUNCHED, masterInfo);
+    Assert.assertTrue(snAllocateReceived);
+    Assert.assertTrue(launchCalled);
+    Assert.assertTrue(addedApplication);
+    handler.handle(new ASMEvent<ApplicationEventType>(
+        ApplicationEventType.REGISTERED, masterInfo));
+    waitForState(ApplicationState.RUNNING, masterInfo);
+    Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
+    handler.handle(new ASMEvent<ApplicationEventType>(
+        ApplicationEventType.STATUSUPDATE, new StatusContext()));
+
+    /* check if the state is still RUNNING */
+
+    Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
+
+    handler.handle(
+        new ASMEvent<ApplicationEventType>(ApplicationEventType.FINISH, masterInfo));
+    waitForState(ApplicationState.COMPLETED, masterInfo);
+    Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());
+    /* check if clean up is called for everyone */
+    Assert.assertTrue(launchCleanupCalled);
+    Assert.assertTrue(snreceivedCleanUp);
+    Assert.assertTrue(removedApplication);
+
+    /* check if expiry doesnt make it failed */
+    handler.handle(
+        new ASMEvent<ApplicationEventType>(ApplicationEventType.EXPIRE, masterInfo));
+    Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());   
   }
-  Assert.assertEquals(state, info.getState());
-}
-
-/* Test the state machine. 
- * 
- */
-@Test
-public void testStateMachine() {
-  ApplicationSubmissionContext submissioncontext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
-  submissioncontext.setApplicationId(recordFactory.newRecordInstance(ApplicationId.class));
-  submissioncontext.getApplicationId().setId(1);
-  submissioncontext.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
-
-  ApplicationMasterInfo masterInfo 
-  = new ApplicationMasterInfo(handler, "dummyuser", submissioncontext, "dummyToken");
-
-  context.getDispatcher().register(ApplicationEventType.class, masterInfo);
-  handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
-  ALLOCATE, masterInfo));
-
-  waitForState(ApplicationState.ALLOCATED, masterInfo);
-  waitForState.getAndDecrement();
-  handler.handle(new ASMEvent<ApplicationEventType>(
-  ApplicationEventType.LAUNCH, masterInfo));
-
-  waitForState(ApplicationState.LAUNCHED, masterInfo);
-  Assert.assertTrue(snAllocateReceived);
-  Assert.assertTrue(launchCalled);
-  Assert.assertTrue(addedApplication);
-  handler.handle(new ASMEvent<ApplicationEventType>(
-  ApplicationEventType.REGISTERED, masterInfo));
-  Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
-  handler.handle(new ASMEvent<ApplicationEventType>(
-  ApplicationEventType.STATUSUPDATE, new StatusContext()));
-
-  /* check if the state is still RUNNING */
-
-  Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
-
-  handler.handle(
-  new ASMEvent<ApplicationEventType>(ApplicationEventType.FINISH, masterInfo));
-
-  Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());
-  /* check if clean up is called for everyone */
-  Assert.assertTrue(launchCleanupCalled);
-  Assert.assertTrue(snreceivedCleanUp);
-  Assert.assertTrue(removedApplication);
-
-  /* check if expiry doesnt make it failed */
-  handler.handle(
-  new ASMEvent<ApplicationEventType>(ApplicationEventType.EXPIRE, masterInfo));
-  Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());   
-}
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java Thu Apr 28 20:51:21 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -45,14 +46,14 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
@@ -77,15 +78,18 @@ public class TestApplicationCleanup exte
   private ExtASM asm;
   private static final int memoryNeeded = 100;
 
-  private final ASMContext context = new ResourceManager.ASMContextImpl();
+  private final RMContext context = new ResourceManager.RMContextImpl(new MemStore());
 
   @Before
   public void setUp() {
     new DummyApplicationTracker();
     scheduler = new FifoScheduler();
     context.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
+    Configuration conf = new Configuration();
+    context.getDispatcher().init(conf);
+    context.getDispatcher().start();
     asm = new ExtASM(new ApplicationTokenSecretManager(), scheduler);
-    asm.init(new Configuration());
+    asm.init(conf);
   }
 
   @After
@@ -115,30 +119,8 @@ public class TestApplicationCleanup exte
       private AtomicInteger notify = new AtomicInteger(0);
       private AppContext appContext;
 
-      public DummyApplicationMasterLauncher(ASMContext context) {
+      public DummyApplicationMasterLauncher(RMContext context) {
         context.getDispatcher().register(AMLauncherEventType.class, this);
-        new Responder().start();
-      }
-
-      private class Responder extends Thread {
-        public void run() {
-          synchronized(notify) {
-            try {
-              while (notify.get() == 0) {   
-                notify.wait();
-              }
-            } catch (InterruptedException e) {
-              e.printStackTrace();
-            }
-          }
-          context.getDispatcher().getEventHandler().
-          handle(new ASMEvent<ApplicationEventType>(
-          ApplicationEventType.LAUNCHED, appContext));
-          synchronized(waitForState) {
-            waitForState.addAndGet(1);
-            waitForState.notify();
-          }
-        }
       }
 
       @Override
@@ -152,11 +134,9 @@ public class TestApplicationCleanup exte
           LOG.info("Launcher Launch called");
           launcherLaunchCalled = true;
           appContext = appEvent.getAppContext();
-          synchronized (notify) {
-            notify.addAndGet(1);
-            notify.notify();
-            LOG.info("Done notifying launcher ");
-          }
+          context.getDispatcher().getEventHandler().
+          handle(new ASMEvent<ApplicationEventType>(
+          ApplicationEventType.LAUNCHED, appContext));
           break;
         default:
           break;
@@ -167,27 +147,8 @@ public class TestApplicationCleanup exte
     private class DummySchedulerNegotiator implements EventHandler<ASMEvent<SNEventType>> {
       private AtomicInteger snnotify = new AtomicInteger(0);
       AppContext acontext;
-      public  DummySchedulerNegotiator(ASMContext context) {
+      public  DummySchedulerNegotiator(RMContext context) {
         context.getDispatcher().register(SNEventType.class, this);
-        new Responder().start();
-      }
-
-      private class Responder extends Thread {
-        public void run() {
-          LOG.info("Waiting for notify");
-          synchronized(snnotify) {
-            try {
-              while(snnotify.get() == 0) {
-                snnotify.wait();
-              }
-            } catch (InterruptedException e) {
-              e.printStackTrace();
-            }
-          }
-          context.getDispatcher().getEventHandler().
-          handle(new ASMEvent<ApplicationEventType>(
-          ApplicationEventType.ALLOCATED, acontext));
-        }
       }
 
       @Override
@@ -200,11 +161,9 @@ public class TestApplicationCleanup exte
         case SCHEDULE:
           schedulerScheduleCalled = true;
           acontext = appEvent.getAppContext();
-          LOG.info("Schedule received");
-          synchronized(snnotify) {
-            snnotify.addAndGet(1);
-            snnotify.notify();
-          }
+          context.getDispatcher().getEventHandler().
+          handle(new ASMEvent<ApplicationEventType>(
+          ApplicationEventType.ALLOCATED, acontext));
         default:
           break;
         }
@@ -229,19 +188,17 @@ public class TestApplicationCleanup exte
     }
 
   }
-
-  private void waitForState(ApplicationState state, ApplicationMasterInfo masterInfo) {
-    synchronized(waitForState) {
-      try {
-        while(waitForState.get() == 0) {
-          waitForState.wait(10000L);
-        }
-      } catch (InterruptedException e) {
-        LOG.info("Interrupted thread " , e);
-      }
+  
+  private void waitForState(ApplicationState 
+      finalState, ApplicationMasterInfo masterInfo) throws Exception {
+    int count = 0;
+    while(masterInfo.getState() != finalState && count < 10) {
+      Thread.sleep(500);
+      count++;
     }
-    Assert.assertEquals(state, masterInfo.getState());
+    assertTrue(masterInfo.getState() == finalState);
   }
+  
 
   private ResourceRequest createNewResourceRequest(int capability, int i) {
     ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
@@ -300,6 +257,9 @@ public class TestApplicationCleanup exte
       (firstNodeMemory - (2*memoryNeeded)));
     ApplicationMasterInfo masterInfo = asm.getApplicationMasterInfo(appID);
     asm.finishApplication(appID);
+    while (asm.launcherCleanupCalled != true) {
+      Thread.sleep(500);
+    }
     assertTrue(asm.launcherCleanupCalled == true);
     assertTrue(asm.launcherLaunchCalled == true);
     assertTrue(asm.schedulerCleanupCalled == true);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java Thu Apr 28 20:51:21 2011
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
@@ -25,18 +27,20 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,7 +54,7 @@ public class TestApplicationMasterExpiry
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   AMTracker tracker;
   
-  private final ASMContext context = new ResourceManager.ASMContextImpl();
+  private final RMContext context = new ResourceManager.RMContextImpl(new MemStore());
   
   @Before
   public void setUp() {
@@ -60,6 +64,8 @@ public class TestApplicationMasterExpiry
     new ApplicationEventTypeListener();
     tracker = new AMTracker(context); 
     Configuration conf = new Configuration();
+    context.getDispatcher().init(conf);
+    context.getDispatcher().start();
     conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L);
     tracker.init(conf);
     tracker.start();
@@ -79,7 +85,7 @@ public class TestApplicationMasterExpiry
     }
   }
   
-  private Object expiry = new Object();
+  private AtomicInteger expiry = new AtomicInteger();
   private boolean expired = false;
   
   private class ApplicationEventTypeListener implements EventHandler<ASMEvent<ApplicationEventType>> {
@@ -93,7 +99,7 @@ public class TestApplicationMasterExpiry
         expired = true;
         LOG.info("Received expiry from application " + event.getAppContext().getApplicationID());
         synchronized(expiry) {
-          expiry.notify();
+          expiry.addAndGet(1);
         }
       }
     }
@@ -117,6 +123,16 @@ public class TestApplicationMasterExpiry
     }
   }
   
+  private void waitForState(ApplicationMasterInfo masterInfo, ApplicationState 
+      finalState) throws Exception {
+    int count = 0;
+    while(masterInfo.getState() != finalState && count < 10) {
+      Thread.sleep(500);
+      count++;
+    }
+    assertTrue(masterInfo.getState() == finalState);
+  }
+
   @Test
   public void testAMExpiry() throws Exception {
     ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
@@ -130,10 +146,13 @@ public class TestApplicationMasterExpiry
     ApplicationMasterInfo masterInfo = tracker.get(context.getApplicationId());
     this.context.getDispatcher().getEventHandler().handle(
         new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATED, masterInfo));
+    waitForState(masterInfo, ApplicationState.LAUNCHING);
     this.context.getDispatcher().getEventHandler().handle(
     new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED, masterInfo));
     synchronized(expiry) {
-      expiry.wait(10000);
+      while (expiry.get() == 0) {
+        expiry.wait(1000);
+      }
     }
     Assert.assertTrue(expired);
   }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java Thu Apr 28 20:51:21 2011
@@ -33,10 +33,11 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,7 +59,7 @@ public class TestApplicationMasterLaunch
   Object doneLaunching = new Object();
   AtomicInteger launched = new AtomicInteger();
   AtomicInteger cleanedUp = new AtomicInteger();
-  private ASMContext context = new ResourceManager.ASMContextImpl();
+  private RMContext context = new ResourceManager.RMContextImpl(new MemStore());
   
   private class DummyASM implements EventHandler<ASMEvent<ApplicationEventType>> {
     @Override
@@ -126,9 +127,13 @@ public class TestApplicationMasterLaunch
   public void setUp() {
     asmHandle = new DummyASM();
     amLauncher = new DummyApplicationMasterLauncher(applicationTokenSecretManager,
-        clientToAMSecretManager, asmHandle);  
-    amLauncher.init(new Configuration());
+        clientToAMSecretManager, asmHandle);
+    Configuration conf = new Configuration();
+    context.getDispatcher().init(conf);
+    amLauncher.init(conf);
+    context.getDispatcher().start();
     amLauncher.start();
+    
   }
 
   @After
@@ -143,8 +148,7 @@ public class TestApplicationMasterLaunch
     context.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
     context.getApplicationId().setId(1);
     context.setUser("dummyuser");
-    ApplicationMasterInfo masterInfo = new ApplicationMasterInfo(this.context.
-        getDispatcher().getEventHandler(),
+    ApplicationMasterInfo masterInfo = new ApplicationMasterInfo(this.context,
         "dummyuser", context,
         "dummyclienttoken");
     amLauncher.handle(new ASMEvent<AMLauncherEventType>(AMLauncherEventType.LAUNCH, 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java Thu Apr 28 20:51:21 2011
@@ -23,9 +23,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -33,17 +36,16 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
@@ -53,16 +55,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
 public class TestSchedulerNegotiator extends TestCase {
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private SchedulerNegotiator schedulerNegotiator;
   private DummyScheduler scheduler;
   private final int testNum = 99999;
   
-  private final ASMContext context = new ResourceManager.ASMContextImpl();
+  private final RMContext context = new ResourceManager.RMContextImpl(new MemStore());
   ApplicationMasterInfo masterInfo;
   private EventHandler handler;
   
@@ -97,10 +96,7 @@ public class TestSchedulerNegotiator ext
     @Override
     public void handle(ASMEvent<ApplicationTrackerEventType> event) {
     }
-    @Override
-    public void addApplication(ApplicationId applicationId, String user,
-        String queue, Priority priority) throws IOException {
-    }
+   
     @Override
     public void removeApplication(ApplicationId applicationId)
         throws IOException {
@@ -115,15 +111,25 @@ public class TestSchedulerNegotiator ext
     public List<QueueUserACLInfo> getQueueUserAclInfo() {
       return null;
     }
+    @Override
+    public void addApplication(ApplicationId applicationId,
+        ApplicationMaster master, String user, String queue, Priority priority)
+        throws IOException {
+      // TODO Auto-generated method stub
+      
+    }
   }
   
   @Before
   public void setUp() {
     scheduler = new DummyScheduler();
     schedulerNegotiator = new SchedulerNegotiator(context, scheduler);
-    schedulerNegotiator.init(new Configuration());
+    Configuration conf = new Configuration();
+    schedulerNegotiator.init(conf);
     schedulerNegotiator.start();
     handler = context.getDispatcher().getEventHandler();
+    context.getDispatcher().init(conf);
+    context.getDispatcher().start();
   }
   
   @After
@@ -144,6 +150,12 @@ public class TestSchedulerNegotiator ext
     Assert.assertEquals(state, info.getState());
   }
   
+  private class DummyEventHandler implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
+    @Override
+    public void handle(ASMEvent<ApplicationTrackerEventType> event) {
+    }
+  }
+
   @Test
   public void testSchedulerNegotiator() throws Exception {
     ApplicationSubmissionContext submissionContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
@@ -152,9 +164,10 @@ public class TestSchedulerNegotiator ext
     submissionContext.getApplicationId().setId(1);
     
     masterInfo =
-      new ApplicationMasterInfo(this.context.getDispatcher().getEventHandler(),
+      new ApplicationMasterInfo(this.context,
           "dummy", submissionContext, "dummyClientToken");
     context.getDispatcher().register(ApplicationEventType.class, masterInfo);
+    context.getDispatcher().register(ApplicationTrackerEventType.class, masterInfo);
     handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
     ALLOCATE, masterInfo));
     waitForState(ApplicationState.ALLOCATED, masterInfo);



Mime
View raw message