incubator-ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r1206994 - in /incubator/ambari/trunk: ./ client/src/main/java/org/apache/ambari/common/rest/agent/ controller/src/main/java/org/apache/ambari/components/ controller/src/main/java/org/apache/ambari/components/impl/ controller/src/main/java/...
Date Mon, 28 Nov 2011 06:47:25 GMT
Author: ddas
Date: Mon Nov 28 06:47:24 2011
New Revision: 1206994

URL: http://svn.apache.org/viewvc?rev=1206994&view=rev
Log:
AMBARI-140. Refactors the heartbeat handling w.r.t simplification of state management.

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ComponentPlugin.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/XmlComponentDefinition.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
    incubator/ambari/trunk/controller/src/main/resources/org/apache/ambari/acd/hadoop-hdfs-0.1.0.acd

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1206994&r1=1206993&r2=1206994&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Mon Nov 28 06:47:24 2011
@@ -2,6 +2,9 @@ Ambari Change log
 
 Release 0.1.0 - unreleased
 
+  AMBARI-140. Refactors the heartbeat handling w.r.t simplification of 
+  state management. (ddas)
+
   AMBARI-138. Implement stack persistence (vgogate)
 
   AMBARI-135. Simplifies the heartbeat handling to not deal with 

Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java?rev=1206994&r1=1206993&r2=1206994&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java Mon Nov 28 06:47:24 2011
@@ -49,6 +49,8 @@ public class HeartBeat {
   @XmlElement
   private List<AgentRoleState> installedRoleStates;
   @XmlElement
+  private boolean stateChangeStatus;
+  @XmlElement
   private List<ActionResult> actionResults;
   @XmlElement
   private boolean idle;
@@ -85,6 +87,10 @@ public class HeartBeat {
     return installedRoleStates;
   }
   
+  public boolean getStateChangeStatus() {
+    return stateChangeStatus;
+  }
+  
   public void setTimestamp(long timestamp) {
     this.timestamp = timestamp;
   }
@@ -108,4 +114,8 @@ public class HeartBeat {
   public void setIdle(boolean idle) {
     this.idle = idle;
   }
+  
+  public void setStateChangeStatus(boolean stateChangeStatus) {
+    this.stateChangeStatus = stateChangeStatus;
+  }
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ComponentPlugin.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ComponentPlugin.java?rev=1206994&r1=1206993&r2=1206994&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ComponentPlugin.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ComponentPlugin.java Mon Nov 28 06:47:24 2011
@@ -28,10 +28,6 @@ public abstract class ComponentPlugin {
   
   public abstract String getProvides();
   
-  public abstract String getInstallUser();
-  
-  public abstract String getPackage();
-  
   /**
    * Get the active roles (ie. with servers) for this component.
    * @return the list of roles in the order that they should be started
@@ -59,20 +55,13 @@ public abstract class ComponentPlugin {
                                      ) throws IOException;
   
   /**
-   * Get the role that should run the check availability command.
+   * Get the role that should run the check service command.
    * @return the role name
    * @throws IOException
    */
   public abstract String runCheckRole() throws IOException;
 
   /**
-   * Get the role that should run the initialization command.
-   * @return the role name
-   * @throws IOException
-   */
-  public abstract String runPreStartRole() throws IOException;
-
-  /**
    * Get the commands to check whether the service is up
    * @param cluster the name of the cluster
    * @param role the role that is being checked
@@ -83,6 +72,13 @@ public abstract class ComponentPlugin {
                                       String role) throws IOException;
 
   /**
+   * Get the role that should run the initialization command.
+   * @return the role name
+   * @throws IOException
+   */
+  public abstract String runPreStartRole() throws IOException;
+  
+  /**
    * Get the commands to run to preinstall a component
    * For example, MapReduce needs to have certain directories
    * on the HDFS before JobTracker can be started.

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/XmlComponentDefinition.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/XmlComponentDefinition.java?rev=1206994&r1=1206993&r2=1206994&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/XmlComponentDefinition.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/XmlComponentDefinition.java Mon Nov 28 06:47:24 2011
@@ -27,25 +27,17 @@ public class XmlComponentDefinition exte
       "requires",
       "roles",
       "prestart",
-      "install",
-      "configure",
       "start",
-      "check",
-      "uninstall"
+      "check"
   })
   @XmlRootElement
   public static class Component {
     @XmlAttribute String provides;
-    @XmlAttribute(name="package") String pkg;
-    @XmlAttribute String user;
     @XmlElement List<Requires> requires;
     @XmlElement List<Role> roles;
-    @XmlElement Install install;
-    @XmlElement Configure configure;
     @XmlElement Start start;
     @XmlElement Check check;
     @XmlElement Prestart prestart;
-    @XmlElement Uninstall uninstall;
   }
   
   @XmlAccessorType
@@ -107,38 +99,21 @@ public class XmlComponentDefinition exte
   }
 
   private final String provides;
-  private final String pkg;
   private final String[] roles;
   private final String[] dependencies;
-  private final String configureCommand;
-  private final String configureUser;
-  private final String installCommand;
-  private final String installUser;
   private final String startCommand;
-  private final String startUser;
-  private final String uninstallCommand;
-  private final String uninstallUser;
+  private final String startUser = "agent";
   private final String checkRole;
   private final String prestartRole;
   private final String prestartCommand;
-  private final String prestartUser;
+  private final String prestartUser = "agent";
   private final String checkCommand;
-  private final String checkUser;
+  private final String checkUser = "agent";
   
   @Override
   public String getProvides() {
     return provides;
   }
-  
-  @Override
-  public String getInstallUser() {
-    return installUser;
-  }
-  
-  @Override
-  public String getPackage() {
-    return pkg;
-  }
 
   @Override
   public String[] getActiveRoles() throws IOException {
@@ -221,7 +196,6 @@ public class XmlComponentDefinition exte
       um = jaxbContext.createUnmarshaller();
       Component component = (Component) um.unmarshal(in);
       provides = component.provides;
-      pkg = component.pkg;
       int i = 0;
       if (component.requires == null) {
         dependencies = new String[0];
@@ -240,16 +214,9 @@ public class XmlComponentDefinition exte
           roles[i++] = r.name;
         }      
       }
-      installCommand = getCommand(component.install);
-      installUser = getUser(component.install, component.user);
-      configureCommand = getCommand(component.configure);
-      configureUser = getUser(component.configure, component.user);
       startCommand = getCommand(component.start);
-      startUser = getUser(component.start, component.user);
       checkCommand = getCommand(component.check);
-      checkUser = getUser(component.check, component.user);
       prestartCommand = getCommand(component.prestart);
-      prestartUser = getUser(component.prestart, component.user);
       if (component.check != null) {
         checkRole = component.check.runOn;
       } else {
@@ -260,8 +227,6 @@ public class XmlComponentDefinition exte
       } else {
         prestartRole = null;
       }
-      uninstallCommand = getCommand(component.uninstall);
-      uninstallUser = getUser(component.uninstall, component.user);
     } catch (JAXBException e) {
       throw new IOException("Problem parsing component defintion", e);
     }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java?rev=1206994&r1=1206993&r2=1206994&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java Mon Nov 28 06:47:24 2011
@@ -777,6 +777,14 @@ public class Clusters {
     
     
     /*
+     * Get the deployment script for this clustername/revision combo
+     */
+    public String getInstallAndConfigureScript(String clusterName,
+        int revision) {
+      return ""; //TODO: fill
+    }
+    
+    /*
      * Get the latest cluster definition
      */
     public ClusterDefinition getLatestClusterDefinition(String clusterName) throws Exception {

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1206994&r1=1206993&r2=1206994&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Mon Nov 28 06:47:24 2011
@@ -17,27 +17,20 @@
  */
 package org.apache.ambari.controller;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.ambari.controller.Clusters;
 import org.apache.ambari.controller.Nodes;
 import org.apache.ambari.common.rest.agent.Action;
-import org.apache.ambari.common.rest.agent.Action.Kind;
 import org.apache.ambari.common.rest.agent.ActionResult;
-import org.apache.ambari.common.rest.agent.AgentRoleState;
 import org.apache.ambari.common.rest.agent.Command;
 import org.apache.ambari.common.rest.agent.ControllerResponse;
 import org.apache.ambari.common.rest.agent.HeartBeat;
-import org.apache.ambari.common.rest.agent.Action.Signal;
 import org.apache.ambari.components.ComponentPlugin;
 import org.apache.ambari.resource.statemachine.ClusterFSM;
 import org.apache.ambari.resource.statemachine.RoleFSM;
@@ -60,6 +53,10 @@ public class HeartbeatHandler {
   
   public ControllerResponse processHeartBeat(HeartBeat heartbeat) 
       throws Exception {
+    String hostname = heartbeat.getHostname();
+    Date heartbeatTime = new Date(System.currentTimeMillis());
+    Nodes.getInstance().checkAndUpdateNode(hostname, heartbeatTime);
+    
     ControllerResponse response = 
         agentToHeartbeatResponseMap.get(heartbeat.getHostname());
     if (response != null) {
@@ -69,135 +66,73 @@ public class HeartbeatHandler {
     }
 
     short responseId = (short)(heartbeat.getResponseId() + 1);
-
-    String hostname = heartbeat.getHostname();
-    Date heartbeatTime = new Date(System.currentTimeMillis());
     String clusterName = null;
-    long clusterRev = 0L;
-
-    Nodes.getInstance().checkAndUpdateNode(hostname, heartbeatTime);
+    int clusterRev = 0;
 
     List<Action> allActions = new ArrayList<Action>();
-    
-    
+
     //if the command-execution takes longer than one heartbeat interval
     //the check for idleness will prevent the same node getting the same 
     //command more than once. In the future this could be improved
     //to reflect the command execution state more accurately.
     if (heartbeat.getIdle()) {
-      clusterName = Nodes.getInstance().getNode(hostname)
-          .getNodeState().getClusterName();
-      if (clusterName != null) {
-        clusterRev = Clusters.getInstance().
-            getClusterByName(clusterName).getLatestRevisionNumber(); 
-      }
-      
-      ComponentAndRoleStates componentStates = 
-          new ComponentAndRoleStates();
-      //create some datastructures by looking at agent state
-      inspectAgentState(heartbeat, componentStates);
-      
-      //get the clusters the node belongs to
-      Set<ClusterNameAndRev> clustersNodeBelongsTo = 
-          componentStates.getClustersNodeBelongsTo();
-      boolean newNode = false;
-      //add the clusters the node *should* belong to
-      if (clusterName != null) {
-        newNode = checkAndAddClusterIds(clustersNodeBelongsTo, clusterName, 
-            clusterRev);
-      }
+      List<ClusterNameAndRev> clustersNodeBelongsTo = 
+          getClustersNodeBelongsTo(hostname);
+
       for (ClusterNameAndRev clusterIdAndRev : clustersNodeBelongsTo) {
-        //check whether this node is out-of-sync w.r.t what's running &
-        //installed, or is it compatible
-        if (!isCompatible(clusterIdAndRev.getClusterName(), 
-            clusterIdAndRev.getRevision(), clusterName, clusterRev)) {
-          createStopAndUninstallActions(componentStates, allActions, 
-              clusterIdAndRev, true);
-          continue;
-        }
+
+        String deployScript = 
+            Clusters.getInstance().getInstallAndConfigureScript(clusterName, 
+                clusterRev);
+        
+        //send the deploy script
+        
+
         //get the cluster object corresponding to the clusterId
         Cluster cluster = Clusters.getInstance()
-            .getClusterByName(clusterIdAndRev.getClusterName());
+            .getClusterByName(clusterName);
         //get the state machine reference to the cluster
         ClusterFSM clusterFsm = StateMachineInvoker
-            .getStateMachineClusterInstance(clusterIdAndRev.getClusterName());
+            .getStateMachineClusterInstance(clusterName);
 
         //the state machine references to the services
         List<ServiceFSM> clusterServices = clusterFsm.getServices();
-        //go through all the services, and check which role should be started
-        //TODO: Given that we already look at what is running/installed in 
-        //inspectAgentState, maybe we can avoid the following for loop.
+        //go through all the components, and check which role should be started
         for (ServiceFSM service : clusterServices) {
           List<RoleFSM> roles = service.getRoles();
           for (RoleFSM role : roles) {
             boolean nodePlayingRole = 
                 nodePlayingRole(hostname, role.getRoleName());
             if (nodePlayingRole) {
-              boolean roleInstalled = false;
-              boolean roleServerRunning = false;
-              boolean agentRoleStateChanged = false;
-              if (!newNode) {
-                roleInstalled = componentStates.isInstalled(
-                    clusterIdAndRev,
-                    role.getAssociatedService().getServiceName(), 
-                    role.getRoleName());     
-                roleServerRunning = componentStates.isStarted(
-                    clusterIdAndRev,
-                    role.getAssociatedService().getServiceName(),
-                    role.getRoleName()) 
-                    || componentStates.isStartInProgress(clusterIdAndRev,
-                        role.getAssociatedService().getServiceName(), 
-                        role.getRoleName());
-                agentRoleStateChanged = componentStates.hasStateChanged(
-                    clusterIdAndRev, role.getAssociatedService().getServiceName(), 
-                    role.getRoleName());
-              }
               ComponentPlugin plugin = 
                   cluster.getComponentDefinition(service.getServiceName());
-              
+
               //check whether the agent should start any server
               if (role.shouldStart()) {
-                if (!roleInstalled) {
-                  createDirStructureAction(clusterIdAndRev, cluster, 
-                      service.getServiceName(), role.getRoleName(), plugin,
-                      allActions);
-                  continue;
-                }
-                if (role.getRoleName().contains("-client")) { //TODO: have a good place to define this
-                  //Client roles are special cases. They don't have any active servers
-                  //but should be considered active when installed. Setting the 
-                  //boolean to true ensures that the FSM gets an event (albeit a fake one).
-                  roleServerRunning = true;
-                }
-                if (!roleServerRunning) {
-                  //TODO: keep track of retries (via checkActionResults)
-                  Action action = 
-                      plugin.startServer(cluster.getName(), role.getRoleName());
-                  fillDetailsAndAddAction(action, allActions, clusterName,
-                      clusterRev, service.getServiceName(), 
-                      role.getRoleName());
-                }
-                //raise an event to the state machine for a successful 
-                //role-start instance
-                if (roleServerRunning && agentRoleStateChanged) {
+                Action action = 
+                    plugin.startServer(cluster.getName(), role.getRoleName());
+                fillDetailsAndAddAction(action, allActions, clusterName,
+                    clusterRev, service.getServiceName(), 
+                    role.getRoleName());
+                //check if a start-role was sent previously and the start was
+                //successful
+                if (wasStartRoleSuccessful(clusterIdAndRev, 
+                    role.getRoleName(), response, heartbeat)) {
+                  //raise an event to the state machine for a successful 
+                  //role-start
                   StateMachineInvoker.getAMBARIEventHandler()
                   .handle(new RoleEvent(RoleEventType.START_SUCCESS, role));
                 }
-                componentStates.
-                  continueRunning(clusterIdAndRev, 
-                      role.getAssociatedService().getServiceName(), 
-                      role.getRoleName());
               }
               //check whether the agent should stop any server
+              //note that the 'stop' is implicit - if the heartbeat response
+              //doesn't contain the fact that role should be starting/running, 
+              //the agent stops it
               if (role.shouldStop()) {
-                if (role.getRoleName().contains("-client")) { //TODO: have a good place to define this
-                  //Client roles are special cases. Setting the 
-                  //boolean to false ensures that the FSM gets an event (albeit a fake one)
-                  roleServerRunning = false;
-                }
                 //raise an event to the state machine for a successful 
                 //role-stop instance
-                if (!roleServerRunning && agentRoleStateChanged) {
+                if (wasStopRoleSuccessful(clusterIdAndRev, 
+                    role.getRoleName(), response, heartbeat)) {
                   StateMachineInvoker.getAMBARIEventHandler()
                   .handle(new RoleEvent(RoleEventType.STOP_SUCCESS, role));
                 }
@@ -206,12 +141,8 @@ public class HeartbeatHandler {
           }
           //check/create the special component/service-level 
           //actions (like safemode check). Only once per component.
-          checkAndCreateActions(cluster, clusterFsm, clusterIdAndRev, newNode,
-                service, heartbeat, allActions, componentStates);
-        }
-        if (!newNode) {
-          createStopAndUninstallActions(componentStates, allActions, 
-              clusterIdAndRev, false);
+          checkAndCreateActions(cluster, clusterFsm, clusterIdAndRev,
+              service, heartbeat, allActions);
         }
       }
     }
@@ -222,235 +153,74 @@ public class HeartbeatHandler {
     return r;
   }
   
-  private boolean checkAndAddClusterIds(Set<ClusterNameAndRev> clustersNodeBelongsTo,
-      String clusterName, long clusterRev) {
-    ClusterNameAndRev clusterNameAndRev = new ClusterNameAndRev(clusterName,
-        clusterRev);
-    if (!clustersNodeBelongsTo.contains(clusterNameAndRev)) {
-      clustersNodeBelongsTo.add(clusterNameAndRev);
-      return true;
-    }
-    return false;
-  }
-  
-  private void createDirStructureAction(ClusterNameAndRev clusterIdAndRev, 
-      Cluster cluster, String component, String role, ComponentPlugin plugin, 
-      List<Action> allActions) throws IOException {
-    String clusterId = clusterIdAndRev.getClusterName();
-    long clusterRev = clusterIdAndRev.getRevision();
-    //action for creating dir structure
-    Action action = new Action();
-    action.setKind(Kind.CREATE_STRUCTURE_ACTION);
-    action.setUser(plugin.getInstallUser());
-    action.setId(getSpecialActionID(clusterIdAndRev, component, role, 
-        SpecialServiceIDs.CREATE_STRUCTURE_ACTION_ID));
-    fillDetailsAndAddAction(action, allActions, clusterId,
-        clusterRev, component, role);
-  }
-  
-  private void createStopAndUninstallActions(ComponentAndRoleStates componentAndRoleStates,
-      List<Action> allActions, ClusterNameAndRev clusterIdAndRev, boolean forceUninstall) {
-    Map<String, 
-        Map<String,RoleStateTracker>>
-    entrySet = componentAndRoleStates.getAllRoles(clusterIdAndRev);
-    for (Map.Entry<String, 
-        Map<String,RoleStateTracker>> entry : 
-          entrySet.entrySet()) {
-      String componentName = entry.getKey();
-      Set<Map.Entry<String,RoleStateTracker>> roleSet = entry.getValue().entrySet();
-      for (Map.Entry<String,RoleStateTracker> entryVal : roleSet) {
-        String roleName = entryVal.getKey();
-        if (forceUninstall) {
-          addAction(getStopRoleAction(clusterIdAndRev.getClusterName(), 
-              clusterIdAndRev.getRevision(), 
-              componentName, roleName), allActions);
-          addAction(getUninstallRoleAction(clusterIdAndRev.getClusterName(), 
-              clusterIdAndRev.getRevision(), 
-              componentName, roleName), allActions);
-        } else {
-          RoleStateTracker stateTracker = entryVal.getValue();
-          if (stateTracker.continueRunning) continue;
-
-          addAction(getStopRoleAction(clusterIdAndRev.getClusterName(), 
-              clusterIdAndRev.getRevision(), 
-              componentName, roleName), allActions);
-
-          if (stateTracker.uninstall)
-            addAction(getUninstallRoleAction(clusterIdAndRev.getClusterName(), 
-                clusterIdAndRev.getRevision(), 
-                componentName, roleName), allActions);
-        }
-      }
-    }
-  }
-  private static class ComponentAndRoleStates {
-    //Convenience class to aid in heartbeat processing
-    private Map<ClusterNameAndRev, Map<String, Map<String, RoleStateTracker>>>
-    componentRoleMap = new HashMap<ClusterNameAndRev, 
-                           Map<String, Map<String, RoleStateTracker>>>();
-    
-    private Map<String, ActionResult> actionIds = 
-        new HashMap<String, ActionResult>();
-    
-    private static Map<String, List<AgentRoleState>> previousStateMap =
-        new ConcurrentHashMap<String, List<AgentRoleState>>();
-    
-    private Set<ClusterNameAndRev> clusterNodeBelongsTo = 
-        new TreeSet<ClusterNameAndRev>();
-    
-    Map<String,Map<String,RoleStateTracker>> getAllRoles(
-        ClusterNameAndRev clusterIdAndRev) {
-      return componentRoleMap.get(clusterIdAndRev);
-    }
-    
-    void recordRoleState(String host, AgentRoleState state) {
-      ClusterNameAndRev clusterIdAndRev = 
-          new ClusterNameAndRev(state.getClusterId(),
-              state.getClusterDefinitionRevision());
-      clusterNodeBelongsTo.add(clusterIdAndRev);
-      
-      recordState(clusterIdAndRev,state.getComponentName(),
-          state.getRoleName(),state);
-      
-      List<AgentRoleState> agentRoleStates = null;
-      boolean alreadyPresent = false;
-
-      if ((agentRoleStates = previousStateMap.get(host)) != null) {
-        for (AgentRoleState agentRoleState : agentRoleStates) {
-          if (agentRoleState.roleAttributesEqual(state)) {
-            alreadyPresent = true; 
-            if (agentRoleState.getServerStatus() != state.getServerStatus()) { 
-              //state of the server is different. Record that.
-              setStateChanged(clusterIdAndRev,state.getComponentName(),
-                  state.getRoleName());
-              agentRoleState.setServerStatus(state.getServerStatus());
-            }
-          }
-        }
-      } else {
-        agentRoleStates = new ArrayList<AgentRoleState>();
-        previousStateMap.put(host, agentRoleStates);
-      }
-      if (!alreadyPresent) {
-        agentRoleStates.add(state); 
-      }
-    }
-    
-    boolean isRoleInstalled(ClusterNameAndRev clusterIdAndRev, String role) {
-      //problematic in the case where role is not unique (like 'client')
-      //TODO: no iteration please
-      Set<Map.Entry<String, Map<String, RoleStateTracker>>> entrySet =
-          componentRoleMap.get(clusterIdAndRev).entrySet();
-      for (Map.Entry<String, Map<String, RoleStateTracker>> entry : entrySet) {
-        if (entry.getValue().containsKey(role)) {
-          return true;
-        }
-      }
+  private boolean wasStartRoleSuccessful(ClusterNameAndRev clusterIdAndRev, 
+      String roleName, ControllerResponse response, HeartBeat heartbeat) {
+    //Check whether the statechange was successful on the agent, and if
+    //the set of commands to the agent included the start-action for the
+    //role in question
+    if (!heartbeat.getStateChangeStatus()) {
       return false;
     }
-    
-    boolean isStarted(ClusterNameAndRev clusterIdAndRev, String component, 
-        String role) {
-      Map<String,Map<String,RoleStateTracker>> componentsMap = 
-          componentRoleMap.get(clusterIdAndRev);
-      if (componentsMap == null) {
-        return false;
-      }
-      Map<String, RoleStateTracker> startedServerMap;
-      if ((startedServerMap = componentsMap.get(component)) != null) {
-        RoleStateTracker state = startedServerMap.get(role);
-        if (state == null) 
-          return false;
-        return state.state == AgentRoleState.State.STARTED;
+    List<Action> actions = response.getActions();
+    for (Action action : actions) { //TBD: no iteration for every role
+      if (action.kind != Action.Kind.START_ACTION) {
+        continue;
+      }
+      if (action.getClusterId().equals(clusterIdAndRev.getClusterName()) && 
+          action.getClusterDefinitionRevision() == 
+          clusterIdAndRev.getRevision() &&
+          action.getRole().equals(roleName)) {
+        return true;
       }
-      return false;
     }
-    
-    boolean isStartInProgress(ClusterNameAndRev clusterIdAndRev, 
-        String component, String role) {
-      Map<String,Map<String,RoleStateTracker>> componentsMap = 
-          componentRoleMap.get(clusterIdAndRev);
-      if (componentsMap == null) {
-        return false;
-      }
-      Map<String, RoleStateTracker> startedServerMap;
-      if ((startedServerMap = componentsMap.get(component)) != null) {
-        RoleStateTracker state = startedServerMap.get(role);
-        if (state != null) {
-          return state.state == AgentRoleState.State.STARTED ||
-              state.state == AgentRoleState.State.STARTING;
-        } else return false;
-      }
+    return false;
+  }
+  
+  private boolean wasStopRoleSuccessful(ClusterNameAndRev clusterIdAndRev, 
+      String roleName, ControllerResponse response, HeartBeat heartbeat) {
+    //Check whether the statechange was successful on the agent, and if
+    //the set of commands to the agent included the start-action for the
+    //role in question. If the set of commands didn't include the start-action
+    //command, the controller wants the role stopped
+    if (!heartbeat.getStateChangeStatus()) {
       return false;
     }
-    
-    boolean isInstalled(ClusterNameAndRev clusterIdAndRev, 
-        String component, String role) {
-      Map<String,Map<String,RoleStateTracker>> componentsMap = 
-          componentRoleMap.get(clusterIdAndRev);
-      if (componentsMap == null) {
+    List<Action> actions = response.getActions();
+    for (Action action : actions) {
+      if (action.getClusterId() == clusterIdAndRev.getClusterName() && 
+          action.getClusterDefinitionRevision() == 
+          clusterIdAndRev.getRevision() &&
+          action.getRole().equals(roleName) &&
+          action.kind == Action.Kind.START_ACTION) {
         return false;
       }
-      Map<String, RoleStateTracker> startedRoleMap;
-      if ((startedRoleMap = componentsMap.get(component)) != null) {
-        RoleStateTracker state = startedRoleMap.get(role);
-        return state != null;
-      }
-      return false;
-    }
-    
-    void recordActionId(String actionId, ActionResult actionResult) {
-      actionIds.put(actionId, actionResult);
     }
-    ActionResult getActionResult(String id) {
-      return actionIds.get(id);
-    }
-    private void recordState(ClusterNameAndRev clusterIdAndRev, String component,
-        String roleServer, AgentRoleState state) {
-      Map<String, Map<String, RoleStateTracker>> componentMap = null;
-      
-      if ((componentMap = componentRoleMap.get(clusterIdAndRev))
-          != null) {
-        Map<String,RoleStateTracker> roles = componentMap.get(component);
-        RoleStateTracker roleState = new RoleStateTracker(state.getServerStatus(), 
-            state.getClusterId(), state.getClusterDefinitionRevision());
-        if (roles != null) {
-          roles.put(roleServer, roleState);
-        } else {
-          roles = new HashMap<String, RoleStateTracker>();
-          componentMap.put(component, roles);
-        }
-        return;
+    return true;
+  }
+  
+  private ActionResult getActionResult(HeartBeat heartbeat, String id) {
+    List<ActionResult> actionResults = heartbeat.getActionResults();
+    for (ActionResult result : actionResults) {
+      if (result.getId().equals(id)) {
+        return result;
       }
-      componentMap = new HashMap<String, Map<String,RoleStateTracker>>();
-      componentRoleMap.put(clusterIdAndRev, componentMap);
-      Map<String,RoleStateTracker> roleMap = new HashMap<String,RoleStateTracker>();
-      
-      roleMap.put(roleServer, 
-          new RoleStateTracker(state.getServerStatus(), 
-              state.getClusterId(), state.getClusterDefinitionRevision()));
-      componentMap.put(component, roleMap);
-    }
-    boolean hasStateChanged(ClusterNameAndRev clusterIdAndRev, String component,
-        String roleServer) {
-      return componentRoleMap.get(clusterIdAndRev).get(component)
-          .get(roleServer).stateChanged;
-    }
-    Set<ClusterNameAndRev> getClustersNodeBelongsTo() {
-      return clusterNodeBelongsTo;
-    }
-    private void setStateChanged(ClusterNameAndRev clusterIdAndRev, 
-        String component, String roleServer) {
-      componentRoleMap.get(clusterIdAndRev).get(component)
-         .get(roleServer).stateChanged = true;
-    }
-    private void continueRunning(ClusterNameAndRev clusterIdAndRev, 
-        String component, String roleServer) {
-      componentRoleMap.get(clusterIdAndRev).get(component)
-         .get(roleServer).continueRunning = true;
     }
+    return null;
   }
   
+  private List<ClusterNameAndRev> getClustersNodeBelongsTo(String hostname) 
+      throws Exception {
+    String clusterName = Nodes.getInstance().getNode(hostname)
+        .getNodeState().getClusterName();
+    if (clusterName != null) {
+      int clusterRev = Clusters.getInstance().
+          getClusterByName(clusterName).getLatestRevisionNumber();
+      List<ClusterNameAndRev> l = new ArrayList<ClusterNameAndRev>();
+      l.add(new ClusterNameAndRev(clusterName, clusterRev));
+      return l;
+    }
+    return new ArrayList<ClusterNameAndRev>(); //empty
+  }  
   
   private enum SpecialServiceIDs {
       SERVICE_AVAILABILITY_CHECK_ID, SERVICE_PRESTART_CHECK_ID,
@@ -498,38 +268,10 @@ public class HeartbeatHandler {
     }
   }
 
-  private static class RoleStateTracker {
-    AgentRoleState.State state; //the current state of the server
-    boolean stateChanged; //whether the state of the server changed 
-                          //since the last heartbeat
-    boolean continueRunning; //whether the server should continue
-                             //running
-    boolean uninstall;
-    
-    RoleStateTracker(AgentRoleState.State state, 
-        String clusterId, long clusterRev) {
-      this.state = state;
-      this.stateChanged = false;
-      this.continueRunning = false;
-      this.uninstall = false;
-    }
-  }
-
-  private boolean isCompatible(String nodeClusterId, long nodeClusterRev, 
-      String controllerClusterId, long controllerClusterRev) {
-    //TODO: make this "smart"
-    if (!nodeClusterId.equals(controllerClusterId)) {
-      return false;
-    }
-    if (nodeClusterRev != controllerClusterRev) {
-      return false;
-    }
-    return true;
-  }
-  private static String getSpecialActionID(ClusterNameAndRev clusterIdAndRev, 
+  private static String getSpecialActionID(ClusterNameAndRev clusterNameAndRev, 
       String component, String role, SpecialServiceIDs serviceId) {
-    String id = clusterIdAndRev.getClusterName() +"-"+ 
-      clusterIdAndRev.getRevision() +"-"+ component + "-";
+    String id = clusterNameAndRev.getClusterName() +"-"+
+      clusterNameAndRev.getRevision() +"-"+ component + "-";
     if (role != null) {
       id += role + "-";
     }
@@ -537,46 +279,10 @@ public class HeartbeatHandler {
     return id;
   }
   
-  private void inspectAgentState(HeartBeat heartbeat, 
-      ComponentAndRoleStates componentServers)
-          throws IOException {
-      try {
-        List<AgentRoleState> agentRoleStates = 
-            heartbeat.getInstalledRoleStates();
-        if (agentRoleStates == null) {
-          return;
-        }
-        List<Cluster> clustersNodeBelongsTo = new ArrayList<Cluster>();
-        for (AgentRoleState agentRoleState : agentRoleStates) {
-          componentServers.recordRoleState(heartbeat.getHostname(),agentRoleState);
-          Cluster c = Clusters.getInstance().
-              getClusterByName(agentRoleState.getClusterId());
-          clustersNodeBelongsTo.add(c);
-        }
-        checkActionResults(heartbeat, componentServers);
-      } catch (Exception e) {
-          throw new IOException (e);
-      }
-  }
-  
-  private void checkActionResults(HeartBeat heartbeat,
-      ComponentAndRoleStates installOrStartedComponents) {
-    
-    List<ActionResult> actionResults = heartbeat.getActionResults();
-    if (actionResults == null) {
-      return;
-    }
-    for (ActionResult actionResult : actionResults) {
-      installOrStartedComponents.recordActionId(actionResult.getId(),
-          actionResult);
-    }   
-  }
-  
   private void checkAndCreateActions(Cluster cluster,
       ClusterFSM clusterFsm, ClusterNameAndRev clusterIdAndRev, 
-      boolean newNode, ServiceFSM service, HeartBeat heartbeat, 
-      List<Action> allActions, 
-      ComponentAndRoleStates installedOrStartedComponents) 
+      ServiceFSM service, HeartBeat heartbeat, 
+      List<Action> allActions) 
           throws Exception {
     //see whether the service is in the STARTED state, and if so,
     //check whether there is any action-result that indicates success
@@ -584,7 +290,7 @@ public class HeartbeatHandler {
     if (service.getServiceState() == ServiceState.STARTED) {
       String id = getSpecialActionID(clusterIdAndRev, service.getServiceName(), 
           null, SpecialServiceIDs.SERVICE_AVAILABILITY_CHECK_ID);
-      ActionResult result = installedOrStartedComponents.getActionResult(id);
+      ActionResult result = getActionResult(heartbeat, id);
       if (result != null) {
         //this action ran
         //TODO: this needs to be generalized so that it handles the case
@@ -603,8 +309,7 @@ public class HeartbeatHandler {
         ComponentPlugin plugin = 
             cluster.getComponentDefinition(service.getServiceName());
         String role = plugin.runCheckRole();
-        if (installedOrStartedComponents.isRoleInstalled(clusterIdAndRev,
-            role)) {
+        if (nodePlayingRole(heartbeat.getHostname(), role)) {
           Action action = plugin.checkService(cluster.getName(), role);
           fillActionDetails(action, clusterIdAndRev.getClusterName(),
               clusterIdAndRev.getRevision(),service.getServiceName(), role);
@@ -618,7 +323,7 @@ public class HeartbeatHandler {
     if (service.getServiceState() == ServiceState.PRESTART) {
       String id = getSpecialActionID(clusterIdAndRev, service.getServiceName(), 
           null, SpecialServiceIDs.SERVICE_PRESTART_CHECK_ID);
-      ActionResult result = installedOrStartedComponents.getActionResult(id);
+      ActionResult result = getActionResult(heartbeat, id);
       if (result != null) {
         //this action ran
         if (result.getCommandResult().getExitCode() == 0) {
@@ -635,20 +340,13 @@ public class HeartbeatHandler {
             cluster.getComponentDefinition(service.getServiceName());
         String role = plugin.runPreStartRole();
         if (nodePlayingRole(heartbeat.getHostname(), role)) {
-          if (newNode || 
-              !installedOrStartedComponents.isRoleInstalled(clusterIdAndRev,
-              role)) {
-            createDirStructureAction(clusterIdAndRev, cluster, 
-                service.getServiceName(), role, plugin,
-                allActions);
-          }
+          Action action = plugin.preStartAction(cluster.getName(), role);
+          fillActionDetails(action, clusterIdAndRev.getClusterName(),
+              clusterIdAndRev.getRevision(),service.getServiceName(), role);
+          action.setId(id);
+          action.setKind(Action.Kind.RUN_ACTION);
+          addAction(action, allActions);
         }
-        Action action = plugin.preStartAction(cluster.getName(), role);
-        fillActionDetails(action, clusterIdAndRev.getClusterName(),
-            clusterIdAndRev.getRevision(),service.getServiceName(), role);
-        action.setId(id);
-        action.setKind(Action.Kind.RUN_ACTION);
-        addAction(action, allActions);
       }
     }
   }
@@ -667,23 +365,6 @@ public class HeartbeatHandler {
     }
   }
   
-  private Action getStopRoleAction(String clusterId, long clusterRev, 
-      String componentName, String roleName) {
-    Action action = new Action();
-    fillActionDetails(action, clusterId, clusterRev, componentName, roleName);
-    action.setKind(Kind.STOP_ACTION);
-    action.setSignal(Signal.KILL);
-    return action;
-  }
-  
-  private Action getUninstallRoleAction(String clusterId, long clusterRev, 
-      String componentName, String roleName) {
-    Action action = new Action();
-    fillActionDetails(action, clusterId, clusterRev, componentName, roleName);
-    action.setKind(Kind.DELETE_STRUCTURE_ACTION);
-    return action;
-  }
-    
   private void fillActionDetails(Action action, String clusterId, 
       long clusterDefRev, String component, String role) {
     if (action == null) {

Modified: incubator/ambari/trunk/controller/src/main/resources/org/apache/ambari/acd/hadoop-hdfs-0.1.0.acd
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/resources/org/apache/ambari/acd/hadoop-hdfs-0.1.0.acd?rev=1206994&r1=1206993&r2=1206994&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/resources/org/apache/ambari/acd/hadoop-hdfs-0.1.0.acd (original)
+++ incubator/ambari/trunk/controller/src/main/resources/org/apache/ambari/acd/hadoop-hdfs-0.1.0.acd Mon Nov 28 06:47:24 2011
@@ -1,5 +1,5 @@
 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<component provides="hdfs" user="root">
+<component provides="hdfs">
   <requires name="common"/>
   <roles name="namenode"/>
   <roles name="secondarynamenode"/>



Mime
View raw message